MQTT实现消息推送

本贴最后更新于 4110 天前,其中的信息可能已经时移世易

MQTT实现消息接收(接收消息需实现MqttSimpleCallback接口并实现它的publishArrived方法)必须注册接收消息方法

[java]

mqttClient.registerSimpleHandler(simpleCallbackHandler);
// 注册接收消息方法

[/java]

和订阅接主题

[java]</pre>
mqttClient.subscribe(TOPICS, QOS_VALUES);

// 订阅接主题
<pre>[/java]

服务端:

[java]</pre>
package com.gmcc.kuchuan.business;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttException;
import com.ibm.mqtt.MqttSimpleCallback;

/**
* MQTT消息发送与接收
* @author Join
*
*/
public class MqttBroker {
private final static Log logger = LogFactory.getLog(MqttBroker.class);// 日志对象
// 连接参数
private final static String CONNECTION_STRING = "tcp://localhost:9901";
private final static boolean CLEAN_START = true;
private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
private final static String CLIENT_ID = "master";// 客户端标识
private final static int[] QOS_VALUES = { 0, 0, 2, 0 };// 对应主题的消息级别
private final static String[] TOPICS = { "Test/TestTopics/Topic1",
"Test/TestTopics/Topic2", "Test/TestTopics/Topic3",
"client/keepalive" };
private static MqttBroker instance = new MqttBroker();

private MqttClient mqttClient;

/**
* 返回实例对象
*
* @return
*/
public static MqttBroker getInstance() {
return instance;
}

/**
* 重新连接服务
*/
private void connect() throws MqttException {
logger.info("connect to mqtt broker.");
mqttClient = new MqttClient(CONNECTION_STRING);
logger.info("***********register Simple Handler***********");
SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();
mqttClient.registerSimpleHandler(simpleCallbackHandler);// 注册接收消息方法
mqttClient.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE);
logger.info("***********subscribe receiver topics***********");
mqttClient.subscribe(TOPICS, QOS_VALUES);// 订阅接主题

logger.info("***********CLIENT_ID:" + CLIENT_ID);
/**
* 完成订阅后,可以增加心跳,保持网络通畅,也可以发布自己的消息
*/
mqttClient.publish("keepalive", "keepalive".getBytes(), QOS_VALUES[0],
true);// 增加心跳,保持网络通畅
}

/**
* 发送消息
*
* @param clientId
* @param messageId
*/
public void sendMessage(String clientId, String message) {
try {
if (mqttClient == null || !mqttClient.isConnected()) {
connect();
}

logger.info("send message to " + clientId + ", message is "
+ message);
// 发布自己的消息
mqttClient.publish("GMCC/client/" + clientId, message.getBytes(),
0, false);
} catch (MqttException e) {
logger.error(e.getCause());
e.printStackTrace();
}
}

/**
* 简单回调函数,处理server接收到的主题消息
*
* @author Join
*
*/
class SimpleCallbackHandler implements MqttSimpleCallback {

/**
* 当客户机和broker意外断开时触发 可以再此处理重新订阅
*/
@Override
public void connectionLost() throws Exception {
// TODO Auto-generated method stub
System.out.println("客户机和broker已经断开");
}

/**
* 客户端订阅消息后,该方法负责回调接收处理消息
*/
@Override
public void publishArrived(String topicName, byte[] payload, int Qos,
boolean retained) throws Exception {
// TODO Auto-generated method stub
System.out.println("订阅主题: " + topicName);
System.out.println("消息数据: " + new String(payload));
System.out.println("消息级别(0,1,2): " + Qos);
System.out.println("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): "
+ retained);
}

}

public static void main(String[] args) {
new MqttBroker().sendMessage("client", "message");
}
}
<pre>
[/java]

Android客户端:

核心代码:MQTTConnection内部类

[java]

<pre class="java" name="code">import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Timer;
import java.util.TimerTask;

import android.app.AlarmManager;
import android.app.Notification;
import android.app.NotificationManager;
import android.app.PendingIntent;
import android.app.Service;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;
import android.content.IntentFilter;
import android.content.SharedPreferences;
import android.database.Cursor;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.os.Binder;
import android.os.Bundle;
import android.os.IBinder;
import android.provider.ContactsContract;
import android.util.Log;
//此部分项目导包已被删除</pre><pre class="java" name="code">import com.ibm.mqtt.IMqttClient;
import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttException;
import com.ibm.mqtt.MqttPersistence;
import com.ibm.mqtt.MqttPersistenceException;
import com.ibm.mqtt.MqttSimpleCallback;

/*
* PushService that does all of the work.
* Most of the logic is borrowed from KeepAliveService.
* http://code.google.com/p/android-random/source/browse/trunk/TestKeepAlive/src/org/devtcg/demo/keepalive/KeepAliveService.java?r=219
*/
public class PushService extends Service {
private MyBinder mBinder = new MyBinder();
// this is the log tag
public static final String TAG = "PushService";

// the IP address, where your MQTT broker is running.
private static final String MQTT_HOST = "120.197.230.53"; // "209.124.50.174";//
// the port at which the broker is running.
private static int MQTT_BROKER_PORT_NUM = 9901;
// Let's not use the MQTT persistence.
private static MqttPersistence MQTT_PERSISTENCE = null;
// We don't need to remember any state between the connections, so we use a
// clean start.
private static boolean MQTT_CLEAN_START = true;
// Let's set the internal keep alive for MQTT to 15 mins. I haven't tested
// this value much. It could probably be increased.
private static short MQTT_KEEP_ALIVE = 60 * 15;
// Set quality of services to 0 (at most once delivery), since we don't want
// push notifications
// arrive more than once. However, this means that some messages might get
// lost (delivery is not guaranteed)
private static int[] MQTT_QUALITIES_OF_SERVICE = { 0 };
private static int MQTT_QUALITY_OF_SERVICE = 0;
// The broker should not retain any messages.
private static boolean MQTT_RETAINED_PUBLISH = false;

// MQTT client ID, which is given the broker. In this example, I also use
// this for the topic header.
// You can use this to run push notifications for multiple apps with one
// MQTT broker.
public static String MQTT_CLIENT_ID = "client";

// These are the actions for the service (name are descriptive enough)
public static final String ACTION_START = MQTT_CLIENT_ID + ".START";
private static final String ACTION_STOP = MQTT_CLIENT_ID + ".STOP";
private static final String ACTION_KEEPALIVE = MQTT_CLIENT_ID
+ ".KEEP_ALIVE";
private static final String ACTION_RECONNECT = MQTT_CLIENT_ID
+ ".RECONNECT";

// Connection log for the push service. Good for debugging.
private ConnectionLog mLog;

// Connectivity manager to determining, when the phone loses connection
private ConnectivityManager mConnMan;
// Notification manager to displaying arrived push notifications
private NotificationManager mNotifMan;

// Whether or not the service has been started.
private boolean mStarted;

// This the application level keep-alive interval, that is used by the
// AlarmManager
// to keep the connection active, even when the device goes to sleep.
private static final long KEEP_ALIVE_INTERVAL = 1000 * 60 * 28;

// Retry intervals, when the connection is lost.
private static final long INITIAL_RETRY_INTERVAL = 1000 * 10;
private static final long MAXIMUM_RETRY_INTERVAL = 1000 * 60 * 30;

// Preferences instance
private SharedPreferences mPrefs;
// We store in the preferences, whether or not the service has been started
public static final String PREF_STARTED = "isStarted";
// We also store the deviceID (target)
public static final String PREF_DEVICE_ID = "deviceID";
// We store the last retry interval
public static final String PREF_RETRY = "retryInterval";

// Notification title
public static String NOTIF_TITLE = "client";
// Notification id
private static final int NOTIF_CONNECTED = 0;

// This is the instance of an MQTT connection.
private MQTTConnection mConnection;
private long mStartTime;
boolean mShowFlag = true;// 是否显示通知
public static Context ctx;
private boolean mRunFlag = true;// 是否向服务器发送心跳
Timer mTimer = new Timer();

// Static method to start the service
public static void actionStart(Context ctx) {
Intent i = new Intent(ctx, PushService.class);
i.setAction(ACTION_START);
ctx.startService(i);
PushService.ctx = ctx;
}

// Static method to stop the service
public static void actionStop(Context ctx) {
Intent i = new Intent(ctx, PushService.class);
i.setAction(ACTION_STOP);
ctx.startService(i);
}

// Static method to send a keep alive message
public static void actionPing(Context ctx) {
Intent i = new Intent(ctx, PushService.class);
i.setAction(ACTION_KEEPALIVE);
ctx.startService(i);
}

@Override
public void onCreate() {
super.onCreate();

log("Creating service");
mStartTime = System.currentTimeMillis();

try {
mLog = new ConnectionLog();
Log.i(TAG, "Opened log at " + mLog.getPath());
} catch (IOException e) {
Log.e(TAG, "Failed to open log", e);
}

// Get instances of preferences, connectivity manager and notification
// manager
mPrefs = getSharedPreferences(TAG, MODE_PRIVATE);
mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
mNotifMan = (NotificationManager) getSystemService(NOTIFICATION_SERVICE);

/*
* If our process was reaped by the system for any reason we need to
* restore our state with merely a call to onCreate. We record the last
* "started" value and restore it here if necessary.
*/
handleCrashedService();
}

// This method does any necessary clean-up need in case the server has been
// destroyed by the system
// and then restarted
private void handleCrashedService() {
if (wasStarted() == true) {
log("Handling crashed service...");
// stop the keep alives
stopKeepAlives();

// Do a clean start
start();
}
}

@Override
public void onDestroy() {
log("Service destroyed (started=" + mStarted + ")");

// Stop the services, if it has been started
if (mStarted == true) {
stop();
}

try {
if (mLog != null)
mLog.close();
} catch (IOException e) {
}
}

@Override
public void onStart(Intent intent, int startId) {
super.onStart(intent, startId);
log("Service started with intent=" + intent);
if (intent == null) {
return;
}
// Do an appropriate action based on the intent.
if (intent.getAction().equals(ACTION_STOP) == true) {
stop();
stopSelf();
} else if (intent.getAction().equals(ACTION_START) == true) {
start();

} else if (intent.getAction().equals(ACTION_KEEPALIVE) == true) {
keepAlive();
} else if (intent.getAction().equals(ACTION_RECONNECT) == true) {
if (isNetworkAvailable()) {
reconnectIfNecessary();
}
}
}

public class MyBinder extends Binder {
public PushService getService() {
return PushService.this;
}
}

@Override
public IBinder onBind(Intent intent) {
return mBinder;
}

// log helper function
private void log(String message) {
log(message, null);
}

private void log(String message, Throwable e) {
if (e != null) {
Log.e(TAG, message, e);

} else {
Log.i(TAG, message);
}

if (mLog != null) {
try {
mLog.println(message);
} catch (IOException ex) {
}
}
}

// Reads whether or not the service has been started from the preferences
private boolean wasStarted() {
return mPrefs.getBoolean(PREF_STARTED, false);
}

// Sets whether or not the services has been started in the preferences.
private void setStarted(boolean started) {
mPrefs.edit().putBoolean(PREF_STARTED, started).commit();
mStarted = started;
}

private synchronized void start() {
log("Starting service...");

// Do nothing, if the service is already running.
if (mStarted == true) {
Log.w(TAG, "Attempt to start connection that is already active");
return;
}

// Establish an MQTT connection

connect();

// 向服务器定时发送心跳,一分钟一次
mRunFlag = true;
mTimer.schedule(new TimerTask() {
@Override
public void run() {
if (!mRunFlag) {
// this.cancel();
// PushService.this.stopSelf();
return;
}
System.out.println("run");
try {
if (isNetworkAvailable()) {
SharedPreferences pref = getSharedPreferences(
"client", 0);
String MOBILE_NUM = pref.getString("MOBILE_NUM", "");
HttpUtil.post(Constants.KEEPALIVE + "&mobile="
+ MOBILE_NUM + "&online_flag=1");
}
} catch (Exception e) {
e.printStackTrace();
// TODO: handle exception
}
}
}, 0, 60 * 1000);
// Register a connectivity listener
registerReceiver(mConnectivityChanged, new IntentFilter(
ConnectivityManager.CONNECTIVITY_ACTION));
}

private synchronized void stop() {
// Do nothing, if the service is not running.
if (mStarted == false) {
Log.w(TAG, "Attempt to stop connection not active.");
return;
}

// Save stopped state in the preferences
setStarted(false);

// Remove the connectivity receiver
unregisterReceiver(mConnectivityChanged);
// Any existing reconnect timers should be removed, since we explicitly
// stopping the service.
cancelReconnect();

// Destroy the MQTT connection if there is one
if (mConnection != null) {
mConnection.disconnect();
mConnection = null;
}
}

//
private synchronized void connect() {
log("Connecting...");
// Thread t = new Thread() {
// @Override
// public void run() {
// fetch the device ID from the preferences.
String deviceID = "GMCC/client/"
+ mPrefs.getString(PREF_DEVICE_ID, null);

// Create a new connection only if the device id is not NULL
try {
mConnection = new MQTTConnection(MQTT_HOST, deviceID);
} catch (MqttException e) {
// Schedule a reconnect, if we failed to connect
log("MqttException: "
+ (e.getMessage() != null ? e.getMessage() : "NULL"));
if (isNetworkAvailable()) {
scheduleReconnect(mStartTime);
}
}
setStarted(true);
// }
// };
// t.start();
// 向服务器定时发送心跳,一分钟一次
mRunFlag = true;
}

private synchronized void keepAlive() {
try {
// Send a keep alive, if there is a connection.
if (mStarted == true && mConnection != null) {
mConnection.sendKeepAlive();
}
} catch (MqttException e) {
log("MqttException: "
+ (e.getMessage() != null ? e.getMessage() : "NULL"), e);

mConnection.disconnect();
mConnection = null;
cancelReconnect();
}
}

// Schedule application level keep-alives using the AlarmManager
private void startKeepAlives() {
Intent i = new Intent();
i.setClass(this, PushService.class);
i.setAction(ACTION_KEEPALIVE);
PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
alarmMgr.setRepeating(AlarmManager.RTC_WAKEUP,
System.currentTimeMillis() + KEEP_ALIVE_INTERVAL,
KEEP_ALIVE_INTERVAL, pi);
}

// Remove all scheduled keep alives
private void stopKeepAlives() {
Intent i = new Intent();
i.setClass(this, PushService.class);
i.setAction(ACTION_KEEPALIVE);
PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
alarmMgr.cancel(pi);
}

// We schedule a reconnect based on the starttime of the service
public void scheduleReconnect(long startTime) {
// the last keep-alive interval
long interval = mPrefs.getLong(PREF_RETRY, INITIAL_RETRY_INTERVAL);

// Calculate the elapsed time since the start
long now = System.currentTimeMillis();
long elapsed = now - startTime;

// Set an appropriate interval based on the elapsed time since start
if (elapsed < interval) {
interval = Math.min(interval * 4, MAXIMUM_RETRY_INTERVAL);
} else {
interval = INITIAL_RETRY_INTERVAL;
}

log("Rescheduling connection in " + interval + "ms.");

// Save the new internval
mPrefs.edit().putLong(PREF_RETRY, interval).commit();

// Schedule a reconnect using the alarm manager.
Intent i = new Intent();
i.setClass(this, PushService.class);
i.setAction(ACTION_RECONNECT);
PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
alarmMgr.set(AlarmManager.RTC_WAKEUP, now + interval, pi);
}

// Remove the scheduled reconnect
public void cancelReconnect() {
Intent i = new Intent();
i.setClass(PushService.this, PushService.class);
i.setAction(ACTION_RECONNECT);
PendingIntent pi = PendingIntent.getService(PushService.this, 0, i, 0);
AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
alarmMgr.cancel(pi);
}

private synchronized void reconnectIfNecessary() {
log("mStarted" + mStarted);
log("mConnection" + mConnection);
if (mStarted == true && mConnection == null) {
log("Reconnecting...");
connect();
}
}

// This receiver listeners for network changes and updates the MQTT
// connection
// accordingly
private BroadcastReceiver mConnectivityChanged = new BroadcastReceiver() {
@Override
public void onReceive(Context context, final Intent intent) {
// Get network info
// Thread mReconnect = new Thread(){
// public void run() {
NetworkInfo info = (NetworkInfo) intent
.getParcelableExtra(ConnectivityManager.EXTRA_NETWORK_INFO);
// Is there connectivity?
boolean hasConnectivity = (info != null && info.isConnected()) ? true
: false;

log("Connectivity changed: connected=" + hasConnectivity);

if (hasConnectivity) {
reconnectIfNecessary();
} else if (mConnection != null) {
// Thread cancelConn = new Thread(){
// public void run() {
// // if there no connectivity, make sure MQTT connection is
// destroyed
log("cancelReconnect");
mConnection.disconnect();
mConnection = null;
log("cancelReconnect" + mConnection);
cancelReconnect();
// }
// };
// cancelConn.start();
}
// };
//
// };
// mReconnect.start();
}
};

// Display the topbar notification
private void showNotification(String text, Request request) {

Notification n = new Notification();
n.flags |= Notification.FLAG_SHOW_LIGHTS;
n.flags |= Notification.FLAG_AUTO_CANCEL;
n.defaults = Notification.DEFAULT_ALL;
n.icon = R.drawable.ico;
n.when = System.currentTimeMillis();
Intent intent = new Intent();
Bundle bundle = new Bundle();
bundle.putSerializable("request", request);
bundle.putString("currentTab", "1");
intent.putExtras(bundle);
intent.setClass(this, MainActivity.class);
intent.setAction(Intent.ACTION_MAIN);
intent.addCategory(Intent.CATEGORY_LAUNCHER);
intent.setFlags(Intent.FLAG_ACTIVITY_NEW_TASK
| Intent.FLAG_ACTIVITY_RESET_TASK_IF_NEEDED);
// Simply open the parent activity
PendingIntent pi = PendingIntent.getActivity(this, 0, intent, 0);

// Change the name of the notification here
n.setLatestEventInfo(this, NOTIF_TITLE, text, pi);
mNotifMan.notify(NOTIF_CONNECTED, n);
}

// Check if we are online
private boolean isNetworkAvailable() {
NetworkInfo info = mConnMan.getActiveNetworkInfo();
if (info == null) {
return false;
}
return info.isConnected();
}

<span style="BACKGROUND-COLOR: #ccffff"> // This inner class is a wrapper on top of MQTT client.
private class MQTTConnection implements MqttSimpleCallback {
IMqttClient mqttClient = null;

// Creates a new connection given the broker address and initial topic
public MQTTConnection(String brokerHostName, String initTopic)
throws MqttException {
// Create connection spec
String mqttConnSpec = "tcp://" + brokerHostName + "@"
+ MQTT_BROKER_PORT_NUM;
// Create the client and connect
mqttClient = MqttClient.createMqttClient(mqttConnSpec,
MQTT_PERSISTENCE);
String clientID = MQTT_CLIENT_ID + "/"
+ mPrefs.getString(PREF_DEVICE_ID, "");
Log.d(TAG, "mqttConnSpec:" + mqttConnSpec + " clientID:"
+ clientID);
mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE);

// register this client app has being able to receive messages
mqttClient.registerSimpleHandler(this);

// Subscribe to an initial topic, which is combination of client ID
// and device ID.
// initTopic = MQTT_CLIENT_ID + "/" + initTopic;
subscribeToTopic(initTopic);

log("Connection established to " + brokerHostName + " on topic "
+ initTopic);

// Save start time
mStartTime = System.currentTimeMillis();
// Star the keep-alives
startKeepAlives();
}

// Disconnect
public void disconnect() {
// try {
stopKeepAlives();
log("stopKeepAlives");
Thread t = new Thread() {
public void run() {
try {
mqttClient.disconnect();
log("mqttClient.disconnect();");
} catch (MqttPersistenceException e) {
log("MqttException"
+ (e.getMessage() != null ? e.getMessage()
: " NULL"), e);
}
};
};
t.start();
// } catch (MqttPersistenceException e) {
// log("MqttException"
// + (e.getMessage() != null ? e.getMessage() : " NULL"),
// e);
// }
}

/*
* Send a request to the message broker to be sent messages published
* with the specified topic name. Wildcards are allowed.
*/
private void subscribeToTopic(String topicName) throws MqttException {

if ((mqttClient == null) || (mqttClient.isConnected() == false)) {
// quick sanity check - don't try and subscribe if we don't have
// a connection
log("Connection error" + "No connection");
} else {
String[] topics = { topicName };
mqttClient.subscribe(topics, MQTT_QUALITIES_OF_SERVICE);
}
}

/*
* Sends a message to the message broker, requesting that it be
* published to the specified topic.
*/
private void publishToTopic(String topicName, String message)
throws MqttException {
if ((mqttClient == null) || (mqttClient.isConnected() == false)) {
// quick sanity check - don't try and publish if we don't have
// a connection
log("No connection to public to");
} else {
mqttClient.publish(topicName, message.getBytes(),
MQTT_QUALITY_OF_SERVICE, MQTT_RETAINED_PUBLISH);
}
}

/*
* Called if the application loses it's connection to the message
* broker.
*/
public void connectionLost() throws Exception {
log("Loss of connection" + "connection downed");
stopKeepAlives();
// 取消定时发送心跳
mRunFlag = false;
// 向服务器发送请求,更改在线状态
// SharedPreferences pref = getSharedPreferences("client",0);
// String MOBILE_NUM=pref.getString("MOBILE_NUM", "");
// HttpUtil.post(Constants.KEEPALIVE + "&mobile="
// + MOBILE_NUM+"&online_flag=0");
// null itself
mConnection = null;
if (isNetworkAvailable() == true) {
reconnectIfNecessary();
}
}

/*
* Called when we receive a message from the message broker.
*/
public void publishArrived(String topicName, byte[] payload, int qos,
boolean retained) throws MqttException {
// Show a notification
// synchronized (lock) {
String s = new String(payload);
Request request = null;
try {// 解析服务端推送过来的消息
request = XmlPaserTool.getMessage(new ByteArrayInputStream(s
.getBytes()));
// request=Constants.request;
} catch (Exception e) {
e.printStackTrace();
}
final Request mRequest = request;
DownloadInfo down = new DownloadInfo(mRequest);
down.setDownLoad(down);
downloadInfos.add(down);
sendUpdateBroast();
down.start();
showNotification("您有一条新的消息!", mRequest);
Log.d(PushService.TAG, s);
Log.d(PushService.TAG, mRequest.getMessageId());
// 再向服务端推送消息
new AdvancedCallbackHandler().sendMessage(MQTT_CLIENT_ID
+ "/keepalive", "***********send message**********");
}

public void sendKeepAlive() throws MqttException {
log("Sending keep alive");
// publish to a keep-alive topic
publishToTopic(MQTT_CLIENT_ID + "/keepalive",
mPrefs.getString(PREF_DEVICE_ID, ""));
}
}

class AdvancedCallbackHandler {
IMqttClient mqttClient = null;
public final int[] QOS_VALUES = { 0, 0, 2, 0 };// 对应主题的消息级别

/**
* 重新连接服务
*/
private void connect() throws MqttException {
String mqttConnSpec = "tcp://" + MQTT_HOST + "@"
+ MQTT_BROKER_PORT_NUM;
// Create the client and connect
mqttClient = MqttClient.createMqttClient(mqttConnSpec,
MQTT_PERSISTENCE);
String clientID = MQTT_CLIENT_ID + "/"
+ mPrefs.getString(PREF_DEVICE_ID, "");
mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE);
Log.d(TAG, "连接服务器,推送消息");
Log.d(TAG, "**mqttConnSpec:" + mqttConnSpec + " clientID:"
+ clientID);
Log.d(TAG, MQTT_CLIENT_ID + "/keepalive");
// 增加心跳,保持网络通畅
mqttClient.publish(MQTT_CLIENT_ID + "/keepalive",
"keepalive".getBytes(), QOS_VALUES[0], true);
}

/**
* 发送消息
*
* @param clientId
* @param messageId
*/
public void sendMessage(String clientId, String message) {
try {
if (mqttClient == null || !mqttClient.isConnected()) {
connect();
}

Log.d(TAG, "send message to " + clientId + ", message is "
+ message);
// 发布自己的消息
// mqttClient.publish(MQTT_CLIENT_ID + "/keepalive",
// message.getBytes(), 0, false);
mqttClient.publish(MQTT_CLIENT_ID + "/keepalive",
message.getBytes(), 0, false);
} catch (MqttException e) {
Log.d(TAG, e.getCause() + "");
e.printStackTrace();
}
}
}
</span>
public String getPeople(String phone_number) {
String name = "";
String[] projection = { ContactsContract.PhoneLookup.DISPLAY_NAME,
ContactsContract.CommonDataKinds.Phone.NUMBER };
Log.d(TAG, "getPeople ---------");
// 将自己添加到 msPeers 中
Cursor cursor = this.getContentResolver().query(
ContactsContract.CommonDataKinds.Phone.CONTENT_URI,
projection, // Which columns to return.
ContactsContract.CommonDataKinds.Phone.NUMBER + " = '"
+ phone_number + "'", // WHERE clause.
null, // WHERE clause value substitution
null); // Sort order.

if (cursor == null) {
Log.d(TAG, "getPeople null");
return "";
}
Log.d(TAG, "getPeople cursor.getCount() = " + cursor.getCount());
if (cursor.getCount() > 0) {
cursor.moveToPosition(0);

// 取得联系人名字
int nameFieldColumnIndex = cursor
.getColumnIndex(ContactsContract.PhoneLookup.DISPLAY_NAME);
name = cursor.getString(nameFieldColumnIndex);
Log.i("Contacts", "" + name + " .... " + nameFieldColumnIndex); // 这里提示
// force
// close
System.out.println("联系人姓名:" + name);
return name;
}
return phone_number;
}

public void sendUpdateBroast() {
Intent intent = new Intent();
intent.setAction("update");
sendBroadcast(intent);
}

public void sendUpdateFinishBroast() {
Intent intent = new Intent();
intent.setAction("updateFinishList");
sendBroadcast(intent);
}

public class DownloadInfo extends Thread {
boolean runflag = true;
Request mRequest;
public float progress;
public MessageBean bean = null;
DownloadInfo download = null;
MessageDetailDao dao = new MessageDetailDao(
PushService.this.getApplicationContext());

public synchronized void stopthread() {
runflag = false;
}

public synchronized boolean getrunflag() {
return runflag;
}

DownloadInfo(Request mRequest) {
this.mRequest = mRequest;

}

public void setDownLoad(DownloadInfo download) {
this.download = download;
}

@Override
public void run() {
try {

File dir = new File(Constants.DOWNLOAD_PATH);
if (!dir.exists()) {
dir.mkdirs();
}
String filePath = Constants.DOWNLOAD_PATH
+ mRequest.getMessageId() + "." + mRequest.getExt();
bean = new MessageBean();
bean.setPath(filePath);
bean.setStatus(0);
bean.setDate(mRequest.getTimestamp());
bean.setLayoutID(R.layout.list_say_he_item);
bean.setPhotoID(R.drawable.receive_ico);
bean.setMessage_key(mRequest.getMessageId());
bean.setPhone_number(mRequest.getReceiver());
bean.setAction(1);
String name = getPeople(mRequest.getSender());
bean.setName(name);
bean.setFileType(Integer.parseInt(mRequest.getCommand()));
if (mRequest.getCommand().equals(Request.TYPE_MUSIC)) {
bean.setMsgIco(R.drawable.music_temp);
bean.setText(name + "给你发送了音乐");
mRequest.setBody(Base64.encodeToString(bean.getText()
.getBytes(), Base64.DEFAULT));
} else if (mRequest.getCommand().equals(Request.TYPE_CARD)) {
bean.setMsgIco(R.drawable.card_temp);
bean.setText(new String(Base64.decode(mRequest.getBody(),
Base64.DEFAULT)));
mRequest.setBody(Base64.encodeToString(bean.getText()
.getBytes(), Base64.DEFAULT));
} else if (mRequest.getCommand().equals(Request.TYPE_LBS)) {
bean.setMsgIco(R.drawable.address_temp);
bean.setText(new String(Base64.decode(mRequest.getBody(),
Base64.DEFAULT)));
mRequest.setBody(Base64.encodeToString(bean.getText()
.getBytes(), Base64.DEFAULT));
} else if (mRequest.getCommand().equals(Request.TYPE_PHOTO)) {
bean.setText(name + "向你发送了照片");
bean.setMsgIco(-1);
} else if (mRequest.getCommand().equals(Request.TYPE_PIC)) {
bean.setText(name + "向你发送了图片");
bean.setMsgIco(-1);
} else if (mRequest.getCommand().equals(Request.TYPE_SMS)) {
bean.setFileType(0);
}

if (!mRequest.getCommand().equals(Request.TYPE_CARD)
&& !mRequest.getCommand().equals(Request.TYPE_SMS)) {
String path = Constants.FILE_DOWNLOAD_URL
+ mRequest.getMessageId();
URL url = new URL(path);
HttpURLConnection hurlconn = (HttpURLConnection) url
.openConnection();// 基于HTTP协议的连接对象
hurlconn.setConnectTimeout(5000);// 请求超时时间 5s
hurlconn.setRequestMethod("GET");// 请求方式
hurlconn.connect();
long fileSize = hurlconn.getContentLength();
InputStream instream = hurlconn.getInputStream();
byte[] buffer = new byte[1024];
int len = 0;
int number = 0;
RandomAccessFile rasf = new RandomAccessFile(filePath,
"rwd");
while ((len = instream.read(buffer)) != -1) {// 开始下载文件
if (getrunflag() && progress < 100) {
rasf.seek(number);
number += len;
rasf.write(buffer, 0, len);
progress = (((float) number) / fileSize) * 100;
// 发送广播,修改进度条进度
sendUpdateBroast();
} else {
this.interrupt();
if (number != fileSize) {// 取消下载,将已经缓存的未下载完成的文件删除
File file = new File(filePath);
if (file.exists())
file.delete();
}
PushService.downloadInfos.remove(download);
sendUpdateBroast();
return;
}
}
instream.close();
PushService.downloadInfos.remove(download);
sendUpdateBroast();
} else {// 收到消息,将信息保存到数据库

PushService.downloadInfos.remove(download);
sendUpdateBroast();
}
// 将文件信息保存到数据库
dao.create(bean);
sendUpdateFinishBroast();

} catch (Exception e) {
PushService.downloadInfos.remove(download);
sendUpdateBroast();
e.printStackTrace();
}
}
}

public static ArrayList<DownloadInfo> downloadInfos = new ArrayList<DownloadInfo>();

public ArrayList<DownloadInfo> getDownloadInfos() {
return PushService.downloadInfos;
}

public void setDownloadInfos(ArrayList<DownloadInfo> downloadInfos) {
PushService.downloadInfos = downloadInfos;
}
}</pre>
<p><br>
ps:</p>
<p>接收者必须订阅发送者的TOPICS才能收到消息</p>
<p> </p>
<pre></pre>
<pre></pre>
<pre></pre>
<pre></pre>

[/java]

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • BookxNote

    BookxNote 是一款全新的电子书学习工具,助力您的学习与思考,让您的大脑更高效的记忆。

    笔记整理交给我,一心只读圣贤书。

    1 引用 • 1 回帖
  • RYMCU

    RYMCU 致力于打造一个即严谨又活泼、专业又不失有趣,为数百万人服务的开源嵌入式知识学习交流平台。

    4 引用 • 6 回帖 • 52 关注
  • 友情链接

    确认过眼神后的灵魂连接,站在链在!

    24 引用 • 373 回帖
  • 星云链

    星云链是一个开源公链,业内简单的将其称为区块链上的谷歌。其实它不仅仅是区块链搜索引擎,一个公链的所有功能,它基本都有,比如你可以用它来开发部署你的去中心化的 APP,你可以在上面编写智能合约,发送交易等等。3 分钟快速接入星云链 (NAS) 测试网

    3 引用 • 16 回帖 • 5 关注
  • 面试

    面试造航母,上班拧螺丝。多面试,少加班。

    325 引用 • 1395 回帖 • 1 关注
  • IBM

    IBM(国际商业机器公司)或万国商业机器公司,简称 IBM(International Business Machines Corporation),总公司在纽约州阿蒙克市。1911 年托马斯·沃森创立于美国,是全球最大的信息技术和业务解决方案公司,拥有全球雇员 30 多万人,业务遍及 160 多个国家和地区。

    17 引用 • 53 回帖 • 140 关注
  • Google

    Google(Google Inc.,NASDAQ:GOOG)是一家美国上市公司(公有股份公司),于 1998 年 9 月 7 日以私有股份公司的形式创立,设计并管理一个互联网搜索引擎。Google 公司的总部称作“Googleplex”,它位于加利福尼亚山景城。Google 目前被公认为是全球规模最大的搜索引擎,它提供了简单易用的免费服务。不作恶(Don't be evil)是谷歌公司的一项非正式的公司口号。

    49 引用 • 192 回帖
  • 小说

    小说是以刻画人物形象为中心,通过完整的故事情节和环境描写来反映社会生活的文学体裁。

    28 引用 • 108 回帖
  • Sym

    Sym 是一款用 Java 实现的现代化社区(论坛/BBS/社交网络/博客)系统平台。

    下一代的社区系统,为未来而构建

    524 引用 • 4601 回帖 • 700 关注
  • 微服务

    微服务架构是一种架构模式,它提倡将单一应用划分成一组小的服务。服务之间互相协调,互相配合,为用户提供最终价值。每个服务运行在独立的进程中。服务于服务之间才用轻量级的通信机制互相沟通。每个服务都围绕着具体业务构建,能够被独立的部署。

    96 引用 • 155 回帖
  • PHP

    PHP(Hypertext Preprocessor)是一种开源脚本语言。语法吸收了 C 语言、 Java 和 Perl 的特点,主要适用于 Web 开发领域,据说是世界上最好的编程语言。

    179 引用 • 407 回帖 • 492 关注
  • JavaScript

    JavaScript 一种动态类型、弱类型、基于原型的直译式脚本语言,内置支持类型。它的解释器被称为 JavaScript 引擎,为浏览器的一部分,广泛用于客户端的脚本语言,最早是在 HTML 网页上使用,用来给 HTML 网页增加动态功能。

    728 引用 • 1273 回帖 • 1 关注
  • ReactiveX

    ReactiveX 是一个专注于异步编程与控制可观察数据(或者事件)流的 API。它组合了观察者模式,迭代器模式和函数式编程的优秀思想。

    1 引用 • 2 回帖 • 161 关注
  • API

    应用程序编程接口(Application Programming Interface)是一些预先定义的函数,目的是提供应用程序与开发人员基于某软件或硬件得以访问一组例程的能力,而又无需访问源码,或理解内部工作机制的细节。

    77 引用 • 430 回帖 • 1 关注
  • sts
    2 引用 • 2 回帖 • 197 关注
  • 大疆创新

    深圳市大疆创新科技有限公司(DJI-Innovations,简称 DJI),成立于 2006 年,是全球领先的无人飞行器控制系统及无人机解决方案的研发和生产商,客户遍布全球 100 多个国家。通过持续的创新,大疆致力于为无人机工业、行业用户以及专业航拍应用提供性能最强、体验最佳的革命性智能飞控产品和解决方案。

    2 引用 • 14 回帖 • 2 关注
  • Typecho

    Typecho 是一款博客程序,它在 GPLv2 许可证下发行,基于 PHP 构建,可以运行在各种平台上,支持多种数据库(MySQL、PostgreSQL、SQLite)。

    12 引用 • 65 回帖 • 446 关注
  • C

    C 语言是一门通用计算机编程语言,应用广泛。C 语言的设计目标是提供一种能以简易的方式编译、处理低级存储器、产生少量的机器码以及不需要任何运行环境支持便能运行的编程语言。

    85 引用 • 165 回帖 • 2 关注
  • Chrome

    Chrome 又称 Google 浏览器,是一个由谷歌公司开发的网页浏览器。该浏览器是基于其他开源软件所编写,包括 WebKit,目标是提升稳定性、速度和安全性,并创造出简单且有效率的使用者界面。

    62 引用 • 289 回帖
  • IPFS

    IPFS(InterPlanetary File System,星际文件系统)是永久的、去中心化保存和共享文件的方法,这是一种内容可寻址、版本化、点对点超媒体的分布式协议。请浏览 IPFS 入门笔记了解更多细节。

    21 引用 • 245 回帖 • 243 关注
  • SpaceVim

    SpaceVim 是一个社区驱动的模块化 vim/neovim 配置集合,以模块的方式组织管理插件以
    及相关配置,为不同的语言开发量身定制了相关的开发模块,该模块提供代码自动补全,
    语法检查、格式化、调试、REPL 等特性。用户仅需载入相关语言的模块即可得到一个开箱
    即用的 Vim-IDE。

    3 引用 • 31 回帖 • 105 关注
  • GAE

    Google App Engine(GAE)是 Google 管理的数据中心中用于 WEB 应用程序的开发和托管的平台。2008 年 4 月 发布第一个测试版本。目前支持 Python、Java 和 Go 开发部署。全球已有数十万的开发者在其上开发了众多的应用。

    14 引用 • 42 回帖 • 780 关注
  • Tomcat

    Tomcat 最早是由 Sun Microsystems 开发的一个 Servlet 容器,在 1999 年被捐献给 ASF(Apache Software Foundation),隶属于 Jakarta 项目,现在已经独立为一个顶级项目。Tomcat 主要实现了 JavaEE 中的 Servlet、JSP 规范,同时也提供 HTTP 服务,是市场上非常流行的 Java Web 容器。

    162 引用 • 529 回帖 • 4 关注
  • TextBundle

    TextBundle 文件格式旨在应用程序之间交换 Markdown 或 Fountain 之类的纯文本文件时,提供更无缝的用户体验。

    1 引用 • 2 回帖 • 53 关注
  • 生活

    生活是指人类生存过程中的各项活动的总和,范畴较广,一般指为幸福的意义而存在。生活实际上是对人生的一种诠释。生活包括人类在社会中与自己息息相关的日常活动和心理影射。

    230 引用 • 1454 回帖 • 1 关注
  • 小薇

    小薇是一个用 Java 写的 QQ 聊天机器人 Web 服务,可以用于社群互动。

    由于 Smart QQ 从 2019 年 1 月 1 日起停止服务,所以该项目也已经停止维护了!

    34 引用 • 467 回帖 • 748 关注
  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    93 引用 • 113 回帖