前言
本篇文章是我学习 RabbitMQ 的笔记,主要包括两大部分,第一部分是 RabbitMQ 的安装与基本概念,第二部分是在 SpringBoot 项目中编写测试 RabbitMQ。
RabbitMQ 基础
简介
RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(也称为消息中间件),主要用来实现异步通信和程序解耦。
安装
推荐使用 Docker 来安装
1. 拉取 RabbitMQ 镜像
docker pull rabbitmq
2. 创建 RabbitMQ 容器
docker run -p 5672:5672 -p 15672:15672 --name rabbitmq -d rabbitmq
3. 进入 RabbitMQ 容器
docker exec -it rabbitmq /bin/bash
4. 使用管理插件
在 RabbitMQ 容器里输入下面的指令即可开启插件
rabbitmq-plugins enable rabbitmq_management
在浏览器输入 http://127.0.0.1:15672/ 出现下图所示界面
然后输入默认的账号 guest,密码也是 guest 就可以登录
RabbitMQ 模型
RabbitMQ 支持的是 AMQP 0-9-1 Model 模型,如下图所示
简单描述以下这个模型,一个 发布者(Publisher) 给 交换机(Exchange) 发送一条消息,然后 交换机(Exchange) 根据 路由关系(binding) 将消息复制分发给和它绑定的 队列(Queue) ,最后,订阅了 队列(Queue) 的 消费者(Consumer) 消费这些消息。
1. 发布(生产)者(Publisher/Producer)
指的是给交换机发送消息的程序
2. 队列(Queue)
队列本质上是一个消息缓冲区,负责接收来自交换机的消息
3. 消费者(Consumer)
指的是从队列中消费消息的程序
4. 绑定(Bindings)
交换机和一个队列的关系称为一个 binding
5. 交换机(Exchange)
生产者将消息发送给交换机,交换机再根据绑定关系把这些消息分发给对应的队列,交换机有四种类型,分别是
Direct Exchange
这是默认的交换机类型,Direct Exchange 会根据一个 路由键(routing key) 去分发消息给对应的队列。它的过程是这样的,生产者发消息的时候会指定一个 routing key R , Direct Exchange 绑定队列时也会指定一个 routing key K, 当 Direct Exchange 接收到消息的时候, 会给那些满足 (K = R) 的队列分发消息。
Fanout Exchange
这种类型的交换机会把接收到的消息分发给所有和自己绑定的队列(会忽略 routing key),就是广播模式。
Topic Exchange
Topic 类型的交换机和 direct 类型的效果很像,也是根据路由键来匹配队列,但是匹配规则更加灵活。Topic Exchange 可以根据通配符 *
和 #
来匹配,其中:
- *: 表示可以替换一个字符
- #: 表示可以替换一个或者多个字符
如上图所示,共有三个绑定关系(bindings),交换机 X 和 队列 Q1 有一个绑定关系 *.orange.*
,和 队列 Q2 有两个绑定关系,*.*.rabbit"
和 lazy.#
。举例说明一下:
- 假设现在有一个消息设置了 routing key 为
quick.orange.rabbit
,那么这个消息会被交换机分发给 Q1 和 Q2。 - 假设现在有一个消息设置了 routing key 为
lazy.orange.elephant
,那么这个消息也会被交换机分发给 Q1 和 Q2。 - 假设现在有一个消息设置了 routing key 为
quick.orange.fox
,那么这个消息只会被交换机分发给 Q1。 - 假设现在有一个消息设置了 routing key 为
lazy.pink.rabbit
,那么这个消息会被交换机分发给 Q2,并且只会分发一次。
Headers Exchange
Headers 类型的交换机,舍弃了 routing key,而是根据 消息头(message headers) 来匹配队列。有两种匹配模式,消息头带有一个参数 x-match
,如果这个参数的值是 all
,代表匹配所有;如果是 any
,代表匹配任意一个值。
SpringBoot 整合 RabbitMQ
创建 RabbitMQ 项目
通过 IDEA 创建项目时选择 RabbitMQ 的依赖
创建完成后,pom.xml
文件会包含以下两个依赖(我创建的时候,遇到一个问题,spring-boot-starter-parent 版本高于 2.3.0 会报错,找不到 spring-rabbit-test 依赖,所以我手动将 spring-boot-starter-parent 的版本改为了 2.3.0.)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
</dependency>
项目创建好了,我们就开始写测试 demo
一对一使用
先通过一个简单的入门程序,来感受一些 RabbitMQ 消息队列,模型图如下:
1. 创建 RabbitMQ 配置类
HelloConfig
这个配置类注入了一个名为 hello 的队列
@Configuration
public class HelloConfig {
@Bean
public Queue helloQueue() {
return new Queue("hello");
}
}
2. 创建一个生产者
HelloProducer
这个类负责生产消息
@Component
@Slf4j
public class HelloProducer {
@Autowired
private RabbitTemplate template;
public void produce(int i) {
String message = "hello : " + i;
log.info("hello produce: ===============> " + message);
// 参考:<a>https://www.rabbitmq.com/tutorials/tutorial-one-python.html<a/>
// The queue name needs to be specified in the routing_key parameter:
// convertAndSend函数有三个参数,第一个是交换机名称,如果不给就是默认的;第二个是路由key指队列名称;第三个是消息体
template.convertAndSend("hello", message);
}
}
3. 创建一个消费者
HelloReceiver
类用来监听队列 hello
,负责消费从队列 hello
分发过来的消息。
@RabbitListener(queues = "hello")
这个注解可以标注在类上面,也可以标注在方法上,当标注在类上时,还需要通过 @RabbitHandler
来标注接收消息的方法
@Component
@RabbitListener(queues = "hello")
@Slf4j
public class HelloReceiver {
@RabbitHandler
public void process(String message) {
log.info("hello receive : ================> " + message);
}
}
4. 测试类
测试类通过注入生产者 HelloProducer
,通过循环调用 HelloProducer
类的 produce
方法来产生消息
@SpringBootTest
@Slf4j
public class HelloTest {
@Autowired
private HelloProducer helloProducer;
@Test
public void hello() throws Exception{
int i = 0;
while (i < 10) {
helloProducer.produce(i);
Thread.sleep(3000);
i++;
}
}
}
输出结果:
hello produce: ===============> hello : 0
hello receive : ================> hello : 0
hello produce: ===============> hello : 1
hello receive : ================> hello : 1
hello produce: ===============> hello : 2
hello receive : ================> hello : 2
.....
5. 结论
从输出日志可以看出,生产者每生产一个消息,消费者就消费一个,这是最简单的一对一消息模型
一对多和多对多使用
1. 创建一个配置类
@Configuration
public class WorkQueueConfig {
@Bean
public Queue workQueue() {
return new Queue("workQueue");
}
}
2. 创建两个生产者
这两个生产者都将消息发送给队列 workQueue
@Slf4j
public class WorkProducerA {
@Autowired
private RabbitTemplate template;
public void produce(int i) {
template.convertAndSend("workQueue", ("WorkProducerA message ******* " + i));
}
}
@Component
@Slf4j
public class WorkProducerB {
@Autowired
private RabbitTemplate template;
public void produce(int i) {
template.convertAndSend("workQueue", ("WorkProducerB message ******* " + i));
}
}
3. 创建两个消费者
这两个消费者都监听队列 workQueue
@Component
@RabbitListener(queues = "workQueue")
@Slf4j
public class WorkReceiverA {
@RabbitHandler
public void process(String message) {
log.info("workReceiverA : " + message);
}
}
@Component
@RabbitListener(queues = "workQueue")
@Slf4j
public class WorkReceiverB {
@RabbitHandler
public void process(String message) {
log.info("workReceiverB : " + message);
}
}
4. 测试
测试类有两个方法,分别是测试一对多和多对多的
@SpringBootTest
public class WorkQueueTest {
@Autowired
private WorkProducerA workProducerA;
@Autowired
private WorkProducerB workProducerB;
/**
* 一对多
*
* 参考:<a>https://www.rabbitmq.com/tutorials/tutorial-two-python.html</a>
* <p>
* By default, RabbitMQ will send each message to the next consumer, in sequence.
* On average every consumer will get the same number of messages
*/
@Test
public void oneToMany() {
int i = 0;
while (i < 20) {
workProducerA.produce(i);
i++;
}
}
/**
* 多对多
*
*/
@Test
public void manyToMany() {
int i = 0;
while (i < 40) {
workProducerA.produce(i);
workProducerB.produce(i);
i++;
}
}
}
一对多输出结果:
workReceiverB : WorkProducerA message ******* 0
workReceiverA : WorkProducerA message ******* 1
workReceiverB : WorkProducerA message ******* 2
workReceiverA : WorkProducerA message ******* 3
.....
workReceiverB : WorkProducerA message ******* 18
workReceiverA : WorkProducerA message ******* 19
5. 结论
当同一个队列向多个消费者提供消息时,采取的是均分策略,每个消费者会得到数量相同的消息
多对多输出结果:
workReceiverB : WorkProducerA message ******* 0
workReceiverA : WorkProducerB message ******* 0
workReceiverA : WorkProducerB message ******* 1
workReceiverB : WorkProducerA message ******* 1
workReceiverA : WorkProducerB message ******* 2
workReceiverB : WorkProducerA message ******* 2
.....
workReceiverB : WorkProducerA message ******* 39
workReceiverA : WorkProducerB message ******* 39
结论:当多个生产者给同一个队列发送消息,而这个队列给多个消费者提供消息时,也是平均分配的
Direct Exchange
在实际应用中,我们可能需要根据一些实际情况,将特定类型的消息分发给特定的队列,再通过队列将消息提供给特定的消费者。举个例子,我们需要做一个处理日志的功能,我们的应用程序(日志生产者)会产生的 error
类型日志和 info
类型日志,现在有两个日志消费者,其中一个只接收 error
类型的日志,另一个则接收 error
和 info
类型的日志,现在我们通过代码来实现。
1. 创建一个 Direct Exchange 配置类
该配置类创建了两个队列 logQueueA 和 logQueueB,创建了一个 Direcrt 类型交换机(direct.logs),然后将队列 logQueueA 和交换机 direct.logs 绑定,并指定 router key 为 error,再将队列 logQueueB 和交换机 direct.logs 绑定,分别指定 router key 为 error 和 info。
@Configuration
public class DirectConfig {
@Bean
public Queue logQueueA() {
return new Queue("logQueueA");
}
@Bean
public Queue logQueueB() {
return new Queue("logQueueB");
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct.logs");
}
/**
* logQueueA 和 error 日志绑定
*
* @return
*/
@Bean
public Binding bindingExchangeErrorLogs() {
return BindingBuilder.bind(logQueueA()).to(directExchange()).with("error");
}
/**
* logQueueB 和 error 日志绑定
*
* @return
*/
@Bean
public Binding bindingExchangeMixLogsA() {
return BindingBuilder.bind(logQueueB()).to(directExchange()).with("error");
}
/**
* logQueueB 和 info 日志绑定
*
* @return
*/
@Bean
public Binding bindingExchangeMixLogsB() {
return BindingBuilder.bind(logQueueB()).to(directExchange()).with("info");
}
}
2. 创建一个日志生产者
模拟生产日志消息,根据传入的参数类型,决定日志类型和 router key
@Configuration
public class LogsProducer {
@Autowired
private RabbitTemplate template;
/**
* 根据参数指定routingKey
*
* @param type
*/
public void produce(String type) {
template.convertAndSend("direct.logs", type, "This is " + type + " logs");
}
}
3. 创建两个日志消费者
消费者 LogReceiverA 监听队列 logQueueA
@Component
@Slf4j
@RabbitListener(queues = "logQueueA")
public class LogReceiverA {
@RabbitHandler
public void receive(String message) {
log.info("error receiver receive : " + message);
}
}
消费者 LogReceiverB 监听队列 logQueueB
@Component
@Slf4j
@RabbitListener(queues = "logQueueB")
public class LogReceiverB {
@RabbitHandler
public void receive(String message) {
log.info("mix receiver receive : " + message);
}
}
4. 测试类
测试类有两个方法,分别测试生产 error 类型日志和 info 类型日志,当生产 error 类型日志时,LogReceiverA 和 LogReceiverB 两个消费者都接收到了日志消息;当生产 info 日志时,只有消费者 LogReceiverB 接收到日志消息。
@SpringBootTest
public class DirectTest {
@Autowired
private LogsProducer logsProducer;
@Test
public void testErrorLog() {
logsProducer.produce("error");
// mix receiver receive : This is error logs
// error receiver receive : This is error logs
}
@Test
public void testInfoLog() {
logsProducer.produce("info");
// mix receiver receive : This is info logs
}
}
5.结论
Direct Exchange 通过 router key 来对消息进行匹配
Fanout Exchange
Fanout Exhcnage 主要应用于广播消息,该类型的交换机会把生产者生产的消息通过广播的形式发送给所有和自己绑定的交换机。
1. 创建 Fanout Exchange 配置类
该配置类创建了三个队列 broadcastQueueA、broadcastQueueB、broadcastQueueC,又创建了一个 Fanout Exhcnage (fanout.broadcast),再将这三个队列都和交换机(fanout.broadcast)绑定。
@Configuration
public class FanoutConfig {
@Bean
public Queue broadcastQueueA() {
return new Queue("broadcastQueueA");
}
@Bean
public Queue broadcastQueueB() {
return new Queue("broadcastQueueB");
}
@Bean
public Queue broadcastQueueC() {
return new Queue("broadcastQueueC");
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.broadcast");
}
@Bean
public Binding bindingExchangeMessageA() {
return BindingBuilder.bind(broadcastQueueA()).to(fanoutExchange());
}
@Bean
public Binding bindingExchangeMessageB() {
return BindingBuilder.bind(broadcastQueueB()).to(fanoutExchange());
}
@Bean
public Binding bindingExchangeMessageC() {
return BindingBuilder.bind(broadcastQueueC()).to(fanoutExchange());
}
}
2. 创建生产者
该生产者主要生产一条广播消息
@Component
@Slf4j
public class BroadcastProducer {
@Autowired
private RabbitTemplate template;
public void send() {
String context = "This is a broadcast message";
template.convertAndSend("fanout.broadcast","", context);
}
}
3. 创建消费者
消费者 BroadcastReceiverA 监听队列 broadcastQueueA
@Component
@Slf4j
@RabbitListener(queues = "broadcastQueueA")
public class BroadcastReceiverA {
@RabbitHandler
public void process(String message) {
log.info("BroadcastReceiverA receive : " + message);
}
}
消费者 BroadcastReceiverB 监听队列 broadcastQueueB
@Component
@Slf4j
@RabbitListener(queues = "broadcastQueueB")
public class BroadcastReceiverB {
@RabbitHandler
public void process(String message) {
log.info("BroadcastReceiverB receive : " + message);
}
}
消费者 BroadcastReceiverC 监听队列 broadcastQueueC
@Component
@Slf4j
@RabbitListener(queues = "broadcastQueueC")
public class BroadcastReceiverC {
@RabbitHandler
public void process(String message) {
log.info("BroadcastReceiverC receive : " + message);
}
}
4. 测试类
@SpringBootTest
public class FanoutTest {
@Autowired
private BroadcastProducer fanoutSender;
/**
* 发布订阅模式
*
*/
@Test
public void testFanout() {
fanoutSender.send();
}
}
输出结果:
BroadcastReceiverA receive : This is a broadcast message
BroadcastReceiverB receive : This is a broadcast message
BroadcastReceiverC receive : This is a broadcast message
5. 结论
Fanout Exhcnage 不需要指定 router key,它会将接收到的来自生产者的生产的消息发送给所有和自己绑定的队列,它的路由效果如下图所示:
Topic Exchange
Topic Exchange 也是根据 router key 来匹配队列,但是匹配规则很灵活,因为可以根据通配符来匹配。我们通过一个匹配颜色的例子来演示
1. 创建 Topic Exchange 配置类
该配置类创建了两个队列 colorQueueA 和 colorQueueB,以及一个 Topic Exchange(topic.color),再将队列 colorQueueA 和交换机绑定,并指定了 router ky 为 *.blue.*
,将队列 colorQueueB 和交换机绑定,并指定了 router key 为 green.#
@Configuration
public class TopicConfig {
@Bean
public Queue topicQueueA() {
return new Queue("colorQueueA");
}
@Bean
public Queue topicQueueB() {
return new Queue("colorQueueB");
}
@Bean
public TopicExchange colorTopicExchange() {
return new TopicExchange("topic.color");
}
@Bean
public Binding bindingExchangeTopicQueueA() {
return BindingBuilder.bind(topicQueueA()).to(colorTopicExchange()).with("*.blue.*");
}
@Bean
public Binding bindingExchangeTopicQueueB() {
return BindingBuilder.bind(topicQueueB()).to(colorTopicExchange()).with("green.#");
}
}
2. 创建生产者
根据传入的参数,指定 router key
@Component
@Slf4j
public class ColorProducer {
@Autowired
private RabbitTemplate template;
public void produce(String criteria) {
template.convertAndSend("topic.color", criteria, "This is " + criteria);
}
}
3. 创建两个消费者
消费者 ColorReceiverA 监听队列 colorQueueA
@Component
@Slf4j
@RabbitListener(queues = "colorQueueA")
public class ColorReceiverA {
@RabbitHandler
public void receive(String color) {
log.info("ColorReceiverA receive: " + color);
}
}
消费者 ColorReceiverB 监听队列 colorQueueB
@Component
@Slf4j
@RabbitListener(queues = "colorQueueB")
public class ColorReceiverB {
@RabbitHandler
public void receive(String color) {
log.info("ColorReceiverB receive: " + color);
}
}
4. 测试类
在测试类中,我们写了两个测试方法,分别指定 router key 为 yellow.blue.red
和 green.blue.red
@SpringBootTest
public class TopicTest {
@Autowired
private ColorProducer colorProducer;
@Test
public void test1() {
String criteria = "yellow.blue.red";
colorProducer.produce(criteria);
}
@Test
public void test2() {
String criteria = "green.blue.red";
colorProducer.produce(criteria);
}
}
输出结果:
test1 方法输出
ColorReceiverA receive: This is yellow.blue.red
test2 方法输出
ColorReceiverA receive: This is green.blue.red\
ColorReceiverB receive: This is green.blue.red
5. 结论
Topic Exchange 中的 router key 可以写成通配符的格式,如 *.blue.*
或者 green.#
,这样使得匹配规则更加灵活
Headers Exchange
Headers Exchange 没有 router key 了,而是根据消息头来匹配。消息头内容是 key-value 键值对格式,可以指定多个 key-value 键值对,所以需要指定匹配模式是 all 还是 any,即匹配所有的 key-value 键值对,还是只匹配其中一条。
1. 创建 Headers Exchange 配置类
该配置类创建了两个队列 imageQueueA 和 imageQueueB,以及一个 Headers Exchange(headers.image),再将 imageQueueA 和交换机(headers.image)绑定,并且指定匹配模式为 all,再将 imageQueueA 和交换机(headers.image)绑定,并且指定匹配模式为 any
@Configuration
public class HeadersConfig {
@Bean
public Queue imageQueueA() {
return new Queue("imageQueueA");
}
@Bean
public Queue imageQueueB() {
return new Queue("imageQueueB");
}
@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange("headers.image");
}
/**
* 匹配type=jpg且size=12的消息,分发给队列imageQueueA
*
* @return
*/
@Bean
public Binding bindingHeadersExchangeA() {
Map<String, Object> keys = Maps.newHashMap();
keys.put("type", "jpg");
keys.put("size", 12);
return BindingBuilder.bind(imageQueueA()).to(headersExchange()).whereAll(keys).match();
}
/**
* 匹配type=png或者size=6的消息,分发给队列imageQueueB
*
* @return
*/
@Bean
public Binding bindingHeadersExchangeB() {
Map<String, Object> keys = Maps.newHashMap();
keys.put("type", "png");
keys.put("size", 6);
return BindingBuilder.bind(imageQueueB()).to(headersExchange()).whereAny(keys).match();
}
}
2. 创建生产者类
为了方便传参,我们定义了一个类 Image
@Builder
@ToString
@Getter
public class Image implements Serializable {
private static final long serialVersionUID = 8617592564349459927L;
private String type;
private int size;
}
生产者类,根据传入的参数指定消息头内容
@Component
public class ImagesProducer {
@Autowired
private RabbitTemplate template;
public void produce(Image image) {
MessageProperties properties = new MessageProperties();
properties.setHeader("type", image.getType());
properties.setHeader("size", image.getSize());
template.convertAndSend("headers.image", "", new Message(image.toString().getBytes(), properties));
}
}
3. 创建消费者类
消费者 ImageReceiverA 监听队列 imageQueueA
@Component
@Slf4j
public class ImageReceiverA {
@RabbitListener(queues = "imageQueueA")
public void receive(byte[] image) {
log.info("ImageReceiverA receive : " + new String(image));
}
}
消费者 ImageReceiverB 监听队列 imageQueueB
@Component
@Slf4j
public class ImageReceiverB {
@RabbitListener(queues = "imageQueueB")
public void receive(byte[] image) {
log.info("ImageReceiverB receive : " + new String(image));
}
}
4. 测试类
@SpringBootTest
public class HeadersTest {
@Autowired
private ImagesProducer imagesProducer;
@Test
public void testAll() {
imagesProducer.produce(Image.builder().type("jpg").size(12).build());
// ImageReceiverA receive : Image(type=jpg, size=12)
}
@Test
public void testAny() {
imagesProducer.produce(Image.builder().type("png").size(12).build());
// ImageReceiverB receive : Image(type=png, size=12)
}
@Test
public void testAny2() {
imagesProducer.produce(Image.builder().type("jpg").size(6).build());
// ImageReceiverB receive : Image(type=jpg, size=6)
}
}
输出结果:
testAll 方法输出:
ImageReceiverA receive : Image(type=jpg, size=12)
testAny 方法输出:
mageReceiverB receive : Image(type=png, size=12)
testAny2 方法输出:
ImageReceiverB receive : Image(type=jpg, size=6)
5. 结论
Headers Exchange 可以灵活增加或减少消息头中的键值对来实现匹配。
传输对象
前面的例子都是传的字符串,在实际的开发中经常需要传输对象,现在我们写一个例子来传输对象
1. 创建一个实体类
@Builder
@ToString
public class User implements Serializable {
private static final long serialVersionUID = 4409039369681054682L;
private String name;
private int age;
}
2. 创建配置类
这里使用的是 Topic Exchange
@Configuration
public class UserConfig {
@Bean
public Queue userInfo() {
return new Queue("user.info");
}
@Bean
public TopicExchange userTopicExchange() {
return new TopicExchange("topic.user");
}
@Bean
public Binding bindingTopicExchange() {
return BindingBuilder.bind(userInfo()).to(userTopicExchange()).with("user.info");
}
}
3. 创建生产者
@Component
public class UserProducer {
@Autowired
private RabbitTemplate template;
public void produce() {
template.convertAndSend("topic.user", "user.info", User.builder().name("Tom").age(20).build());
}
}
4. 创建消费者
@Component
@Slf4j
public class UserReceiver {
/**
* RabbitListener注解可以直接作用于方法
*
* @param user
*/
@RabbitListener(queues = "user.info")
public void receive(User user) {
log.info("receive user : " + user);
}
}
5. 测试类
@SpringBootTest
public class UserTest {
@Autowired
private UserProducer userProducer;
@Test
public void test() {
userProducer.produce();
}
}
输出结果:
receive user : User(name=Tom, age=20)
最后
RabbitMQ 的基本概念和 SpringBoot 整合 RabbitMQ 的常用模式都总结完毕了,以后项目中需要实际使用 RabbitMQ 时,再去深入研究吧。
本篇文章的完整代码
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于