Rocket 安装配置
官方安装教程
:http://rocketmq.apache.org/docs/quick-start/
-
下载解压安装
Maven、jdk环境需要配置好!!
Rocket4.2.0下载地址
:http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.2.0/rocketmq-all-4.2.0-source-release.zipunzip rocketmq-all-4.2.0-source-release.zip cd rocketmq-all-4.2.0/ mvn -Prelease-all -DskipTests clean install -U
-
启动服务
//后台启动mqnamesrc服务,生成临时nohup.out日志文件
cd distribution/target/apache-rocketmq nohup sh bin/mqnamesrv & tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success...
//后台启动Broker服务,相当于注册中心,默认端口号9876
//指定broker配置文件启动
Vim ./conf/broker.conf
nohup sh bin/mqbroker -n 192.168.12.3:9876 -c ./conf/broker.conf & tail -f ~/logs/rocketmqlogs/broker.log The broker[%s, 172.30.30.233:10911] boot success...
-
报错处理
//如果出现启动报错,内存异常则需要调整namesrc和broker的启动内存,启动内存要小于服务器内存, vim修改runserver.sh和runbroker.sh
vim ./bin/runserver.sh vim ./bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=320m"
//如果出现错误: 找不到或无法加载主类 org.apache.rocketmq.namesrv.NamesrvStartup
//设置mq的环境变量
Cd /usr/local/rocketmq-all-4.2.0/distribution/target/apache-rocketmq echo "ROCKETMQ_HOME=`pwd`" >> ~/.bash_profile source ~/.bash_profile
-
推送消息到 rocketmq 代碼
private static void addMessToMq(String target, String targetValue, String apnsEnv, String body, String iextParameters, String aextParameters, String title) { DefaultMQProducer producer = new DefaultMQProducer("Producer"); //设置消息中心地址 producer.setNamesrvAddr("192.168.12.3:9876"); //异步成功和失败信息处理 SendCallback sendCallback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System._out_.println(sendResult); } @Override public void onException(Throwable throwable) { System._out_.println(throwable); } }; try { producer.start(); Message msg = new Message(target,//topic title, //tags targetValue, //keys apnsEnv.getBytes() //apnsEnv ); msg.putUserProperty("body", body); msg.putUserProperty("iextParameters", iextParameters); msg.putUserProperty("aextParameters", aextParameters); //设置消息队列延迟等级;这里的等级是由服务器broker.conf配置文件决定 msg.setDelayTimeLevel(3); try { //异步处理消息 producer.send(msg,sendCallback); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } catch (MQClientException e) { e.printStackTrace(); } finally { producer.shutdown(); } }
-
消费者监听接收消息
public class MemConsumer { public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer"); consumer.setNamesrvAddr("192.168.12.3:9876"); try { //消费者订阅主题为ACCOUNT名称下的所有内容 consumer.subscribe("ACCOUNT","*"); //设置消费开始读取的节点 consumer.setConsumeFromWhere(ConsumeFromWhere._CONSUME_FROM_FIRST_OFFSET_); //设置消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { MessageExt msg = list.get(0); String apnsEnv = null; try { apnsEnv = new String(msg.getBody(),"UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } pushMess("ACCOUNT",msg.getKeys(),apnsEnv,msg.getUserProperty("body"),msg.getProperty("iextParameters"),msg.getProperty("aextParameters"),msg.getTags()); return ConsumeConcurrentlyStatus._CONSUME_SUCCESS_; } }); consumer.start(); } catch (MQClientException e) { e.printStackTrace(); } }
-
将消费者打包成 jar 包运行在服务器上面即可
java –jar owl1.jar nohup java -jar XXX.jar >memberupdown.log &
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于