MQTT实现消息推送

本贴最后更新于 4202 天前,其中的信息可能已经时移世易

使用 MQTT 接收消息时,需要实现 MqttSimpleCallback 接口及其 publishArrived 方法,并且必须先注册接收消息的回调:

[java]

mqttClient.registerSimpleHandler(simpleCallbackHandler);
// 注册接收消息方法

[/java]

并订阅主题:

[java]
mqttClient.subscribe(TOPICS, QOS_VALUES);

// 订阅接主题
[/java]

服务端:

[java]
package com.gmcc.kuchuan.business;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttException;
import com.ibm.mqtt.MqttSimpleCallback;

/**
 * Server-side helper that publishes and receives MQTT messages through the
 * IBM {@code com.ibm.mqtt} client.
 *
 * <p>A shared instance is available through {@link #getInstance()}. The
 * connection is opened lazily by {@link #sendMessage(String, String)}.
 *
 * @author Join
 */
public class MqttBroker {
    private final static Log logger = LogFactory.getLog(MqttBroker.class);

    // Connection parameters.
    // NOTE(review): the Android client in this article builds its address as
    // "tcp://host@port"; confirm the ':' separator used here is accepted by
    // the MQTT client library.
    private final static String CONNECTION_STRING = "tcp://localhost:9901";
    private final static boolean CLEAN_START = true;
    // Keep-alive of 30 (seconds, per the original author's note).
    private final static short KEEP_ALIVE = 30;
    // Client identifier presented to the broker.
    private final static String CLIENT_ID = "master";
    // Quality-of-service level for each entry of TOPICS (same order).
    private final static int[] QOS_VALUES = { 0, 0, 2, 0 };
    private final static String[] TOPICS = { "Test/TestTopics/Topic1",
            "Test/TestTopics/Topic2", "Test/TestTopics/Topic3",
            "client/keepalive" };
    // Payloads are encoded/decoded explicitly as UTF-8 so the bytes on the
    // wire do not depend on the platform default charset of the server.
    private final static java.nio.charset.Charset UTF_8 =
            java.nio.charset.Charset.forName("UTF-8");
    private static MqttBroker instance = new MqttBroker();

    private MqttClient mqttClient;

    /**
     * Returns the shared instance.
     *
     * @return the shared {@code MqttBroker}
     */
    public static MqttBroker getInstance() {
        return instance;
    }

    /**
     * Creates a new client, registers the receive callback, connects,
     * subscribes to {@link #TOPICS} and publishes a retained "keepalive"
     * message.
     *
     * @throws MqttException if any client operation fails
     */
    private void connect() throws MqttException {
        logger.info("connect to mqtt broker.");
        mqttClient = new MqttClient(CONNECTION_STRING);
        logger.info("***********register Simple Handler***********");
        // The callback must be registered in order to receive messages.
        mqttClient.registerSimpleHandler(new SimpleCallbackHandler());
        mqttClient.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE);
        logger.info("***********subscribe receiver topics***********");
        mqttClient.subscribe(TOPICS, QOS_VALUES);

        logger.info("***********CLIENT_ID:" + CLIENT_ID);
        // After subscribing, publish a retained keep-alive message.
        mqttClient.publish("keepalive", "keepalive".getBytes(UTF_8),
                QOS_VALUES[0], true);
    }

    /**
     * Publishes {@code message} to the topic {@code "GMCC/client/" + clientId}
     * with QoS 0 and without the retained flag, connecting first when there
     * is no live connection. Failures are logged and not rethrown.
     *
     * @param clientId identifier of the receiving client; appended to the topic
     * @param message  text payload to publish
     */
    public void sendMessage(String clientId, String message) {
        try {
            if (mqttClient == null || !mqttClient.isConnected()) {
                connect();
            }

            logger.info("send message to " + clientId + ", message is "
                    + message);
            mqttClient.publish("GMCC/client/" + clientId,
                    message.getBytes(UTF_8), 0, false);
        } catch (MqttException e) {
            // Log the exception itself (message + stack trace), not just its
            // cause, which is frequently null.
            logger.error("failed to send message to " + clientId, e);
        }
    }

    /**
     * Simple callback that prints the messages the server receives on its
     * subscribed topics.
     *
     * @author Join
     */
    class SimpleCallbackHandler implements MqttSimpleCallback {

        /**
         * Invoked when the connection to the broker is lost unexpectedly.
         * Re-subscription could be handled here; currently it only prints a
         * notice.
         */
        @Override
        public void connectionLost() throws Exception {
            System.out.println("客户机和broker已经断开");
        }

        /**
         * Invoked for every message that arrives on a subscribed topic;
         * prints the topic, payload, QoS and retained flag.
         */
        @Override
        public void publishArrived(String topicName, byte[] payload, int Qos,
                boolean retained) throws Exception {
            System.out.println("订阅主题: " + topicName);
            System.out.println("消息数据: " + new String(payload, UTF_8));
            System.out.println("消息级别(0,1,2): " + Qos);
            System.out.println("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): "
                    + retained);
        }

    }

    public static void main(String[] args) {
        // Use the shared instance instead of creating a second broker object.
        MqttBroker.getInstance().sendMessage("client", "message");
    }
}
[/java]

Android客户端:

核心代码:MQTTConnection内部类

[java]

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Timer;
import java.util.TimerTask;

import android.app.AlarmManager;
import android.app.Notification;
import android.app.NotificationManager;
import android.app.PendingIntent;
import android.app.Service;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;
import android.content.IntentFilter;
import android.content.SharedPreferences;
import android.database.Cursor;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.os.Binder;
import android.os.Bundle;
import android.os.IBinder;
import android.provider.ContactsContract;
import android.util.Log;
// 此部分项目导包已被删除
import com.ibm.mqtt.IMqttClient;
import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttException;
import com.ibm.mqtt.MqttPersistence;
import com.ibm.mqtt.MqttPersistenceException;
import com.ibm.mqtt.MqttSimpleCallback;

/*
* PushService that does all of the work.
* Most of the logic is borrowed from KeepAliveService.
* http://code.google.com/p/android-random/source/browse/trunk/TestKeepAlive/src/org/devtcg/demo/keepalive/KeepAliveService.java?r=219
*/
public class PushService extends Service {
private MyBinder mBinder = new MyBinder();
// this is the log tag
public static final String TAG = "PushService";

// the IP address, where your MQTT broker is running.
private static final String MQTT_HOST = "120.197.230.53"; // "209.124.50.174";//
// the port at which the broker is running.
private static int MQTT_BROKER_PORT_NUM = 9901;
// Let's not use the MQTT persistence.
private static MqttPersistence MQTT_PERSISTENCE = null;
// We don't need to remember any state between the connections, so we use a
// clean start.
private static boolean MQTT_CLEAN_START = true;
// Let's set the internal keep alive for MQTT to 15 mins. I haven't tested
// this value much. It could probably be increased.
private static short MQTT_KEEP_ALIVE = 60 * 15;
// Set quality of services to 0 (at most once delivery), since we don't want
// push notifications
// arrive more than once. However, this means that some messages might get
// lost (delivery is not guaranteed)
private static int[] MQTT_QUALITIES_OF_SERVICE = { 0 };
private static int MQTT_QUALITY_OF_SERVICE = 0;
// The broker should not retain any messages.
private static boolean MQTT_RETAINED_PUBLISH = false;

// MQTT client ID, which is given the broker. In this example, I also use
// this for the topic header.
// You can use this to run push notifications for multiple apps with one
// MQTT broker.
public static String MQTT_CLIENT_ID = "client";

// These are the actions for the service (name are descriptive enough)
public static final String ACTION_START = MQTT_CLIENT_ID + ".START";
private static final String ACTION_STOP = MQTT_CLIENT_ID + ".STOP";
private static final String ACTION_KEEPALIVE = MQTT_CLIENT_ID
+ ".KEEP_ALIVE";
private static final String ACTION_RECONNECT = MQTT_CLIENT_ID
+ ".RECONNECT";

// Connection log for the push service. Good for debugging.
private ConnectionLog mLog;

// Connectivity manager to determining, when the phone loses connection
private ConnectivityManager mConnMan;
// Notification manager to displaying arrived push notifications
private NotificationManager mNotifMan;

// Whether or not the service has been started.
private boolean mStarted;

// This the application level keep-alive interval, that is used by the
// AlarmManager
// to keep the connection active, even when the device goes to sleep.
private static final long KEEP_ALIVE_INTERVAL = 1000 * 60 * 28;

// Retry intervals, when the connection is lost.
private static final long INITIAL_RETRY_INTERVAL = 1000 * 10;
private static final long MAXIMUM_RETRY_INTERVAL = 1000 * 60 * 30;

// Preferences instance
private SharedPreferences mPrefs;
// We store in the preferences, whether or not the service has been started
public static final String PREF_STARTED = "isStarted";
// We also store the deviceID (target)
public static final String PREF_DEVICE_ID = "deviceID";
// We store the last retry interval
public static final String PREF_RETRY = "retryInterval";

// Notification title
public static String NOTIF_TITLE = "client";
// Notification id
private static final int NOTIF_CONNECTED = 0;

// This is the instance of an MQTT connection.
private MQTTConnection mConnection;
private long mStartTime;
boolean mShowFlag = true;// 是否显示通知
public static Context ctx;
private boolean mRunFlag = true;// 是否向服务器发送心跳
Timer mTimer = new Timer();

/**
 * Asks the system to start {@code PushService} with {@code ACTION_START}
 * and remembers the supplied context in the static {@code ctx} field.
 */
public static void actionStart(Context ctx) {
    final Intent startIntent = new Intent(ctx, PushService.class)
            .setAction(ACTION_START);
    ctx.startService(startIntent);
    // NOTE(review): the caller's context is kept in a static field; if an
    // Activity is passed in, it stays reachable for the process lifetime.
    PushService.ctx = ctx;
}

/** Sends {@code ACTION_STOP} to {@code PushService} via {@code startService}. */
public static void actionStop(Context ctx) {
    final Intent stopIntent = new Intent(ctx, PushService.class)
            .setAction(ACTION_STOP);
    ctx.startService(stopIntent);
}

/** Sends {@code ACTION_KEEPALIVE} to {@code PushService} via {@code startService}. */
public static void actionPing(Context ctx) {
    final Intent pingIntent = new Intent(ctx, PushService.class)
            .setAction(ACTION_KEEPALIVE);
    ctx.startService(pingIntent);
}

/**
 * Initialises the service: records the start time, opens the connection
 * log, looks up the preferences and system managers, and finally restarts
 * the connection if the preferences say the service was running before.
 */
@Override
public void onCreate() {
super.onCreate();

// mLog is still null here, so this message only reaches Logcat.
log("Creating service");
mStartTime = System.currentTimeMillis();

// A failure to open the file log is not fatal; log() checks mLog for null.
try {
mLog = new ConnectionLog();
Log.i(TAG, "Opened log at " + mLog.getPath());
} catch (IOException e) {
Log.e(TAG, "Failed to open log", e);
}

// Get instances of preferences, connectivity manager and notification
// manager
mPrefs = getSharedPreferences(TAG, MODE_PRIVATE);
mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
mNotifMan = (NotificationManager) getSystemService(NOTIFICATION_SERVICE);

/*
* If our process was reaped by the system for any reason we need to
* restore our state with merely a call to onCreate. We record the last
* "started" value and restore it here if necessary.
*/
handleCrashedService();
}

/**
 * Recovers after the service was destroyed by the system and recreated:
 * when the persisted "started" flag is set, the scheduled keep-alives are
 * cancelled and a clean start is performed.
 */
private void handleCrashedService() {
    if (!wasStarted()) {
        return;
    }
    log("Handling crashed service...");
    // Drop any keep-alive alarm left over from the previous instance.
    stopKeepAlives();
    // Then start from scratch.
    start();
}

/**
 * Tears the service down: stops the connection if it is still marked as
 * started, cancels the heartbeat timer and closes the connection log.
 */
@Override
public void onDestroy() {
    log("Service destroyed (started=" + mStarted + ")");

    // Stop the service, if it has been started.
    if (mStarted) {
        stop();
    }

    // The heartbeat timer owns a background thread; without cancelling it
    // the task keeps running after this service instance is destroyed.
    mRunFlag = false;
    mTimer.cancel();

    if (mLog != null) {
        try {
            mLog.close();
        } catch (IOException e) {
            // Nothing more can be done with the file log; report to Logcat.
            Log.w(TAG, "Failed to close connection log", e);
        }
    }
}

/**
 * Dispatches on the intent action: stop, start, keep-alive or reconnect.
 * Intents that are null or carry no (or an unknown) action are ignored.
 */
@Override
public void onStart(Intent intent, int startId) {
    super.onStart(intent, startId);
    log("Service started with intent=" + intent);
    if (intent == null) {
        return;
    }

    // getAction() may return null; comparing constant-first avoids an NPE.
    final String action = intent.getAction();
    if (ACTION_STOP.equals(action)) {
        stop();
        stopSelf();
    } else if (ACTION_START.equals(action)) {
        start();
    } else if (ACTION_KEEPALIVE.equals(action)) {
        keepAlive();
    } else if (ACTION_RECONNECT.equals(action)) {
        if (isNetworkAvailable()) {
            reconnectIfNecessary();
        }
    }
}

/** Binder handed to bound clients; gives access to the enclosing service. */
public class MyBinder extends Binder {
/** Returns the enclosing {@code PushService} instance. */
public PushService getService() {
return PushService.this;
}
}

/** Returns the service's {@code MyBinder} instance for every bind request. */
@Override
public IBinder onBind(Intent intent) {
return mBinder;
}

/** Logs {@code message} without an exception; delegates to {@code log(message, null)}. */
private void log(String message) {
log(message, null);
}

/**
 * Writes {@code message} to Logcat (error level when {@code e} is given,
 * info level otherwise) and, when the connection log is open, appends the
 * message to it as well.
 *
 * @param message text to log
 * @param e       optional exception; may be {@code null}
 */
private void log(String message, Throwable e) {
    if (e != null) {
        Log.e(TAG, message, e);
    } else {
        Log.i(TAG, message);
    }

    if (mLog == null) {
        return;
    }
    try {
        mLog.println(message);
    } catch (IOException ex) {
        // The file log is best-effort, but do not hide the failure entirely.
        Log.w(TAG, "Failed to write to connection log", ex);
    }
}

/**
 * Reads the persisted {@code PREF_STARTED} flag from the preferences.
 *
 * @return the stored value, or {@code false} when nothing has been stored
 */
private boolean wasStarted() {
return mPrefs.getBoolean(PREF_STARTED, false);
}

/**
 * Persists the started flag under {@code PREF_STARTED} and mirrors it in
 * the {@code mStarted} field.
 */
private void setStarted(boolean started) {
mPrefs.edit().putBoolean(PREF_STARTED, started).commit();
mStarted = started;
}

/**
 * Starts the push service: connects to the broker, schedules the
 * once-a-minute HTTP heartbeat and registers the connectivity receiver.
 * Does nothing when the service is already marked as started.
 */
private synchronized void start() {
log("Starting service...");

// Do nothing, if the service is already running.
if (mStarted == true) {
Log.w(TAG, "Attempt to start connection that is already active");
return;
}

// Establish an MQTT connection

connect();

// Send a heartbeat to the server periodically, once per minute.
// NOTE(review): a new TimerTask is scheduled on every call that gets this
// far; confirm start() cannot run twice on the same service instance.
mRunFlag = true;
mTimer.schedule(new TimerTask() {
@Override
public void run() {
// The task stays scheduled; the flag only suppresses the heartbeat.
if (!mRunFlag) {
// this.cancel();
// PushService.this.stopSelf();
return;
}
System.out.println("run");
try {
if (isNetworkAvailable()) {
// The mobile number is read from the "client" preferences file and
// appended to the keep-alive URL as a query parameter.
SharedPreferences pref = getSharedPreferences(
"client", 0);
String MOBILE_NUM = pref.getString("MOBILE_NUM", "");
HttpUtil.post(Constants.KEEPALIVE + "&mobile="
+ MOBILE_NUM + "&online_flag=1");
}
} catch (Exception e) {
e.printStackTrace();
// TODO: handle exception
}
}
}, 0, 60 * 1000);
// Register a connectivity listener
registerReceiver(mConnectivityChanged, new IntentFilter(
ConnectivityManager.CONNECTIVITY_ACTION));
}

/**
 * Stops the push service: clears the persisted started flag, silences the
 * HTTP heartbeat, unregisters the connectivity receiver, cancels any
 * pending reconnect and disconnects from the broker. Does nothing when the
 * service is not marked as started.
 */
private synchronized void stop() {
    // Do nothing, if the service is not running.
    if (!mStarted) {
        Log.w(TAG, "Attempt to stop connection not active.");
        return;
    }

    // Save stopped state in the preferences.
    setStarted(false);

    // Stop reporting "online" to the server; otherwise the heartbeat task
    // keeps posting after an explicit stop.
    mRunFlag = false;

    // Remove the connectivity receiver.
    unregisterReceiver(mConnectivityChanged);
    // Any existing reconnect alarm should be removed, since we are
    // explicitly stopping the service.
    cancelReconnect();

    // Destroy the MQTT connection if there is one.
    if (mConnection != null) {
        mConnection.disconnect();
        mConnection = null;
    }
}

/**
 * Opens the MQTT connection for the device ID stored in the preferences,
 * subscribing to {@code "GMCC/client/" + deviceId}. When connecting fails
 * and the network is available, a reconnect is scheduled. The service is
 * marked as started and the heartbeat is enabled in every case, so a later
 * reconnect attempt can still succeed.
 */
private synchronized void connect() {
    log("Connecting...");

    // Fetch the device ID from the preferences.
    final String deviceId = mPrefs.getString(PREF_DEVICE_ID, null);

    // Create a new connection only if the device ID is known; otherwise we
    // would subscribe to the meaningless topic "GMCC/client/null".
    if (deviceId == null) {
        log("Device ID not found.");
    } else {
        try {
            mConnection = new MQTTConnection(MQTT_HOST, "GMCC/client/"
                    + deviceId);
        } catch (MqttException e) {
            // Schedule a reconnect, if we failed to connect.
            log("MqttException: "
                    + (e.getMessage() != null ? e.getMessage() : "NULL"), e);
            if (isNetworkAvailable()) {
                scheduleReconnect(mStartTime);
            }
        }
    }
    setStarted(true);

    // Re-enable the periodic (once per minute) heartbeat to the server.
    mRunFlag = true;
}

/**
 * Sends an MQTT keep-alive when the service is started and connected. On
 * failure the connection is dropped and any pending reconnect is cancelled.
 */
private synchronized void keepAlive() {
    if (!mStarted || mConnection == null) {
        // Nothing to keep alive.
        return;
    }
    try {
        mConnection.sendKeepAlive();
    } catch (MqttException e) {
        final String detail = e.getMessage() != null ? e.getMessage() : "NULL";
        log("MqttException: " + detail, e);

        mConnection.disconnect();
        mConnection = null;
        cancelReconnect();
    }
}

/**
 * Schedules a repeating {@code RTC_WAKEUP} alarm that delivers
 * {@code ACTION_KEEPALIVE} to this service every
 * {@code KEEP_ALIVE_INTERVAL} milliseconds, starting one interval from now.
 */
private void startKeepAlives() {
    final Intent keepAliveIntent = new Intent(this, PushService.class)
            .setAction(ACTION_KEEPALIVE);
    final PendingIntent operation = PendingIntent.getService(this, 0,
            keepAliveIntent, 0);
    final AlarmManager alarms = (AlarmManager) getSystemService(ALARM_SERVICE);
    final long firstTrigger = System.currentTimeMillis() + KEEP_ALIVE_INTERVAL;
    alarms.setRepeating(AlarmManager.RTC_WAKEUP, firstTrigger,
            KEEP_ALIVE_INTERVAL, operation);
}

/** Cancels the alarm that delivers {@code ACTION_KEEPALIVE} to this service. */
private void stopKeepAlives() {
    final Intent keepAliveIntent = new Intent(this, PushService.class)
            .setAction(ACTION_KEEPALIVE);
    final PendingIntent operation = PendingIntent.getService(this, 0,
            keepAliveIntent, 0);
    final AlarmManager alarms = (AlarmManager) getSystemService(ALARM_SERVICE);
    alarms.cancel(operation);
}

/**
 * Schedules a one-shot {@code ACTION_RECONNECT} alarm. The delay is derived
 * from the previously stored retry interval: if less time than that
 * interval has passed since {@code startTime}, the interval is quadrupled
 * (capped at {@code MAXIMUM_RETRY_INTERVAL}); otherwise it is reset to
 * {@code INITIAL_RETRY_INTERVAL}. The chosen interval is persisted.
 *
 * @param startTime reference time in milliseconds used to measure elapsed time
 */
public void scheduleReconnect(long startTime) {
    // Interval used for the previous retry (or the initial default).
    final long previous = mPrefs.getLong(PREF_RETRY, INITIAL_RETRY_INTERVAL);

    final long now = System.currentTimeMillis();
    final boolean withinLastInterval = (now - startTime) < previous;

    final long interval = withinLastInterval
            ? Math.min(previous * 4, MAXIMUM_RETRY_INTERVAL)
            : INITIAL_RETRY_INTERVAL;

    log("Rescheduling connection in " + interval + "ms.");

    // Remember the interval for the next call.
    mPrefs.edit().putLong(PREF_RETRY, interval).commit();

    // Hand the reconnect over to the alarm manager.
    final Intent reconnectIntent = new Intent(this, PushService.class)
            .setAction(ACTION_RECONNECT);
    final PendingIntent operation = PendingIntent.getService(this, 0,
            reconnectIntent, 0);
    final AlarmManager alarms = (AlarmManager) getSystemService(ALARM_SERVICE);
    alarms.set(AlarmManager.RTC_WAKEUP, now + interval, operation);
}

/** Cancels the alarm that delivers {@code ACTION_RECONNECT} to this service. */
public void cancelReconnect() {
    final Intent reconnectIntent = new Intent(PushService.this,
            PushService.class).setAction(ACTION_RECONNECT);
    final PendingIntent operation = PendingIntent.getService(
            PushService.this, 0, reconnectIntent, 0);
    final AlarmManager alarms = (AlarmManager) getSystemService(ALARM_SERVICE);
    alarms.cancel(operation);
}

/**
 * Reconnects when the service is marked as started but currently has no
 * connection object. Logs both state values first.
 */
private synchronized void reconnectIfNecessary() {
    log("mStarted" + mStarted);
    log("mConnection" + mConnection);

    final boolean connectionMissing = mStarted && mConnection == null;
    if (!connectionMissing) {
        return;
    }
    log("Reconnecting...");
    connect();
}

/**
 * Receiver for connectivity changes: reconnects when the network comes
 * back, and tears down the MQTT connection (cancelling any pending
 * reconnect) when the network goes away.
 */
private BroadcastReceiver mConnectivityChanged = new BroadcastReceiver() {
    @Override
    public void onReceive(Context context, final Intent intent) {
        // Network state delivered with the broadcast.
        final NetworkInfo networkInfo = (NetworkInfo) intent
                .getParcelableExtra(ConnectivityManager.EXTRA_NETWORK_INFO);
        final boolean online = networkInfo != null
                && networkInfo.isConnected();

        log("Connectivity changed: connected=" + online);

        if (online) {
            reconnectIfNecessary();
            return;
        }
        if (mConnection == null) {
            // Offline and nothing to tear down.
            return;
        }

        // No connectivity: make sure the MQTT connection is destroyed.
        log("cancelReconnect");
        mConnection.disconnect();
        mConnection = null;
        log("cancelReconnect" + mConnection);
        cancelReconnect();
    }
};

/**
 * Posts a status-bar notification titled {@code NOTIF_TITLE} with the given
 * text. Tapping it opens {@code MainActivity} with the request and the
 * extra {@code currentTab = "1"}.
 *
 * @param text    notification body
 * @param request pushed request, passed on as the serializable extra "request"
 */
private void showNotification(String text, Request request) {
    // Extras delivered to the activity.
    final Bundle extras = new Bundle();
    extras.putSerializable("request", request);
    extras.putString("currentTab", "1");

    // Launcher-style intent that opens the parent activity.
    final Intent launch = new Intent(this, MainActivity.class);
    launch.putExtras(extras);
    launch.setAction(Intent.ACTION_MAIN);
    launch.addCategory(Intent.CATEGORY_LAUNCHER);
    launch.setFlags(Intent.FLAG_ACTIVITY_NEW_TASK
            | Intent.FLAG_ACTIVITY_RESET_TASK_IF_NEEDED);
    final PendingIntent contentIntent = PendingIntent.getActivity(this, 0,
            launch, 0);

    final Notification notification = new Notification();
    notification.flags |= Notification.FLAG_SHOW_LIGHTS
            | Notification.FLAG_AUTO_CANCEL;
    notification.defaults = Notification.DEFAULT_ALL;
    notification.icon = R.drawable.ico;
    notification.when = System.currentTimeMillis();

    // The notification title can be changed here.
    notification.setLatestEventInfo(this, NOTIF_TITLE, text, contentIntent);
    mNotifMan.notify(NOTIF_CONNECTED, notification);
}

/**
 * Reports whether there is an active network and it is connected.
 *
 * @return {@code true} only when an active network exists and is connected
 */
private boolean isNetworkAvailable() {
    final NetworkInfo active = mConnMan.getActiveNetworkInfo();
    return active != null && active.isConnected();
}

<span style="BACKGROUND-COLOR: #ccffff"> // This inner class is a wrapper on top of MQTT client.
private class MQTTConnection implements MqttSimpleCallback {
IMqttClient mqttClient = null;

// Creates a new connection given the broker address and initial topic
public MQTTConnection(String brokerHostName, String initTopic)
throws MqttException {
// Create connection spec
String mqttConnSpec = "tcp://" + brokerHostName + "@"
+ MQTT_BROKER_PORT_NUM;
// Create the client and connect
mqttClient = MqttClient.createMqttClient(mqttConnSpec,
MQTT_PERSISTENCE);
String clientID = MQTT_CLIENT_ID + "/"
+ mPrefs.getString(PREF_DEVICE_ID, "");
Log.d(TAG, "mqttConnSpec:" + mqttConnSpec + " clientID:"
+ clientID);
mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE);

// register this client app has being able to receive messages
mqttClient.registerSimpleHandler(this);

// Subscribe to an initial topic, which is combination of client ID
// and device ID.
// initTopic = MQTT_CLIENT_ID + "/" + initTopic;
subscribeToTopic(initTopic);

log("Connection established to " + brokerHostName + " on topic "
+ initTopic);

// Save start time
mStartTime = System.currentTimeMillis();
// Star the keep-alives
startKeepAlives();
}

// Disconnect
public void disconnect() {
// try {
stopKeepAlives();
log("stopKeepAlives");
Thread t = new Thread() {
public void run() {
try {
mqttClient.disconnect();
log("mqttClient.disconnect();");
} catch (MqttPersistenceException e) {
log("MqttException"
+ (e.getMessage() != null ? e.getMessage()
: " NULL"), e);
}
};
};
t.start();
// } catch (MqttPersistenceException e) {
// log("MqttException"
// + (e.getMessage() != null ? e.getMessage() : " NULL"),
// e);
// }
}

/*
* Send a request to the message broker to be sent messages published
* with the specified topic name. Wildcards are allowed.
*/
private void subscribeToTopic(String topicName) throws MqttException {

if ((mqttClient == null) || (mqttClient.isConnected() == false)) {
// quick sanity check - don't try and subscribe if we don't have
// a connection
log("Connection error" + "No connection");
} else {
String[] topics = { topicName };
mqttClient.subscribe(topics, MQTT_QUALITIES_OF_SERVICE);
}
}

/*
* Sends a message to the message broker, requesting that it be
* published to the specified topic.
*/
private void publishToTopic(String topicName, String message)
throws MqttException {
if ((mqttClient == null) || (mqttClient.isConnected() == false)) {
// quick sanity check - don't try and publish if we don't have
// a connection
log("No connection to public to");
} else {
mqttClient.publish(topicName, message.getBytes(),
MQTT_QUALITY_OF_SERVICE, MQTT_RETAINED_PUBLISH);
}
}

/*
* Called if the application loses it's connection to the message
* broker.
*/
public void connectionLost() throws Exception {
log("Loss of connection" + "connection downed");
stopKeepAlives();
// 取消定时发送心跳
mRunFlag = false;
// 向服务器发送请求,更改在线状态
// SharedPreferences pref = getSharedPreferences("client",0);
// String MOBILE_NUM=pref.getString("MOBILE_NUM", "");
// HttpUtil.post(Constants.KEEPALIVE + "&mobile="
// + MOBILE_NUM+"&online_flag=0");
// null itself
mConnection = null;
if (isNetworkAvailable() == true) {
reconnectIfNecessary();
}
}

/*
* Called when we receive a message from the message broker.
*/
public void publishArrived(String topicName, byte[] payload, int qos,
boolean retained) throws MqttException {
// Show a notification
// synchronized (lock) {
String s = new String(payload);
Request request = null;
try {// 解析服务端推送过来的消息
request = XmlPaserTool.getMessage(new ByteArrayInputStream(s
.getBytes()));
// request=Constants.request;
} catch (Exception e) {
e.printStackTrace();
}
final Request mRequest = request;
DownloadInfo down = new DownloadInfo(mRequest);
down.setDownLoad(down);
downloadInfos.add(down);
sendUpdateBroast();
down.start();
showNotification("您有一条新的消息!", mRequest);
Log.d(PushService.TAG, s);
Log.d(PushService.TAG, mRequest.getMessageId());
// 再向服务端推送消息
new AdvancedCallbackHandler().sendMessage(MQTT_CLIENT_ID
+ "/keepalive", "***********send message**********");
}

public void sendKeepAlive() throws MqttException {
log("Sending keep alive");
// publish to a keep-alive topic
publishToTopic(MQTT_CLIENT_ID + "/keepalive",
mPrefs.getString(PREF_DEVICE_ID, ""));
}
}

/**
 * Helper that opens its own MQTT client connection and publishes messages
 * to the {@code MQTT_CLIENT_ID + "/keepalive"} topic.
 */
class AdvancedCallbackHandler {
    IMqttClient mqttClient = null;
    // Quality-of-service levels; only the first entry is used here.
    public final int[] QOS_VALUES = { 0, 0, 2, 0 };

    /**
     * Creates a client, connects with the client ID derived from the stored
     * device ID and publishes a retained "keepalive" message.
     *
     * @throws MqttException if connecting or publishing fails
     */
    private void connect() throws MqttException {
        String mqttConnSpec = "tcp://" + MQTT_HOST + "@"
                + MQTT_BROKER_PORT_NUM;
        // Create the client and connect.
        mqttClient = MqttClient.createMqttClient(mqttConnSpec,
                MQTT_PERSISTENCE);
        String clientID = MQTT_CLIENT_ID + "/"
                + mPrefs.getString(PREF_DEVICE_ID, "");
        mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE);
        Log.d(TAG, "连接服务器,推送消息");
        Log.d(TAG, "**mqttConnSpec:" + mqttConnSpec + " clientID:"
                + clientID);
        Log.d(TAG, MQTT_CLIENT_ID + "/keepalive");
        // Publish a retained keep-alive message.
        mqttClient.publish(MQTT_CLIENT_ID + "/keepalive",
                "keepalive".getBytes(), QOS_VALUES[0], true);
    }

    /**
     * Publishes {@code message} with QoS 0, not retained, connecting first
     * when there is no live connection. Failures are logged and not
     * rethrown.
     *
     * @param clientId only used in the log line; the topic is always
     *                 {@code MQTT_CLIENT_ID + "/keepalive"}
     * @param message  text payload to publish
     */
    public void sendMessage(String clientId, String message) {
        try {
            if (mqttClient == null || !mqttClient.isConnected()) {
                connect();
            }

            Log.d(TAG, "send message to " + clientId + ", message is "
                    + message);
            mqttClient.publish(MQTT_CLIENT_ID + "/keepalive",
                    message.getBytes(), 0, false);
        } catch (MqttException e) {
            // Log the exception itself with its stack trace; the cause
            // alone is frequently null.
            Log.e(TAG, "Failed to send message to " + clientId, e);
        }
    }
}
</span>
/**
 * Looks up the contact display name for a phone number.
 *
 * @param phone_number number to look up
 * @return the display name of the first matching contact; the number itself
 *         when no contact matches; an empty string when the query returns
 *         no cursor
 */
public String getPeople(String phone_number) {
    Log.d(TAG, "getPeople ---------");
    if (phone_number == null) {
        // A null bind argument is rejected by the query; there is nothing
        // to match, so return the (null) number as for any other miss.
        return phone_number;
    }

    String[] projection = { ContactsContract.PhoneLookup.DISPLAY_NAME,
            ContactsContract.CommonDataKinds.Phone.NUMBER };
    // Bind the number as a selection argument instead of concatenating it
    // into the WHERE clause, so quotes in the input cannot alter the query.
    Cursor cursor = this.getContentResolver().query(
            ContactsContract.CommonDataKinds.Phone.CONTENT_URI,
            projection, // Which columns to return.
            ContactsContract.CommonDataKinds.Phone.NUMBER + " = ?", // WHERE clause.
            new String[] { phone_number }, // WHERE clause value substitution
            null); // Sort order.

    if (cursor == null) {
        Log.d(TAG, "getPeople null");
        return "";
    }
    try {
        Log.d(TAG, "getPeople cursor.getCount() = " + cursor.getCount());
        if (cursor.getCount() > 0) {
            cursor.moveToPosition(0);

            // Read the contact's display name.
            int nameFieldColumnIndex = cursor
                    .getColumnIndex(ContactsContract.PhoneLookup.DISPLAY_NAME);
            String name = cursor.getString(nameFieldColumnIndex);
            Log.i("Contacts", "" + name + " .... " + nameFieldColumnIndex);
            System.out.println("联系人姓名:" + name);
            return name;
        }
        return phone_number;
    } finally {
        // Always release the cursor, on every return path.
        cursor.close();
    }
}

/** Broadcasts an intent with the action {@code "update"}. */
public void sendUpdateBroast() {
    sendBroadcast(new Intent("update"));
}

/** Broadcasts an intent with the action {@code "updateFinishList"}. */
public void sendUpdateFinishBroast() {
    sendBroadcast(new Intent("updateFinishList"));
}

/**
 * Worker thread for one pushed request: builds the {@code MessageBean}
 * describing it, downloads the attached file when the message type has
 * one, and stores the bean through {@code MessageDetailDao}.
 */
public class DownloadInfo extends Thread {
    boolean runflag = true;
    Request mRequest;
    // Download progress in percent; stays 0 when the length is unknown.
    public float progress;
    public MessageBean bean = null;
    // Entry to remove from PushService.downloadInfos when this worker ends.
    DownloadInfo download = null;
    MessageDetailDao dao = new MessageDetailDao(
            PushService.this.getApplicationContext());

    /** Requests cancellation; checked between chunks of the download. */
    public synchronized void stopthread() {
        runflag = false;
    }

    /** Returns {@code false} once {@link #stopthread()} has been called. */
    public synchronized boolean getrunflag() {
        return runflag;
    }

    DownloadInfo(Request mRequest) {
        this.mRequest = mRequest;
    }

    /** Sets the list entry that is removed when this worker finishes. */
    public void setDownLoad(DownloadInfo download) {
        this.download = download;
    }

    @Override
    public void run() {
        try {
            File dir = new File(Constants.DOWNLOAD_PATH);
            if (!dir.exists()) {
                dir.mkdirs();
            }
            String filePath = Constants.DOWNLOAD_PATH
                    + mRequest.getMessageId() + "." + mRequest.getExt();
            bean = new MessageBean();
            populateBean(filePath);

            // Card and SMS messages carry no file; everything else does.
            boolean finished = true;
            if (!mRequest.getCommand().equals(Request.TYPE_CARD)
                    && !mRequest.getCommand().equals(Request.TYPE_SMS)) {
                finished = downloadTo(filePath);
            }

            PushService.downloadInfos.remove(download);
            sendUpdateBroast();
            if (!finished) {
                // Cancelled: nothing is stored.
                return;
            }
            // Store the message/file information in the database.
            dao.create(bean);
            sendUpdateFinishBroast();
        } catch (Exception e) {
            PushService.downloadInfos.remove(download);
            sendUpdateBroast();
            Log.e(PushService.TAG, "Failed to process pushed message", e);
        }
    }

    /** Fills {@code bean} from the request according to its command type. */
    private void populateBean(String filePath) {
        bean.setPath(filePath);
        bean.setStatus(0);
        bean.setDate(mRequest.getTimestamp());
        bean.setLayoutID(R.layout.list_say_he_item);
        bean.setPhotoID(R.drawable.receive_ico);
        bean.setMessage_key(mRequest.getMessageId());
        bean.setPhone_number(mRequest.getReceiver());
        bean.setAction(1);
        String name = getPeople(mRequest.getSender());
        bean.setName(name);
        bean.setFileType(Integer.parseInt(mRequest.getCommand()));
        if (mRequest.getCommand().equals(Request.TYPE_MUSIC)) {
            bean.setMsgIco(R.drawable.music_temp);
            bean.setText(name + "给你发送了音乐");
            mRequest.setBody(Base64.encodeToString(bean.getText()
                    .getBytes(), Base64.DEFAULT));
        } else if (mRequest.getCommand().equals(Request.TYPE_CARD)) {
            bean.setMsgIco(R.drawable.card_temp);
            bean.setText(new String(Base64.decode(mRequest.getBody(),
                    Base64.DEFAULT)));
            mRequest.setBody(Base64.encodeToString(bean.getText()
                    .getBytes(), Base64.DEFAULT));
        } else if (mRequest.getCommand().equals(Request.TYPE_LBS)) {
            bean.setMsgIco(R.drawable.address_temp);
            bean.setText(new String(Base64.decode(mRequest.getBody(),
                    Base64.DEFAULT)));
            mRequest.setBody(Base64.encodeToString(bean.getText()
                    .getBytes(), Base64.DEFAULT));
        } else if (mRequest.getCommand().equals(Request.TYPE_PHOTO)) {
            bean.setText(name + "向你发送了照片");
            bean.setMsgIco(-1);
        } else if (mRequest.getCommand().equals(Request.TYPE_PIC)) {
            bean.setText(name + "向你发送了图片");
            bean.setMsgIco(-1);
        } else if (mRequest.getCommand().equals(Request.TYPE_SMS)) {
            bean.setFileType(0);
        }
    }

    /**
     * Downloads the file of the current request to {@code filePath},
     * broadcasting an update after every chunk. The stream, the file and
     * the HTTP connection are always released.
     *
     * @return {@code true} when the whole stream was written; {@code false}
     *         when the download was cancelled
     * @throws IOException on any network or file error; the partial file
     *         is deleted
     */
    private boolean downloadTo(String filePath) throws IOException {
        URL url = new URL(Constants.FILE_DOWNLOAD_URL
                + mRequest.getMessageId());
        HttpURLConnection hurlconn = (HttpURLConnection) url
                .openConnection();
        InputStream instream = null;
        RandomAccessFile rasf = null;
        boolean keepFile = false;
        try {
            hurlconn.setConnectTimeout(5000); // 5 s connect timeout
            hurlconn.setRequestMethod("GET");
            hurlconn.connect();
            long fileSize = hurlconn.getContentLength();
            instream = hurlconn.getInputStream();
            rasf = new RandomAccessFile(filePath, "rwd");
            byte[] buffer = new byte[1024];
            long written = 0;
            int len;
            while ((len = instream.read(buffer)) != -1) {
                if (!getrunflag() || !(progress < 100)) {
                    // Cancelled (or more data than announced): keep the
                    // file only if exactly the announced size was written.
                    keepFile = (written == fileSize);
                    return false;
                }
                rasf.seek(written);
                written += len;
                rasf.write(buffer, 0, len);
                if (fileSize > 0) {
                    // Content length may be unknown (-1) or zero; computing
                    // a percentage from it would be meaningless.
                    progress = (((float) written) / fileSize) * 100;
                }
                // Broadcast so the progress bar can be refreshed.
                sendUpdateBroast();
            }
            keepFile = true;
            return true;
        } finally {
            closeQuietly(instream);
            closeQuietly(rasf);
            hurlconn.disconnect();
            if (!keepFile) {
                // Remove the incomplete file left by a cancel or an error.
                File partial = new File(filePath);
                if (partial.exists()) {
                    partial.delete();
                }
            }
        }
    }

    /** Closes {@code resource} if non-null, reporting failures to Logcat. */
    private void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            Log.w(PushService.TAG, "Failed to close download resource", e);
        }
    }
}

public static ArrayList<DownloadInfo> downloadInfos = new ArrayList<DownloadInfo>();

/** Returns the static list of download workers (the list itself, not a copy). */
public ArrayList<DownloadInfo> getDownloadInfos() {
return PushService.downloadInfos;
}

/** Replaces the static list of download workers with the given list. */
public void setDownloadInfos(ArrayList<DownloadInfo> downloadInfos) {
PushService.downloadInfos = downloadInfos;
}
}</pre>
PS:接收者必须订阅发送者发布消息所用的主题(TOPICS),才能收到消息。

[/java]

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Python

    Python 是一种面向对象、直译式电脑编程语言,具有近二十年的发展历史,成熟且稳定。它包含了一组完善而且容易理解的标准库,能够轻松完成很多常见的任务。它的语法简捷和清晰,尽量使用无异义的英语单词,与其它大多数程序设计语言使用大括号不一样,它使用缩进来定义语句块。

    556 引用 • 674 回帖
  • 思源笔记

    思源笔记是一款隐私优先的个人知识管理系统,支持完全离线使用,同时也支持端到端加密同步。

    融合块、大纲和双向链接,重构你的思维。

    24709 引用 • 101459 回帖
  • Maven

    Maven 是基于项目对象模型(POM)、通过一小段描述信息来管理项目的构建、报告和文档的软件项目管理工具。

    186 引用 • 318 回帖 • 263 关注
  • sts
    2 引用 • 2 回帖 • 225 关注
  • 爬虫

    网络爬虫(Spider、Crawler),是一种按照一定的规则,自动地抓取万维网信息的程序。

    106 引用 • 275 回帖 • 1 关注
  • 前端

    前端技术一般分为前端设计和前端开发,前端设计可以理解为网站的视觉设计,前端开发则是网站的前台代码实现,包括 HTML、CSS 以及 JavaScript 等。

    245 引用 • 1338 回帖
  • Gzip

    gzip (GNU zip)是 GNU 自由软件的文件压缩程序。我们在 Linux 中经常会用到后缀为 .gz 的文件,它们就是 Gzip 格式的。现今已经成为互联网上使用非常普遍的一种数据压缩格式,或者说一种文件格式。

    9 引用 • 12 回帖 • 167 关注
  • 周末

    星期六到星期天晚,实行五天工作制后,指每周的最后两天。再过几年可能就是三天了。

    14 引用 • 297 回帖
  • IDEA

    IDEA 全称 IntelliJ IDEA,是一款 Java 语言开发的集成环境,在业界被公认为最好的 Java 开发工具之一。IDEA 是 JetBrains 公司的产品,这家公司总部位于捷克共和国的首都布拉格,开发人员以严谨著称的东欧程序员为主。

    181 引用 • 400 回帖 • 1 关注
  • 大疆创新

    深圳市大疆创新科技有限公司(DJI-Innovations,简称 DJI),成立于 2006 年,是全球领先的无人飞行器控制系统及无人机解决方案的研发和生产商,客户遍布全球 100 多个国家。通过持续的创新,大疆致力于为无人机工业、行业用户以及专业航拍应用提供性能最强、体验最佳的革命性智能飞控产品和解决方案。

    2 引用 • 14 回帖 • 1 关注
  • ActiveMQ

    ActiveMQ 是 Apache 旗下的一款开源消息总线系统,它完整实现了 JMS 规范,是一个企业级的消息中间件。

    19 引用 • 13 回帖 • 679 关注
  • Logseq

    Logseq 是一个隐私优先、开源的知识库工具。

    Logseq is a joyful, open-source outliner that works on top of local plain-text Markdown and Org-mode files. Use it to write, organize and share your thoughts, keep your to-do list, and build your own digital garden.

    7 引用 • 69 回帖 • 1 关注
  • Android

    Android 是一种以 Linux 为基础的开放源码操作系统,主要使用于便携设备。2005 年由 Google 收购注资,并拉拢多家制造商组成开放手机联盟开发改良,逐渐扩展到到平板电脑及其他领域上。

    335 引用 • 324 回帖
  • 新人

    让我们欢迎这对新人。哦,不好意思说错了,让我们欢迎这位新人!
    新手上路,请谨慎驾驶!

    52 引用 • 228 回帖 • 1 关注
  • Git

    Git 是 Linux Torvalds 为了帮助管理 Linux 内核开发而开发的一个开放源码的版本控制软件。

    211 引用 • 358 回帖 • 1 关注
  • 招聘

    哪里都缺人,哪里都不缺人。

    189 引用 • 1057 回帖 • 1 关注
  • 安装

    你若安好,便是晴天。

    132 引用 • 1184 回帖
  • GitHub

    GitHub 于 2008 年上线,目前,除了 Git 代码仓库托管及基本的 Web 管理界面以外,还提供了订阅、讨论组、文本渲染、在线文件编辑器、协作图谱(报表)、代码片段分享(Gist)等功能。正因为这些功能所提供的便利,又经过长期的积累,GitHub 的用户活跃度很高,在开源世界里享有深远的声望,并形成了社交化编程文化(Social Coding)。

    210 引用 • 2040 回帖
  • Follow
    4 引用 • 12 回帖 • 7 关注
  • 区块链

    区块链是分布式数据存储、点对点传输、共识机制、加密算法等计算机技术的新型应用模式。所谓共识机制是区块链系统中实现不同节点之间建立信任、获取权益的数学算法 。

    92 引用 • 752 回帖 • 1 关注
  • 酷鸟浏览器

    安全 · 稳定 · 快速
    为跨境从业人员提供专业的跨境浏览器

    3 引用 • 59 回帖 • 45 关注
  • Sphinx

    Sphinx 是一个基于 SQL 的全文检索引擎,可以结合 MySQL、PostgreSQL 做全文搜索,它可以提供比数据库本身更专业的搜索功能,使得应用程序更容易实现专业化的全文检索。

    1 引用 • 213 关注
  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 568 关注
  • PWA

    PWA(Progressive Web App)是 Google 在 2015 年提出、2016 年 6 月开始推广的项目。它结合了一系列现代 Web 技术,在网页应用中实现和原生应用相近的用户体验。

    14 引用 • 69 回帖 • 176 关注
  • QQ

    1999 年 2 月腾讯正式推出“腾讯 QQ”,在线用户由 1999 年的 2 人(马化腾和张志东)到现在已经发展到上亿用户了,在线人数超过一亿,是目前使用最广泛的聊天软件之一。

    45 引用 • 557 回帖 • 1 关注
  • Ngui

    Ngui 是一个 GUI 的排版显示引擎和跨平台的 GUI 应用程序开发框架,基于
    Node.js / OpenGL。目标是在此基础上开发 GUI 应用程序可拥有开发 WEB 应用般简单与速度同时兼顾 Native 应用程序的性能与体验。

    7 引用 • 9 回帖 • 397 关注
  • Solidity

    Solidity 是一种智能合约高级语言,运行在 [以太坊] 虚拟机(EVM)之上。它的语法接近于 JavaScript,是一种面向对象的语言。

    3 引用 • 18 回帖 • 430 关注