RocketMQ 入门基础 - 环境 & 整合

本贴最后更新于 388 天前,其中的信息可能已经物是人非

概述&选型

消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要用于三种典型场景:应用解耦流量消峰消息分发

目前主流的 MQ 主要是 Rocketmq、kafka、Rabbitmq,Rocketmq 相比于 Rabbitmq、kafka 具有主要优势特性有:

本文主要介绍 RocketMQ 的单机安装、双机主从高可用安装配置、运维管理平台搭建、与 SpringBoot 整合几个知识点,具备相关知识技能的同学请直接拉到最后点个“在看”即可。
image.png

文章开始之前需要先准备好 JDK1.8 或以上的服务器环境以及从 rocketmq 官网下载好二进制安装包,下载地址 http://rocketmq.apache.org/dowloading/releases/

单机安装配置

工欲善其事必先利其器,要想深入了解 RocketMQ 得先把环境安装好,咱们先开始单机版 RocketMQ 的安装!

单机情况下安装使用 RocketMQ 很简单,只需要分别启动 NameServer 和 Broker Server 即可!

关闭 RockerMQ 需要使用下面的命令:

# 先关闭Broker Server
> sh bin/mqshutdown broker
# 再关闭NameServer
> sh bin/mqshutdown namesrv

双机主从高可用搭建

为了消除单机故障,增加可靠性或增大吞吐量,可以在多台服务器上部署多个 NameServer 和 Broker,并为每个 Broker 部署一个或多个 Slave。本节将说明使用两台机器,搭建双主、双从、无单点故障的高可用 RocketMQ 集群。假设现在有两台服务器,IP 地址分别为:192.168.100.43 和 192.168.100.44,部署架构如下:
image.png

启动多个 NameServer 和 Broker

首先需要在两台服务器上分别启动 NameServer(nohup sh bin/mqnamesrv &),这样我们就得到了一个无单点的 NameServer 服务,服务地址为 192.168.100.43:9876 和 192.168.100.44:9876。

然后在两台服务器中 RocketMQ 的 conf 目录分别建立两个文件 broker-master.propertiesbroker-slave.properties,下面是不同服务器的配置说明:

namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = SYNC_MASTER
flushDiskType = ASYNC_FLUSH
listenPort = 10911
storePathRootDir = /app/rocketmq/store-a
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876
brokerClusterName = DefaultCluster
brokerName = broker-b
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
listenPort = 11011
storePathRootDir = /app/rocketmq/store-b
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876
brokerClusterName = DefaultCluster
brokerName = broker-b
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = SYNC_MASTER
flushDiskType = ASYNC_FLUSH
listenPort = 10911
storePathRootDir = /app/rocketmq/store-b
namesrvAddr = 192.168.100.43:9876;192.168.100.44:9876
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 1
deleteWhen = 04
fileReservedTime = 48
brokerRole = SLAVE
flushDiskType = ASYNC_FLUSH
listenPort = 11011
storePathRootDir = /app/rocketmq/store-a

然后分别使用如下命令启动两台服务器的主节点和从节点
nohup sh bin/mqbroker -c conf/broker-master.properties &
nohup sh bin/mqbroker -c conf/broker-slave.properties &

这样一个高可用的 RockerMQ 集群就搭建好了,我们登陆可视化运维管理界面查看集群状态,集群正常启动。
image.png

重要参数说明

本节主要是对 Broker 的配置文件中用到的参数进行说明

可视化管理平台

RocketMQ 可以使用 rocketmq-externals 作为运维管理平台,Github 地址 https://github.com/apache/rocketmq-externals,我们需要将源码下载下来后再进行手动编译,过程如下:

SpringBoot 整合 RocketMQ

在 SpringBoot 中整合 RocketMQ 主要用到 rocketmq-spring-boot-starter 组件,下面是详细整合过程。

<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-spring-boot-starter</artifactId>
	<version>2.1.0</version>
</dependency>
rocketmq:
  name-server: 192.168.100.43:9876;192.168.100.44:9876
  producer:
    group: test-group
    send-message-timeout: 3000

如果是集群,多个 name-server 使用英文 ; 分割。

/**
 * Description:
 * rocketMQ消息发送方法
 * @author javadaily
 */
@Component
public class MessageProduce {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送消息
     * @param topic 主题
     * @param message 消息体
     */
    public void sendMessage(String topic,String message){
        this.rocketMQTemplate.convertAndSend(topic,message);
    }
}

使用 RocketMQTemplate 发送消息

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "test-topic",
        consumerGroup = "test-group",
        selectorExpression = "*"
)
public class MessageConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("received message is {}", message);
    }
}

消费者只需要继承 RocketMQListener 类即可,主要关注实现类上的 @RocketMQMessageListener 注解,配置的 topicconsumerGroup 需要跟消息生产者的配置保持一致。

@RunWith(SpringRunner.class)
@SpringBootTest
public class MessageProduceTest {
    @Autowired
    private MessageProduce messageProduce;

    @Test
    public void testSendMessage() {
        messageProduce.sendMessage("test-topic","Hello,JAVA日知录");
    }
}
  • RocketMQ
    18 引用 • 10 回帖
  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    858 引用 • 1420 回帖 • 618 关注

广告 我要投放

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
  • yp

    没关系的,这是看业务的 rabbitmq 使用频率不低,相反 RocketMQ 虽然是 apache 项目 但大部分也只是在国内应用
    主流的 MQ 是 Kafka 和 RabbitMQ
    目前 Apache Pulsar 的概念非常好 可以了解一下,从长远来看,有取代 Kafka 的趋势

  • 其他回帖
  • someone

    果然被嘲讽了,印证了梦

    1 回复
  • someone

    试试 rabbit?

  • someone
    作者

    都 2020 年了还 rabbit????

  • 查看全部回帖