MQTT实现消息推送

本贴最后更新于 4079 天前,其中的信息可能已经时移世易

MQTT实现消息接收(接收消息需实现MqttSimpleCallback接口并实现它的publishArrived方法)必须注册接收消息方法

[java]

mqttClient.registerSimpleHandler(simpleCallbackHandler);
// 注册接收消息方法

[/java]

和订阅接主题

[java]</pre>
mqttClient.subscribe(TOPICS, QOS_VALUES);

// 订阅接主题
<pre>[/java]

服务端:

[java]</pre>
package com.gmcc.kuchuan.business;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttException;
import com.ibm.mqtt.MqttSimpleCallback;

/**
* MQTT消息发送与接收
* @author Join
*
*/
public class MqttBroker {
private final static Log logger = LogFactory.getLog(MqttBroker.class);// 日志对象
// 连接参数
private final static String CONNECTION_STRING = "tcp://localhost:9901";
private final static boolean CLEAN_START = true;
private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
private final static String CLIENT_ID = "master";// 客户端标识
private final static int[] QOS_VALUES = { 0, 0, 2, 0 };// 对应主题的消息级别
private final static String[] TOPICS = { "Test/TestTopics/Topic1",
"Test/TestTopics/Topic2", "Test/TestTopics/Topic3",
"client/keepalive" };
private static MqttBroker instance = new MqttBroker();

private MqttClient mqttClient;

/**
* 返回实例对象
*
* @return
*/
public static MqttBroker getInstance() {
return instance;
}

/**
* 重新连接服务
*/
private void connect() throws MqttException {
logger.info("connect to mqtt broker.");
mqttClient = new MqttClient(CONNECTION_STRING);
logger.info("***********register Simple Handler***********");
SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();
mqttClient.registerSimpleHandler(simpleCallbackHandler);// 注册接收消息方法
mqttClient.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE);
logger.info("***********subscribe receiver topics***********");
mqttClient.subscribe(TOPICS, QOS_VALUES);// 订阅接主题

logger.info("***********CLIENT_ID:" + CLIENT_ID);
/**
* 完成订阅后,可以增加心跳,保持网络通畅,也可以发布自己的消息
*/
mqttClient.publish("keepalive", "keepalive".getBytes(), QOS_VALUES[0],
true);// 增加心跳,保持网络通畅
}

/**
* 发送消息
*
* @param clientId
* @param messageId
*/
public void sendMessage(String clientId, String message) {
try {
if (mqttClient == null || !mqttClient.isConnected()) {
connect();
}

logger.info("send message to " + clientId + ", message is "
+ message);
// 发布自己的消息
mqttClient.publish("GMCC/client/" + clientId, message.getBytes(),
0, false);
} catch (MqttException e) {
logger.error(e.getCause());
e.printStackTrace();
}
}

/**
* 简单回调函数,处理server接收到的主题消息
*
* @author Join
*
*/
class SimpleCallbackHandler implements MqttSimpleCallback {

/**
* 当客户机和broker意外断开时触发 可以再此处理重新订阅
*/
@Override
public void connectionLost() throws Exception {
// TODO Auto-generated method stub
System.out.println("客户机和broker已经断开");
}

/**
* 客户端订阅消息后,该方法负责回调接收处理消息
*/
@Override
public void publishArrived(String topicName, byte[] payload, int Qos,
boolean retained) throws Exception {
// TODO Auto-generated method stub
System.out.println("订阅主题: " + topicName);
System.out.println("消息数据: " + new String(payload));
System.out.println("消息级别(0,1,2): " + Qos);
System.out.println("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): "
+ retained);
}

}

public static void main(String[] args) {
new MqttBroker().sendMessage("client", "message");
}
}
<pre>
[/java]

Android客户端:

核心代码:MQTTConnection内部类

[java]

<pre class="java" name="code">import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Timer;
import java.util.TimerTask;

import android.app.AlarmManager;
import android.app.Notification;
import android.app.NotificationManager;
import android.app.PendingIntent;
import android.app.Service;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;
import android.content.IntentFilter;
import android.content.SharedPreferences;
import android.database.Cursor;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.os.Binder;
import android.os.Bundle;
import android.os.IBinder;
import android.provider.ContactsContract;
import android.util.Log;
//此部分项目导包已被删除</pre><pre class="java" name="code">import com.ibm.mqtt.IMqttClient;
import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttException;
import com.ibm.mqtt.MqttPersistence;
import com.ibm.mqtt.MqttPersistenceException;
import com.ibm.mqtt.MqttSimpleCallback;

/*
* PushService that does all of the work.
* Most of the logic is borrowed from KeepAliveService.
* http://code.google.com/p/android-random/source/browse/trunk/TestKeepAlive/src/org/devtcg/demo/keepalive/KeepAliveService.java?r=219
*/
public class PushService extends Service {
private MyBinder mBinder = new MyBinder();
// this is the log tag
public static final String TAG = "PushService";

// the IP address, where your MQTT broker is running.
private static final String MQTT_HOST = "120.197.230.53"; // "209.124.50.174";//
// the port at which the broker is running.
private static int MQTT_BROKER_PORT_NUM = 9901;
// Let's not use the MQTT persistence.
private static MqttPersistence MQTT_PERSISTENCE = null;
// We don't need to remember any state between the connections, so we use a
// clean start.
private static boolean MQTT_CLEAN_START = true;
// Let's set the internal keep alive for MQTT to 15 mins. I haven't tested
// this value much. It could probably be increased.
private static short MQTT_KEEP_ALIVE = 60 * 15;
// Set quality of services to 0 (at most once delivery), since we don't want
// push notifications
// arrive more than once. However, this means that some messages might get
// lost (delivery is not guaranteed)
private static int[] MQTT_QUALITIES_OF_SERVICE = { 0 };
private static int MQTT_QUALITY_OF_SERVICE = 0;
// The broker should not retain any messages.
private static boolean MQTT_RETAINED_PUBLISH = false;

// MQTT client ID, which is given the broker. In this example, I also use
// this for the topic header.
// You can use this to run push notifications for multiple apps with one
// MQTT broker.
public static String MQTT_CLIENT_ID = "client";

// These are the actions for the service (name are descriptive enough)
public static final String ACTION_START = MQTT_CLIENT_ID + ".START";
private static final String ACTION_STOP = MQTT_CLIENT_ID + ".STOP";
private static final String ACTION_KEEPALIVE = MQTT_CLIENT_ID
+ ".KEEP_ALIVE";
private static final String ACTION_RECONNECT = MQTT_CLIENT_ID
+ ".RECONNECT";

// Connection log for the push service. Good for debugging.
private ConnectionLog mLog;

// Connectivity manager to determining, when the phone loses connection
private ConnectivityManager mConnMan;
// Notification manager to displaying arrived push notifications
private NotificationManager mNotifMan;

// Whether or not the service has been started.
private boolean mStarted;

// This the application level keep-alive interval, that is used by the
// AlarmManager
// to keep the connection active, even when the device goes to sleep.
private static final long KEEP_ALIVE_INTERVAL = 1000 * 60 * 28;

// Retry intervals, when the connection is lost.
private static final long INITIAL_RETRY_INTERVAL = 1000 * 10;
private static final long MAXIMUM_RETRY_INTERVAL = 1000 * 60 * 30;

// Preferences instance
private SharedPreferences mPrefs;
// We store in the preferences, whether or not the service has been started
public static final String PREF_STARTED = "isStarted";
// We also store the deviceID (target)
public static final String PREF_DEVICE_ID = "deviceID";
// We store the last retry interval
public static final String PREF_RETRY = "retryInterval";

// Notification title
public static String NOTIF_TITLE = "client";
// Notification id
private static final int NOTIF_CONNECTED = 0;

// This is the instance of an MQTT connection.
private MQTTConnection mConnection;
private long mStartTime;
boolean mShowFlag = true;// 是否显示通知
public static Context ctx;
private boolean mRunFlag = true;// 是否向服务器发送心跳
Timer mTimer = new Timer();

// Static method to start the service
public static void actionStart(Context ctx) {
Intent i = new Intent(ctx, PushService.class);
i.setAction(ACTION_START);
ctx.startService(i);
PushService.ctx = ctx;
}

// Static method to stop the service
public static void actionStop(Context ctx) {
Intent i = new Intent(ctx, PushService.class);
i.setAction(ACTION_STOP);
ctx.startService(i);
}

// Static method to send a keep alive message
public static void actionPing(Context ctx) {
Intent i = new Intent(ctx, PushService.class);
i.setAction(ACTION_KEEPALIVE);
ctx.startService(i);
}

@Override
public void onCreate() {
super.onCreate();

log("Creating service");
mStartTime = System.currentTimeMillis();

try {
mLog = new ConnectionLog();
Log.i(TAG, "Opened log at " + mLog.getPath());
} catch (IOException e) {
Log.e(TAG, "Failed to open log", e);
}

// Get instances of preferences, connectivity manager and notification
// manager
mPrefs = getSharedPreferences(TAG, MODE_PRIVATE);
mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
mNotifMan = (NotificationManager) getSystemService(NOTIFICATION_SERVICE);

/*
* If our process was reaped by the system for any reason we need to
* restore our state with merely a call to onCreate. We record the last
* "started" value and restore it here if necessary.
*/
handleCrashedService();
}

// This method does any necessary clean-up need in case the server has been
// destroyed by the system
// and then restarted
private void handleCrashedService() {
if (wasStarted() == true) {
log("Handling crashed service...");
// stop the keep alives
stopKeepAlives();

// Do a clean start
start();
}
}

@Override
public void onDestroy() {
log("Service destroyed (started=" + mStarted + ")");

// Stop the services, if it has been started
if (mStarted == true) {
stop();
}

try {
if (mLog != null)
mLog.close();
} catch (IOException e) {
}
}

@Override
public void onStart(Intent intent, int startId) {
super.onStart(intent, startId);
log("Service started with intent=" + intent);
if (intent == null) {
return;
}
// Do an appropriate action based on the intent.
if (intent.getAction().equals(ACTION_STOP) == true) {
stop();
stopSelf();
} else if (intent.getAction().equals(ACTION_START) == true) {
start();

} else if (intent.getAction().equals(ACTION_KEEPALIVE) == true) {
keepAlive();
} else if (intent.getAction().equals(ACTION_RECONNECT) == true) {
if (isNetworkAvailable()) {
reconnectIfNecessary();
}
}
}

public class MyBinder extends Binder {
public PushService getService() {
return PushService.this;
}
}

@Override
public IBinder onBind(Intent intent) {
return mBinder;
}

// log helper function
private void log(String message) {
log(message, null);
}

private void log(String message, Throwable e) {
if (e != null) {
Log.e(TAG, message, e);

} else {
Log.i(TAG, message);
}

if (mLog != null) {
try {
mLog.println(message);
} catch (IOException ex) {
}
}
}

// Reads whether or not the service has been started from the preferences
private boolean wasStarted() {
return mPrefs.getBoolean(PREF_STARTED, false);
}

// Sets whether or not the services has been started in the preferences.
private void setStarted(boolean started) {
mPrefs.edit().putBoolean(PREF_STARTED, started).commit();
mStarted = started;
}

private synchronized void start() {
log("Starting service...");

// Do nothing, if the service is already running.
if (mStarted == true) {
Log.w(TAG, "Attempt to start connection that is already active");
return;
}

// Establish an MQTT connection

connect();

// 向服务器定时发送心跳,一分钟一次
mRunFlag = true;
mTimer.schedule(new TimerTask() {
@Override
public void run() {
if (!mRunFlag) {
// this.cancel();
// PushService.this.stopSelf();
return;
}
System.out.println("run");
try {
if (isNetworkAvailable()) {
SharedPreferences pref = getSharedPreferences(
"client", 0);
String MOBILE_NUM = pref.getString("MOBILE_NUM", "");
HttpUtil.post(Constants.KEEPALIVE + "&mobile="
+ MOBILE_NUM + "&online_flag=1");
}
} catch (Exception e) {
e.printStackTrace();
// TODO: handle exception
}
}
}, 0, 60 * 1000);
// Register a connectivity listener
registerReceiver(mConnectivityChanged, new IntentFilter(
ConnectivityManager.CONNECTIVITY_ACTION));
}

private synchronized void stop() {
// Do nothing, if the service is not running.
if (mStarted == false) {
Log.w(TAG, "Attempt to stop connection not active.");
return;
}

// Save stopped state in the preferences
setStarted(false);

// Remove the connectivity receiver
unregisterReceiver(mConnectivityChanged);
// Any existing reconnect timers should be removed, since we explicitly
// stopping the service.
cancelReconnect();

// Destroy the MQTT connection if there is one
if (mConnection != null) {
mConnection.disconnect();
mConnection = null;
}
}

//
private synchronized void connect() {
log("Connecting...");
// Thread t = new Thread() {
// @Override
// public void run() {
// fetch the device ID from the preferences.
String deviceID = "GMCC/client/"
+ mPrefs.getString(PREF_DEVICE_ID, null);

// Create a new connection only if the device id is not NULL
try {
mConnection = new MQTTConnection(MQTT_HOST, deviceID);
} catch (MqttException e) {
// Schedule a reconnect, if we failed to connect
log("MqttException: "
+ (e.getMessage() != null ? e.getMessage() : "NULL"));
if (isNetworkAvailable()) {
scheduleReconnect(mStartTime);
}
}
setStarted(true);
// }
// };
// t.start();
// 向服务器定时发送心跳,一分钟一次
mRunFlag = true;
}

private synchronized void keepAlive() {
try {
// Send a keep alive, if there is a connection.
if (mStarted == true && mConnection != null) {
mConnection.sendKeepAlive();
}
} catch (MqttException e) {
log("MqttException: "
+ (e.getMessage() != null ? e.getMessage() : "NULL"), e);

mConnection.disconnect();
mConnection = null;
cancelReconnect();
}
}

// Schedule application level keep-alives using the AlarmManager
private void startKeepAlives() {
Intent i = new Intent();
i.setClass(this, PushService.class);
i.setAction(ACTION_KEEPALIVE);
PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
alarmMgr.setRepeating(AlarmManager.RTC_WAKEUP,
System.currentTimeMillis() + KEEP_ALIVE_INTERVAL,
KEEP_ALIVE_INTERVAL, pi);
}

// Remove all scheduled keep alives
private void stopKeepAlives() {
Intent i = new Intent();
i.setClass(this, PushService.class);
i.setAction(ACTION_KEEPALIVE);
PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
alarmMgr.cancel(pi);
}

// We schedule a reconnect based on the starttime of the service
public void scheduleReconnect(long startTime) {
// the last keep-alive interval
long interval = mPrefs.getLong(PREF_RETRY, INITIAL_RETRY_INTERVAL);

// Calculate the elapsed time since the start
long now = System.currentTimeMillis();
long elapsed = now - startTime;

// Set an appropriate interval based on the elapsed time since start
if (elapsed < interval) {
interval = Math.min(interval * 4, MAXIMUM_RETRY_INTERVAL);
} else {
interval = INITIAL_RETRY_INTERVAL;
}

log("Rescheduling connection in " + interval + "ms.");

// Save the new internval
mPrefs.edit().putLong(PREF_RETRY, interval).commit();

// Schedule a reconnect using the alarm manager.
Intent i = new Intent();
i.setClass(this, PushService.class);
i.setAction(ACTION_RECONNECT);
PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
alarmMgr.set(AlarmManager.RTC_WAKEUP, now + interval, pi);
}

// Remove the scheduled reconnect
public void cancelReconnect() {
Intent i = new Intent();
i.setClass(PushService.this, PushService.class);
i.setAction(ACTION_RECONNECT);
PendingIntent pi = PendingIntent.getService(PushService.this, 0, i, 0);
AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
alarmMgr.cancel(pi);
}

private synchronized void reconnectIfNecessary() {
log("mStarted" + mStarted);
log("mConnection" + mConnection);
if (mStarted == true && mConnection == null) {
log("Reconnecting...");
connect();
}
}

// This receiver listeners for network changes and updates the MQTT
// connection
// accordingly
private BroadcastReceiver mConnectivityChanged = new BroadcastReceiver() {
@Override
public void onReceive(Context context, final Intent intent) {
// Get network info
// Thread mReconnect = new Thread(){
// public void run() {
NetworkInfo info = (NetworkInfo) intent
.getParcelableExtra(ConnectivityManager.EXTRA_NETWORK_INFO);
// Is there connectivity?
boolean hasConnectivity = (info != null && info.isConnected()) ? true
: false;

log("Connectivity changed: connected=" + hasConnectivity);

if (hasConnectivity) {
reconnectIfNecessary();
} else if (mConnection != null) {
// Thread cancelConn = new Thread(){
// public void run() {
// // if there no connectivity, make sure MQTT connection is
// destroyed
log("cancelReconnect");
mConnection.disconnect();
mConnection = null;
log("cancelReconnect" + mConnection);
cancelReconnect();
// }
// };
// cancelConn.start();
}
// };
//
// };
// mReconnect.start();
}
};

// Display the topbar notification
private void showNotification(String text, Request request) {

Notification n = new Notification();
n.flags |= Notification.FLAG_SHOW_LIGHTS;
n.flags |= Notification.FLAG_AUTO_CANCEL;
n.defaults = Notification.DEFAULT_ALL;
n.icon = R.drawable.ico;
n.when = System.currentTimeMillis();
Intent intent = new Intent();
Bundle bundle = new Bundle();
bundle.putSerializable("request", request);
bundle.putString("currentTab", "1");
intent.putExtras(bundle);
intent.setClass(this, MainActivity.class);
intent.setAction(Intent.ACTION_MAIN);
intent.addCategory(Intent.CATEGORY_LAUNCHER);
intent.setFlags(Intent.FLAG_ACTIVITY_NEW_TASK
| Intent.FLAG_ACTIVITY_RESET_TASK_IF_NEEDED);
// Simply open the parent activity
PendingIntent pi = PendingIntent.getActivity(this, 0, intent, 0);

// Change the name of the notification here
n.setLatestEventInfo(this, NOTIF_TITLE, text, pi);
mNotifMan.notify(NOTIF_CONNECTED, n);
}

// Check if we are online
private boolean isNetworkAvailable() {
NetworkInfo info = mConnMan.getActiveNetworkInfo();
if (info == null) {
return false;
}
return info.isConnected();
}

<span style="BACKGROUND-COLOR: #ccffff"> // This inner class is a wrapper on top of MQTT client.
private class MQTTConnection implements MqttSimpleCallback {
IMqttClient mqttClient = null;

// Creates a new connection given the broker address and initial topic
public MQTTConnection(String brokerHostName, String initTopic)
throws MqttException {
// Create connection spec
String mqttConnSpec = "tcp://" + brokerHostName + "@"
+ MQTT_BROKER_PORT_NUM;
// Create the client and connect
mqttClient = MqttClient.createMqttClient(mqttConnSpec,
MQTT_PERSISTENCE);
String clientID = MQTT_CLIENT_ID + "/"
+ mPrefs.getString(PREF_DEVICE_ID, "");
Log.d(TAG, "mqttConnSpec:" + mqttConnSpec + " clientID:"
+ clientID);
mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE);

// register this client app has being able to receive messages
mqttClient.registerSimpleHandler(this);

// Subscribe to an initial topic, which is combination of client ID
// and device ID.
// initTopic = MQTT_CLIENT_ID + "/" + initTopic;
subscribeToTopic(initTopic);

log("Connection established to " + brokerHostName + " on topic "
+ initTopic);

// Save start time
mStartTime = System.currentTimeMillis();
// Star the keep-alives
startKeepAlives();
}

// Disconnect
public void disconnect() {
// try {
stopKeepAlives();
log("stopKeepAlives");
Thread t = new Thread() {
public void run() {
try {
mqttClient.disconnect();
log("mqttClient.disconnect();");
} catch (MqttPersistenceException e) {
log("MqttException"
+ (e.getMessage() != null ? e.getMessage()
: " NULL"), e);
}
};
};
t.start();
// } catch (MqttPersistenceException e) {
// log("MqttException"
// + (e.getMessage() != null ? e.getMessage() : " NULL"),
// e);
// }
}

/*
* Send a request to the message broker to be sent messages published
* with the specified topic name. Wildcards are allowed.
*/
private void subscribeToTopic(String topicName) throws MqttException {

if ((mqttClient == null) || (mqttClient.isConnected() == false)) {
// quick sanity check - don't try and subscribe if we don't have
// a connection
log("Connection error" + "No connection");
} else {
String[] topics = { topicName };
mqttClient.subscribe(topics, MQTT_QUALITIES_OF_SERVICE);
}
}

/*
* Sends a message to the message broker, requesting that it be
* published to the specified topic.
*/
private void publishToTopic(String topicName, String message)
throws MqttException {
if ((mqttClient == null) || (mqttClient.isConnected() == false)) {
// quick sanity check - don't try and publish if we don't have
// a connection
log("No connection to public to");
} else {
mqttClient.publish(topicName, message.getBytes(),
MQTT_QUALITY_OF_SERVICE, MQTT_RETAINED_PUBLISH);
}
}

/*
* Called if the application loses it's connection to the message
* broker.
*/
public void connectionLost() throws Exception {
log("Loss of connection" + "connection downed");
stopKeepAlives();
// 取消定时发送心跳
mRunFlag = false;
// 向服务器发送请求,更改在线状态
// SharedPreferences pref = getSharedPreferences("client",0);
// String MOBILE_NUM=pref.getString("MOBILE_NUM", "");
// HttpUtil.post(Constants.KEEPALIVE + "&mobile="
// + MOBILE_NUM+"&online_flag=0");
// null itself
mConnection = null;
if (isNetworkAvailable() == true) {
reconnectIfNecessary();
}
}

/*
* Called when we receive a message from the message broker.
*/
public void publishArrived(String topicName, byte[] payload, int qos,
boolean retained) throws MqttException {
// Show a notification
// synchronized (lock) {
String s = new String(payload);
Request request = null;
try {// 解析服务端推送过来的消息
request = XmlPaserTool.getMessage(new ByteArrayInputStream(s
.getBytes()));
// request=Constants.request;
} catch (Exception e) {
e.printStackTrace();
}
final Request mRequest = request;
DownloadInfo down = new DownloadInfo(mRequest);
down.setDownLoad(down);
downloadInfos.add(down);
sendUpdateBroast();
down.start();
showNotification("您有一条新的消息!", mRequest);
Log.d(PushService.TAG, s);
Log.d(PushService.TAG, mRequest.getMessageId());
// 再向服务端推送消息
new AdvancedCallbackHandler().sendMessage(MQTT_CLIENT_ID
+ "/keepalive", "***********send message**********");
}

public void sendKeepAlive() throws MqttException {
log("Sending keep alive");
// publish to a keep-alive topic
publishToTopic(MQTT_CLIENT_ID + "/keepalive",
mPrefs.getString(PREF_DEVICE_ID, ""));
}
}

class AdvancedCallbackHandler {
IMqttClient mqttClient = null;
public final int[] QOS_VALUES = { 0, 0, 2, 0 };// 对应主题的消息级别

/**
* 重新连接服务
*/
private void connect() throws MqttException {
String mqttConnSpec = "tcp://" + MQTT_HOST + "@"
+ MQTT_BROKER_PORT_NUM;
// Create the client and connect
mqttClient = MqttClient.createMqttClient(mqttConnSpec,
MQTT_PERSISTENCE);
String clientID = MQTT_CLIENT_ID + "/"
+ mPrefs.getString(PREF_DEVICE_ID, "");
mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE);
Log.d(TAG, "连接服务器,推送消息");
Log.d(TAG, "**mqttConnSpec:" + mqttConnSpec + " clientID:"
+ clientID);
Log.d(TAG, MQTT_CLIENT_ID + "/keepalive");
// 增加心跳,保持网络通畅
mqttClient.publish(MQTT_CLIENT_ID + "/keepalive",
"keepalive".getBytes(), QOS_VALUES[0], true);
}

/**
* 发送消息
*
* @param clientId
* @param messageId
*/
public void sendMessage(String clientId, String message) {
try {
if (mqttClient == null || !mqttClient.isConnected()) {
connect();
}

Log.d(TAG, "send message to " + clientId + ", message is "
+ message);
// 发布自己的消息
// mqttClient.publish(MQTT_CLIENT_ID + "/keepalive",
// message.getBytes(), 0, false);
mqttClient.publish(MQTT_CLIENT_ID + "/keepalive",
message.getBytes(), 0, false);
} catch (MqttException e) {
Log.d(TAG, e.getCause() + "");
e.printStackTrace();
}
}
}
</span>
public String getPeople(String phone_number) {
String name = "";
String[] projection = { ContactsContract.PhoneLookup.DISPLAY_NAME,
ContactsContract.CommonDataKinds.Phone.NUMBER };
Log.d(TAG, "getPeople ---------");
// 将自己添加到 msPeers 中
Cursor cursor = this.getContentResolver().query(
ContactsContract.CommonDataKinds.Phone.CONTENT_URI,
projection, // Which columns to return.
ContactsContract.CommonDataKinds.Phone.NUMBER + " = '"
+ phone_number + "'", // WHERE clause.
null, // WHERE clause value substitution
null); // Sort order.

if (cursor == null) {
Log.d(TAG, "getPeople null");
return "";
}
Log.d(TAG, "getPeople cursor.getCount() = " + cursor.getCount());
if (cursor.getCount() > 0) {
cursor.moveToPosition(0);

// 取得联系人名字
int nameFieldColumnIndex = cursor
.getColumnIndex(ContactsContract.PhoneLookup.DISPLAY_NAME);
name = cursor.getString(nameFieldColumnIndex);
Log.i("Contacts", "" + name + " .... " + nameFieldColumnIndex); // 这里提示
// force
// close
System.out.println("联系人姓名:" + name);
return name;
}
return phone_number;
}

public void sendUpdateBroast() {
Intent intent = new Intent();
intent.setAction("update");
sendBroadcast(intent);
}

public void sendUpdateFinishBroast() {
Intent intent = new Intent();
intent.setAction("updateFinishList");
sendBroadcast(intent);
}

public class DownloadInfo extends Thread {
boolean runflag = true;
Request mRequest;
public float progress;
public MessageBean bean = null;
DownloadInfo download = null;
MessageDetailDao dao = new MessageDetailDao(
PushService.this.getApplicationContext());

public synchronized void stopthread() {
runflag = false;
}

public synchronized boolean getrunflag() {
return runflag;
}

DownloadInfo(Request mRequest) {
this.mRequest = mRequest;

}

public void setDownLoad(DownloadInfo download) {
this.download = download;
}

@Override
public void run() {
try {

File dir = new File(Constants.DOWNLOAD_PATH);
if (!dir.exists()) {
dir.mkdirs();
}
String filePath = Constants.DOWNLOAD_PATH
+ mRequest.getMessageId() + "." + mRequest.getExt();
bean = new MessageBean();
bean.setPath(filePath);
bean.setStatus(0);
bean.setDate(mRequest.getTimestamp());
bean.setLayoutID(R.layout.list_say_he_item);
bean.setPhotoID(R.drawable.receive_ico);
bean.setMessage_key(mRequest.getMessageId());
bean.setPhone_number(mRequest.getReceiver());
bean.setAction(1);
String name = getPeople(mRequest.getSender());
bean.setName(name);
bean.setFileType(Integer.parseInt(mRequest.getCommand()));
if (mRequest.getCommand().equals(Request.TYPE_MUSIC)) {
bean.setMsgIco(R.drawable.music_temp);
bean.setText(name + "给你发送了音乐");
mRequest.setBody(Base64.encodeToString(bean.getText()
.getBytes(), Base64.DEFAULT));
} else if (mRequest.getCommand().equals(Request.TYPE_CARD)) {
bean.setMsgIco(R.drawable.card_temp);
bean.setText(new String(Base64.decode(mRequest.getBody(),
Base64.DEFAULT)));
mRequest.setBody(Base64.encodeToString(bean.getText()
.getBytes(), Base64.DEFAULT));
} else if (mRequest.getCommand().equals(Request.TYPE_LBS)) {
bean.setMsgIco(R.drawable.address_temp);
bean.setText(new String(Base64.decode(mRequest.getBody(),
Base64.DEFAULT)));
mRequest.setBody(Base64.encodeToString(bean.getText()
.getBytes(), Base64.DEFAULT));
} else if (mRequest.getCommand().equals(Request.TYPE_PHOTO)) {
bean.setText(name + "向你发送了照片");
bean.setMsgIco(-1);
} else if (mRequest.getCommand().equals(Request.TYPE_PIC)) {
bean.setText(name + "向你发送了图片");
bean.setMsgIco(-1);
} else if (mRequest.getCommand().equals(Request.TYPE_SMS)) {
bean.setFileType(0);
}

if (!mRequest.getCommand().equals(Request.TYPE_CARD)
&& !mRequest.getCommand().equals(Request.TYPE_SMS)) {
String path = Constants.FILE_DOWNLOAD_URL
+ mRequest.getMessageId();
URL url = new URL(path);
HttpURLConnection hurlconn = (HttpURLConnection) url
.openConnection();// 基于HTTP协议的连接对象
hurlconn.setConnectTimeout(5000);// 请求超时时间 5s
hurlconn.setRequestMethod("GET");// 请求方式
hurlconn.connect();
long fileSize = hurlconn.getContentLength();
InputStream instream = hurlconn.getInputStream();
byte[] buffer = new byte[1024];
int len = 0;
int number = 0;
RandomAccessFile rasf = new RandomAccessFile(filePath,
"rwd");
while ((len = instream.read(buffer)) != -1) {// 开始下载文件
if (getrunflag() && progress < 100) {
rasf.seek(number);
number += len;
rasf.write(buffer, 0, len);
progress = (((float) number) / fileSize) * 100;
// 发送广播,修改进度条进度
sendUpdateBroast();
} else {
this.interrupt();
if (number != fileSize) {// 取消下载,将已经缓存的未下载完成的文件删除
File file = new File(filePath);
if (file.exists())
file.delete();
}
PushService.downloadInfos.remove(download);
sendUpdateBroast();
return;
}
}
instream.close();
PushService.downloadInfos.remove(download);
sendUpdateBroast();
} else {// 收到消息,将信息保存到数据库

PushService.downloadInfos.remove(download);
sendUpdateBroast();
}
// 将文件信息保存到数据库
dao.create(bean);
sendUpdateFinishBroast();

} catch (Exception e) {
PushService.downloadInfos.remove(download);
sendUpdateBroast();
e.printStackTrace();
}
}
}

public static ArrayList<DownloadInfo> downloadInfos = new ArrayList<DownloadInfo>();

public ArrayList<DownloadInfo> getDownloadInfos() {
return PushService.downloadInfos;
}

public void setDownloadInfos(ArrayList<DownloadInfo> downloadInfos) {
PushService.downloadInfos = downloadInfos;
}
}</pre>
<p><br>
ps:</p>
<p>接收者必须订阅发送者的TOPICS才能收到消息</p>
<p> </p>
<pre></pre>
<pre></pre>
<pre></pre>
<pre></pre>

[/java]

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • RYMCU

    RYMCU 致力于打造一个即严谨又活泼、专业又不失有趣,为数百万人服务的开源嵌入式知识学习交流平台。

    4 引用 • 6 回帖 • 51 关注
  • jQuery

    jQuery 是一套跨浏览器的 JavaScript 库,强化 HTML 与 JavaScript 之间的操作。由 John Resig 在 2006 年 1 月的 BarCamp NYC 上释出第一个版本。全球约有 28% 的网站使用 jQuery,是非常受欢迎的 JavaScript 库。

    63 引用 • 134 回帖 • 724 关注
  • 新人

    让我们欢迎这对新人。哦,不好意思说错了,让我们欢迎这位新人!
    新手上路,请谨慎驾驶!

    52 引用 • 228 回帖 • 1 关注
  • 开源

    Open Source, Open Mind, Open Sight, Open Future!

    408 引用 • 3574 回帖
  • RESTful

    一种软件架构设计风格而不是标准,提供了一组设计原则和约束条件,主要用于客户端和服务器交互类的软件。基于这个风格设计的软件可以更简洁,更有层次,更易于实现缓存等机制。

    30 引用 • 114 回帖 • 2 关注
  • abitmean

    有点意思就行了

    29 关注
  • C

    C 语言是一门通用计算机编程语言,应用广泛。C 语言的设计目标是提供一种能以简易的方式编译、处理低级存储器、产生少量的机器码以及不需要任何运行环境支持便能运行的编程语言。

    85 引用 • 165 回帖 • 2 关注
  • NGINX

    NGINX 是一个高性能的 HTTP 和反向代理服务器,也是一个 IMAP/POP3/SMTP 代理服务器。 NGINX 是由 Igor Sysoev 为俄罗斯访问量第二的 Rambler.ru 站点开发的,第一个公开版本 0.1.0 发布于 2004 年 10 月 4 日。

    311 引用 • 546 回帖
  • iOS

    iOS 是由苹果公司开发的移动操作系统,最早于 2007 年 1 月 9 日的 Macworld 大会上公布这个系统,最初是设计给 iPhone 使用的,后来陆续套用到 iPod touch、iPad 以及 Apple TV 等产品上。iOS 与苹果的 Mac OS X 操作系统一样,属于类 Unix 的商业操作系统。

    85 引用 • 139 回帖 • 1 关注
  • PHP

    PHP(Hypertext Preprocessor)是一种开源脚本语言。语法吸收了 C 语言、 Java 和 Perl 的特点,主要适用于 Web 开发领域,据说是世界上最好的编程语言。

    179 引用 • 407 回帖 • 488 关注
  • GitLab

    GitLab 是利用 Ruby 一个开源的版本管理系统,实现一个自托管的 Git 项目仓库,可通过 Web 界面操作公开或私有项目。

    46 引用 • 72 回帖
  • VirtualBox

    VirtualBox 是一款开源虚拟机软件,最早由德国 Innotek 公司开发,由 Sun Microsystems 公司出品的软件,使用 Qt 编写,在 Sun 被 Oracle 收购后正式更名成 Oracle VM VirtualBox。

    10 引用 • 2 回帖 • 6 关注
  • TextBundle

    TextBundle 文件格式旨在应用程序之间交换 Markdown 或 Fountain 之类的纯文本文件时,提供更无缝的用户体验。

    1 引用 • 2 回帖 • 47 关注
  • DNSPod

    DNSPod 建立于 2006 年 3 月份,是一款免费智能 DNS 产品。 DNSPod 可以为同时有电信、网通、教育网服务器的网站提供智能的解析,让电信用户访问电信的服务器,网通的用户访问网通的服务器,教育网的用户访问教育网的服务器,达到互联互通的效果。

    6 引用 • 26 回帖 • 510 关注
  • 大数据

    大数据(big data)是指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    93 引用 • 113 回帖
  • TensorFlow

    TensorFlow 是一个采用数据流图(data flow graphs),用于数值计算的开源软件库。节点(Nodes)在图中表示数学操作,图中的线(edges)则表示在节点间相互联系的多维数据数组,即张量(tensor)。

    20 引用 • 19 回帖
  • Sym

    Sym 是一款用 Java 实现的现代化社区(论坛/BBS/社交网络/博客)系统平台。

    下一代的社区系统,为未来而构建

    524 引用 • 4601 回帖 • 700 关注
  • Bug

    Bug 本意是指臭虫、缺陷、损坏、犯贫、窃听器、小虫等。现在人们把在程序中一些缺陷或问题统称为 bug(漏洞)。

    75 引用 • 1737 回帖 • 5 关注
  • 职场

    找到自己的位置,萌新烦恼少。

    127 引用 • 1705 回帖 • 1 关注
  • HBase

    HBase 是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的 Google 论文 “Bigtable:一个结构化数据的分布式存储系统”。就像 Bigtable 利用了 Google 文件系统所提供的分布式数据存储一样,HBase 在 Hadoop 之上提供了类似于 Bigtable 的能力。

    17 引用 • 6 回帖 • 73 关注
  • Vim

    Vim 是类 UNIX 系统文本编辑器 Vi 的加强版本,加入了更多特性来帮助编辑源代码。Vim 的部分增强功能包括文件比较(vimdiff)、语法高亮、全面的帮助系统、本地脚本(Vimscript)和便于选择的可视化模式。

    29 引用 • 66 回帖
  • ActiveMQ

    ActiveMQ 是 Apache 旗下的一款开源消息总线系统,它完整实现了 JMS 规范,是一个企业级的消息中间件。

    19 引用 • 13 回帖 • 670 关注
  • 导航

    各种网址链接、内容导航。

    40 引用 • 173 回帖
  • 服务

    提供一个服务绝不仅仅是简单的把硬件和软件累加在一起,它包括了服务的可靠性、服务的标准化、以及对服务的监控、维护、技术支持等。

    41 引用 • 24 回帖 • 2 关注
  • 锤子科技

    锤子科技(Smartisan)成立于 2012 年 5 月,是一家制造移动互联网终端设备的公司,公司的使命是用完美主义的工匠精神,打造用户体验一流的数码消费类产品(智能手机为主),改善人们的生活质量。

    4 引用 • 31 回帖 • 4 关注
  • Dubbo

    Dubbo 是一个分布式服务框架,致力于提供高性能和透明化的 RPC 远程服务调用方案,是 [阿里巴巴] SOA 服务化治理方案的核心框架,每天为 2,000+ 个服务提供 3,000,000,000+ 次访问量支持,并被广泛应用于阿里巴巴集团的各成员站点。

    60 引用 • 82 回帖 • 595 关注
  • Hexo

    Hexo 是一款快速、简洁且高效的博客框架,使用 Node.js 编写。

    21 引用 • 140 回帖 • 1 关注