概述&选型
消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要用于三种典型场景:应用解耦、流量消峰、消息分发。
目前主流的 MQ 主要是 Rocketmq、kafka、Rabbitmq,Rocketmq 相比于 Rabbitmq、kafka 具有主要优势特性有:
- 支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,rabbitmq 和 kafka 不支持)
- 支持结合 rocketmq 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
- 支持 18 个级别的延迟消息(rabbitmq 和 kafka 不支持)
- 支持指定次数和时间间隔的失败消息重发(kafka 不支持,rabbitmq 需要手动确认)
- 支持 consumer 端 tag 过滤,减少不必要的网络传输(rabbitmq 和 kafka 不支持)
- 支持重复消费(rabbitmq 不支持,kafka 支持)
本文主要介绍 RocketMQ 的单机安装、双机主从高可用安装配置、运维管理平台搭建、与 SpringBoot 整合几个知识点,具备相关知识技能的同学请直接拉到最后点个“在看”即可。
文章开始之前需要先准备好 JDK1.8 或以上的服务器环境以及从 rocketmq 官网下载好二进制安装包,下载地址 http://rocketmq.apache.org/dowloading/releases/
单机安装配置
工欲善其事必先利其器,要想深入了解 RocketMQ 得先把环境安装好,咱们先开始单机版 RocketMQ 的安装!
-
解压安装
unzip rocketmq-all-4.7.0-bin-release.zip
-
启动 Name Server
> nohup sh bin/mqnamesrv &
-
查看 Name Server 启动日志
> tail -f ~/logs/rocketmqlogs/namesrv.log
-
启动 Broker Server
> nohup sh bin/mqbroker -n localhost:9876 &
-
查看 Broker Server 启动日志
> tail -f ~/logs/rocketmqlogs/broker.log
单机情况下安装使用 RocketMQ 很简单,只需要分别启动 NameServer 和 Broker Server 即可!
关闭 RockerMQ 需要使用下面的命令:
# 先关闭Broker Server
> sh bin/mqshutdown broker
# 再关闭NameServer
> sh bin/mqshutdown namesrv
双机主从高可用搭建
为了消除单机故障,增加可靠性或增大吞吐量,可以在多台服务器上部署多个 NameServer 和 Broker,并为每个 Broker 部署一个或多个 Slave。本节将说明使用两台机器,搭建双主、双从、无单点故障的高可用 RocketMQ 集群。假设现在有两台服务器,IP 地址分别为:192.168.100.43 和 192.168.100.44,部署架构如下:
启动多个 NameServer 和 Broker
首先需要在两台服务器上分别启动 NameServer(nohup sh bin/mqnamesrv &),这样我们就得到了一个无单点的 NameServer 服务,服务地址为 192.168.100.43:9876 和 192.168.100.44:9876。
然后在两台服务器中 RocketMQ 的 conf 目录分别建立两个文件 broker-master.properties
,broker-slave.properties
,下面是不同服务器的配置说明:
- 192.168.100.43 机器上的 broker-master.properties 文件:
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = SYNC_MASTER
flushDiskType = ASYNC_FLUSH
listenPort = 10911
storePathRootDir = /app/rocketmq/store-a
- 192.168.100.43 机器上的 broker-slave.properties 文件:
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876
brokerClusterName = DefaultCluster
brokerName = broker-b
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
listenPort = 11011
storePathRootDir = /app/rocketmq/store-b
- 192.168.100.44 机器上的 broker-master.properties 文件:
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876
brokerClusterName = DefaultCluster
brokerName = broker-b
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = SYNC_MASTER
flushDiskType = ASYNC_FLUSH
listenPort = 10911
storePathRootDir = /app/rocketmq/store-b
- 192.168.100.44 机器上的 broker-slave.properties 文件:
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
listenPort = 11011
storePathRootDir = /app/rocketmq/store-a
然后分别使用如下命令启动两台服务器的主节点和从节点
nohup sh bin/mqbroker -c conf/broker-master.properties &
nohup sh bin/mqbroker -c conf/broker-slave.properties &
这样一个高可用的 RockerMQ 集群就搭建好了,我们登陆可视化运维管理界面查看集群状态,集群正常启动。
重要参数说明
本节主要是对 Broker 的配置文件中用到的参数进行说明
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876
指定 NameServer 的地址,可以是多个。brokerClusterName = DefaultCluster
Cluster 地址,如果集群数量比较多,可以分成多个 Cluster,每个 Cluster 供一个业务群使用。brokerName = broker-a
Broker 的名称,Master 和 Slave 通过使用相同的 Broker 名称来表明相互关系,以说明某个 Slave 是哪个 Master 的 Slave。brokerId = 1
一个 Master 可以有多个 Slave,0 表示 Master,大于 0 的表示不同 Slave 的 ID。fileReservedTime = 48
在磁盘上保存消息的时长,单位是小时,自动删除超时的消息。deleteWhen = 04
与 fileReservedTime 参数对应,表明在几点做消息删除动作,默认是凌晨 4 点。brokerRole = SYNC_MASTER
brokerRole
的可选参数有SYNC_MASTER
,ASYNC_MASTER
,SLAVE
三种。SYNC
和ASYNC
表示MASTER
和SLAVE
之间同步消息的机制,SYNC
的意思是当Slave
和Master
的消息同步完成后再返回发送成功的状态。flushDiskType = ASYNC_FLUSH
flushDiskType
表示刷盘策略,可选值有 ASYNC_FLUSH 和 SYNC_FLUSH 两种,分别代表同步刷盘和异步刷盘。同步情况下,消息只有真正写入磁盘才返回成功状态;异步情况下,消息写入 page_cache 后就返回成功状态。listenPort = 11011
Broker 监听的端口,一台服务器启动多个 Broker,需要设置不同的监听端口避免端口冲突。storePathRootDir = /app/rocketmq/store-a
存储消息以及配置信息的根目录。
可视化管理平台
RocketMQ 可以使用 rocketmq-externals
作为运维管理平台,Github 地址 https://github.com/apache/rocketmq-externals,我们需要将源码下载下来后再进行手动编译,过程如下:
-
下载
从 github(https://github.com/apache/rocketmq-externals) 下载 RocketMQ 可视化管理工具rocketmq-externals
的源码; -
打包
下载完成后切换进 rocketmq-console 目录,使用 maven 命令对其打包mvn clean package -Dmaven.test.skip=true
,打包完成后生成可执行文件 rocketmq-console-ng-1.0.1.jar -
运行
使用java -jar rocketmq-console-ng-1.0.1.jar --server.port=8080 --rocketmq.config.namesrvAddr=xxxx.xxx.xxx.xxx:9876
命令启动,这里注意需要设置两个参数:
--server.port
为运行的这个 web 应用的端口,如果不设置的话默认为 8080;
--rocketmq.config.namesrvAddr
为 RocketMQ 命名服务地址,若 NameServer 为集群则使用英文 ; 分割 -
访问
浏览器访问xxx.xxx.xxx.xxx:8080
进入控制台界面,效果如下
SpringBoot 整合 RocketMQ
在 SpringBoot 中整合 RocketMQ 主要用到 rocketmq-spring-boot-starter
组件,下面是详细整合过程。
- 引入组件
rocketmq-spring-boot-starter
依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
- 修改 application.yml,添加 RocketMQ 相关配置
rocketmq:
name-server: 192.168.100.43:9876;192.168.100.44:9876
producer:
group: test-group
send-message-timeout: 3000
如果是集群,多个 name-server 使用英文 ; 分割。
- 编写消息生产者
MessageProduce
/**
* Description:
* rocketMQ消息发送方法
* @author javadaily
*/
@Component
public class MessageProduce {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送消息
* @param topic 主题
* @param message 消息体
*/
public void sendMessage(String topic,String message){
this.rocketMQTemplate.convertAndSend(topic,message);
}
}
使用 RocketMQTemplate 发送消息
- 编写消息消费者
MessageConsumer
@Slf4j
@Component
@RocketMQMessageListener(
topic = "test-topic",
consumerGroup = "test-group",
selectorExpression = "*"
)
public class MessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("received message is {}", message);
}
}
消费者只需要继承 RocketMQListener 类即可,主要关注实现类上的 @RocketMQMessageListener
注解,配置的 topic
和 consumerGroup
需要跟消息生产者的配置保持一致。
- 编写单元测试发送消息
@RunWith(SpringRunner.class)
@SpringBootTest
public class MessageProduceTest {
@Autowired
private MessageProduce messageProduce;
@Test
public void testSendMessage() {
messageProduce.sendMessage("test-topic","Hello,JAVA日知录");
}
}
- 测试
先启动 springboot 应用,再执行测试用例。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于