RabbitMQ 学习笔记

本贴最后更新于 1576 天前,其中的信息可能已经时过境迁

前言

本篇文章是我学习 RabbitMQ 的笔记,主要包括两大部分,第一部分是 RabbitMQ 的安装与基本概念,第二部分是在 SpringBoot 项目中编写测试 RabbitMQ。

RabbitMQ 基础

简介

RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(也称为消息中间件),主要用来实现异步通信和程序解耦。

安装

推荐使用 Docker 来安装

1. 拉取 RabbitMQ 镜像
docker pull rabbitmq
2. 创建 RabbitMQ 容器
docker run -p 5672:5672 -p 15672:15672 --name rabbitmq -d rabbitmq
3. 进入 RabbitMQ 容器
docker exec -it rabbitmq /bin/bash
4. 使用管理插件

在 RabbitMQ 容器里输入下面的指令即可开启插件

rabbitmq-plugins enable rabbitmq_management

在浏览器输入 http://127.0.0.1:15672/ 出现下图所示界面

loginrabbitmq.png

然后输入默认的账号 guest,密码也是 guest 就可以登录

RabbitMQ 模型

RabbitMQ 支持的是 AMQP 0-9-1 Model 模型,如下图所示

hello-world.webp

简单描述以下这个模型,一个 发布者(Publisher)交换机(Exchange) 发送一条消息,然后 交换机(Exchange) 根据 路由关系(binding) 将消息复制分发给和它绑定的 队列(Queue) ,最后,订阅了 队列(Queue)消费者(Consumer) 消费这些消息。

1. 发布(生产)者(Publisher/Producer)

指的是给交换机发送消息的程序

2. 队列(Queue)

队列本质上是一个消息缓冲区,负责接收来自交换机的消息

3. 消费者(Consumer)

指的是从队列中消费消息的程序

4. 绑定(Bindings)

交换机和一个队列的关系称为一个 binding

5. 交换机(Exchange)

生产者将消息发送给交换机,交换机再根据绑定关系把这些消息分发给对应的队列,交换机有四种类型,分别是

Direct Exchange

这是默认的交换机类型,Direct Exchange 会根据一个 路由键(routing key) 去分发消息给对应的队列。它的过程是这样的,生产者发消息的时候会指定一个 routing key R , Direct Exchange 绑定队列时也会指定一个 routing key K, 当 Direct Exchange 接收到消息的时候, 会给那些满足 (K = R) 的队列分发消息。

Fanout Exchange

这种类型的交换机会把接收到的消息分发给所有和自己绑定的队列(会忽略 routing key),就是广播模式。

Topic Exchange

Topic 类型的交换机和 direct 类型的效果很像,也是根据路由键来匹配队列,但是匹配规则更加灵活。Topic Exchange 可以根据通配符 *# 来匹配,其中:

  • *: 表示可以替换一个字符
  • #: 表示可以替换一个或者多个字符

topicwildcard.webp

如上图所示,共有三个绑定关系(bindings),交换机 X 和 队列 Q1 有一个绑定关系 *.orange.*,和 队列 Q2 有两个绑定关系,*.*.rabbit"lazy.#。举例说明一下:

  1. 假设现在有一个消息设置了 routing key 为 quick.orange.rabbit,那么这个消息会被交换机分发给 Q1 和 Q2。
  2. 假设现在有一个消息设置了 routing key 为 lazy.orange.elephant,那么这个消息也会被交换机分发给 Q1 和 Q2。
  3. 假设现在有一个消息设置了 routing key 为 quick.orange.fox,那么这个消息只会被交换机分发给 Q1。
  4. 假设现在有一个消息设置了 routing key 为 lazy.pink.rabbit,那么这个消息会被交换机分发给 Q2,并且只会分发一次。
Headers Exchange

Headers 类型的交换机,舍弃了 routing key,而是根据 消息头(message headers) 来匹配队列。有两种匹配模式,消息头带有一个参数 x-match,如果这个参数的值是 all,代表匹配所有;如果是 any,代表匹配任意一个值。

SpringBoot 整合 RabbitMQ

创建 RabbitMQ 项目

通过 IDEA 创建项目时选择 RabbitMQ 的依赖

springbootcreaterabbitmq.png

创建完成后,pom.xml 文件会包含以下两个依赖(我创建的时候,遇到一个问题,spring-boot-starter-parent 版本高于 2.3.0 会报错,找不到 spring-rabbit-test 依赖,所以我手动将 spring-boot-starter-parent 的版本改为了 2.3.0.)

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> </dependency>

项目创建好了,我们就开始写测试 demo

一对一使用

先通过一个简单的入门程序,来感受一些 RabbitMQ 消息队列,模型图如下:

hellodemo.png

1. 创建 RabbitMQ 配置类

HelloConfig 这个配置类注入了一个名为 hello 的队列

@Configuration public class HelloConfig { @Bean public Queue helloQueue() { return new Queue("hello"); } }
2. 创建一个生产者

HelloProducer 这个类负责生产消息

@Component @Slf4j public class HelloProducer { @Autowired private RabbitTemplate template; public void produce(int i) { String message = "hello : " + i; log.info("hello produce: ===============> " + message); // 参考:<a>https://www.rabbitmq.com/tutorials/tutorial-one-python.html<a/> // The queue name needs to be specified in the routing_key parameter: // convertAndSend函数有三个参数,第一个是交换机名称,如果不给就是默认的;第二个是路由key指队列名称;第三个是消息体 template.convertAndSend("hello", message); } }
3. 创建一个消费者

HelloReceiver 类用来监听队列 hello,负责消费从队列 hello 分发过来的消息。
@RabbitListener(queues = "hello") 这个注解可以标注在类上面,也可以标注在方法上,当标注在类上时,还需要通过 @RabbitHandler 来标注接收消息的方法

@Component @RabbitListener(queues = "hello") @Slf4j public class HelloReceiver { @RabbitHandler public void process(String message) { log.info("hello receive : ================> " + message); } }
4. 测试类

测试类通过注入生产者 HelloProducer,通过循环调用 HelloProducer 类的 produce 方法来产生消息

@SpringBootTest @Slf4j public class HelloTest { @Autowired private HelloProducer helloProducer; @Test public void hello() throws Exception{ int i = 0; while (i < 10) { helloProducer.produce(i); Thread.sleep(3000); i++; } } }

输出结果:

hello produce: ===============> hello : 0 hello receive : ================> hello : 0 hello produce: ===============> hello : 1 hello receive : ================> hello : 1 hello produce: ===============> hello : 2 hello receive : ================> hello : 2 .....
5. 结论

从输出日志可以看出,生产者每生产一个消息,消费者就消费一个,这是最简单的一对一消息模型

一对多和多对多使用

1. 创建一个配置类
@Configuration public class WorkQueueConfig { @Bean public Queue workQueue() { return new Queue("workQueue"); } }
2. 创建两个生产者

这两个生产者都将消息发送给队列 workQueue

@Slf4j public class WorkProducerA { @Autowired private RabbitTemplate template; public void produce(int i) { template.convertAndSend("workQueue", ("WorkProducerA message ******* " + i)); } }
@Component @Slf4j public class WorkProducerB { @Autowired private RabbitTemplate template; public void produce(int i) { template.convertAndSend("workQueue", ("WorkProducerB message ******* " + i)); } }
3. 创建两个消费者

这两个消费者都监听队列 workQueue

@Component @RabbitListener(queues = "workQueue") @Slf4j public class WorkReceiverA { @RabbitHandler public void process(String message) { log.info("workReceiverA : " + message); } }
@Component @RabbitListener(queues = "workQueue") @Slf4j public class WorkReceiverB { @RabbitHandler public void process(String message) { log.info("workReceiverB : " + message); } }
4. 测试

测试类有两个方法,分别是测试一对多和多对多的

@SpringBootTest public class WorkQueueTest { @Autowired private WorkProducerA workProducerA; @Autowired private WorkProducerB workProducerB; /** * 一对多 * * 参考:<a>https://www.rabbitmq.com/tutorials/tutorial-two-python.html</a> * <p> * By default, RabbitMQ will send each message to the next consumer, in sequence. * On average every consumer will get the same number of messages */ @Test public void oneToMany() { int i = 0; while (i < 20) { workProducerA.produce(i); i++; } } /** * 多对多 * */ @Test public void manyToMany() { int i = 0; while (i < 40) { workProducerA.produce(i); workProducerB.produce(i); i++; } } }

一对多输出结果:

workReceiverB : WorkProducerA message ******* 0 workReceiverA : WorkProducerA message ******* 1 workReceiverB : WorkProducerA message ******* 2 workReceiverA : WorkProducerA message ******* 3 ..... workReceiverB : WorkProducerA message ******* 18 workReceiverA : WorkProducerA message ******* 19
5. 结论

当同一个队列向多个消费者提供消息时,采取的是均分策略,每个消费者会得到数量相同的消息

多对多输出结果:

workReceiverB : WorkProducerA message ******* 0 workReceiverA : WorkProducerB message ******* 0 workReceiverA : WorkProducerB message ******* 1 workReceiverB : WorkProducerA message ******* 1 workReceiverA : WorkProducerB message ******* 2 workReceiverB : WorkProducerA message ******* 2 ..... workReceiverB : WorkProducerA message ******* 39 workReceiverA : WorkProducerB message ******* 39

结论:当多个生产者给同一个队列发送消息,而这个队列给多个消费者提供消息时,也是平均分配的

Direct Exchange

在实际应用中,我们可能需要根据一些实际情况,将特定类型的消息分发给特定的队列,再通过队列将消息提供给特定的消费者。举个例子,我们需要做一个处理日志的功能,我们的应用程序(日志生产者)会产生的 error 类型日志和 info 类型日志,现在有两个日志消费者,其中一个只接收 error 类型的日志,另一个则接收 errorinfo 类型的日志,现在我们通过代码来实现。

1. 创建一个 Direct Exchange 配置类

该配置类创建了两个队列 logQueueA 和 logQueueB,创建了一个 Direcrt 类型交换机(direct.logs),然后将队列 logQueueA 和交换机 direct.logs 绑定,并指定 router key 为 error,再将队列 logQueueB 和交换机 direct.logs 绑定,分别指定 router key 为 error 和 info。

@Configuration public class DirectConfig { @Bean public Queue logQueueA() { return new Queue("logQueueA"); } @Bean public Queue logQueueB() { return new Queue("logQueueB"); } @Bean public DirectExchange directExchange() { return new DirectExchange("direct.logs"); } /** * logQueueA 和 error 日志绑定 * * @return */ @Bean public Binding bindingExchangeErrorLogs() { return BindingBuilder.bind(logQueueA()).to(directExchange()).with("error"); } /** * logQueueB 和 error 日志绑定 * * @return */ @Bean public Binding bindingExchangeMixLogsA() { return BindingBuilder.bind(logQueueB()).to(directExchange()).with("error"); } /** * logQueueB 和 info 日志绑定 * * @return */ @Bean public Binding bindingExchangeMixLogsB() { return BindingBuilder.bind(logQueueB()).to(directExchange()).with("info"); } }
2. 创建一个日志生产者

模拟生产日志消息,根据传入的参数类型,决定日志类型和 router key

@Configuration public class LogsProducer { @Autowired private RabbitTemplate template; /** * 根据参数指定routingKey * * @param type */ public void produce(String type) { template.convertAndSend("direct.logs", type, "This is " + type + " logs"); } }
3. 创建两个日志消费者

消费者 LogReceiverA 监听队列 logQueueA

@Component @Slf4j @RabbitListener(queues = "logQueueA") public class LogReceiverA { @RabbitHandler public void receive(String message) { log.info("error receiver receive : " + message); } }

消费者 LogReceiverB 监听队列 logQueueB

@Component @Slf4j @RabbitListener(queues = "logQueueB") public class LogReceiverB { @RabbitHandler public void receive(String message) { log.info("mix receiver receive : " + message); } }
4. 测试类

测试类有两个方法,分别测试生产 error 类型日志和 info 类型日志,当生产 error 类型日志时,LogReceiverA 和 LogReceiverB 两个消费者都接收到了日志消息;当生产 info 日志时,只有消费者 LogReceiverB 接收到日志消息。

@SpringBootTest public class DirectTest { @Autowired private LogsProducer logsProducer; @Test public void testErrorLog() { logsProducer.produce("error"); // mix receiver receive : This is error logs // error receiver receive : This is error logs } @Test public void testInfoLog() { logsProducer.produce("info"); // mix receiver receive : This is info logs } }
5.结论

Direct Exchange 通过 router key 来对消息进行匹配

Fanout Exchange

Fanout Exhcnage 主要应用于广播消息,该类型的交换机会把生产者生产的消息通过广播的形式发送给所有和自己绑定的交换机。

1. 创建 Fanout Exchange 配置类

该配置类创建了三个队列 broadcastQueueA、broadcastQueueB、broadcastQueueC,又创建了一个 Fanout Exhcnage (fanout.broadcast),再将这三个队列都和交换机(fanout.broadcast)绑定。

@Configuration public class FanoutConfig { @Bean public Queue broadcastQueueA() { return new Queue("broadcastQueueA"); } @Bean public Queue broadcastQueueB() { return new Queue("broadcastQueueB"); } @Bean public Queue broadcastQueueC() { return new Queue("broadcastQueueC"); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanout.broadcast"); } @Bean public Binding bindingExchangeMessageA() { return BindingBuilder.bind(broadcastQueueA()).to(fanoutExchange()); } @Bean public Binding bindingExchangeMessageB() { return BindingBuilder.bind(broadcastQueueB()).to(fanoutExchange()); } @Bean public Binding bindingExchangeMessageC() { return BindingBuilder.bind(broadcastQueueC()).to(fanoutExchange()); } }
2. 创建生产者

该生产者主要生产一条广播消息

@Component @Slf4j public class BroadcastProducer { @Autowired private RabbitTemplate template; public void send() { String context = "This is a broadcast message"; template.convertAndSend("fanout.broadcast","", context); } }
3. 创建消费者

消费者 BroadcastReceiverA 监听队列 broadcastQueueA

@Component @Slf4j @RabbitListener(queues = "broadcastQueueA") public class BroadcastReceiverA { @RabbitHandler public void process(String message) { log.info("BroadcastReceiverA receive : " + message); } }

消费者 BroadcastReceiverB 监听队列 broadcastQueueB

@Component @Slf4j @RabbitListener(queues = "broadcastQueueB") public class BroadcastReceiverB { @RabbitHandler public void process(String message) { log.info("BroadcastReceiverB receive : " + message); } }

消费者 BroadcastReceiverC 监听队列 broadcastQueueC

@Component @Slf4j @RabbitListener(queues = "broadcastQueueC") public class BroadcastReceiverC { @RabbitHandler public void process(String message) { log.info("BroadcastReceiverC receive : " + message); } }
4. 测试类
@SpringBootTest public class FanoutTest { @Autowired private BroadcastProducer fanoutSender; /** * 发布订阅模式 * */ @Test public void testFanout() { fanoutSender.send(); } }

输出结果:

BroadcastReceiverA receive : This is a broadcast message BroadcastReceiverB receive : This is a broadcast message BroadcastReceiverC receive : This is a broadcast message
5. 结论

Fanout Exhcnage 不需要指定 router key,它会将接收到的来自生产者的生产的消息发送给所有和自己绑定的队列,它的路由效果如下图所示:

exchangefanout.webp

Topic Exchange

Topic Exchange 也是根据 router key 来匹配队列,但是匹配规则很灵活,因为可以根据通配符来匹配。我们通过一个匹配颜色的例子来演示

1. 创建 Topic Exchange 配置类

该配置类创建了两个队列 colorQueueA 和 colorQueueB,以及一个 Topic Exchange(topic.color),再将队列 colorQueueA 和交换机绑定,并指定了 router ky 为 *.blue.*,将队列 colorQueueB 和交换机绑定,并指定了 router key 为 green.#

@Configuration public class TopicConfig { @Bean public Queue topicQueueA() { return new Queue("colorQueueA"); } @Bean public Queue topicQueueB() { return new Queue("colorQueueB"); } @Bean public TopicExchange colorTopicExchange() { return new TopicExchange("topic.color"); } @Bean public Binding bindingExchangeTopicQueueA() { return BindingBuilder.bind(topicQueueA()).to(colorTopicExchange()).with("*.blue.*"); } @Bean public Binding bindingExchangeTopicQueueB() { return BindingBuilder.bind(topicQueueB()).to(colorTopicExchange()).with("green.#"); } }
2. 创建生产者

根据传入的参数,指定 router key

@Component @Slf4j public class ColorProducer { @Autowired private RabbitTemplate template; public void produce(String criteria) { template.convertAndSend("topic.color", criteria, "This is " + criteria); } }
3. 创建两个消费者

消费者 ColorReceiverA 监听队列 colorQueueA

@Component @Slf4j @RabbitListener(queues = "colorQueueA") public class ColorReceiverA { @RabbitHandler public void receive(String color) { log.info("ColorReceiverA receive: " + color); } }

消费者 ColorReceiverB 监听队列 colorQueueB

@Component @Slf4j @RabbitListener(queues = "colorQueueB") public class ColorReceiverB { @RabbitHandler public void receive(String color) { log.info("ColorReceiverB receive: " + color); } }
4. 测试类

在测试类中,我们写了两个测试方法,分别指定 router key 为 yellow.blue.redgreen.blue.red

@SpringBootTest public class TopicTest { @Autowired private ColorProducer colorProducer; @Test public void test1() { String criteria = "yellow.blue.red"; colorProducer.produce(criteria); } @Test public void test2() { String criteria = "green.blue.red"; colorProducer.produce(criteria); } }

输出结果:

test1 方法输出

ColorReceiverA receive: This is yellow.blue.red

test2 方法输出

ColorReceiverA receive: This is green.blue.red\ ColorReceiverB receive: This is green.blue.red
5. 结论

Topic Exchange 中的 router key 可以写成通配符的格式,如 *.blue.* 或者 green.#,这样使得匹配规则更加灵活

Headers Exchange

Headers Exchange 没有 router key 了,而是根据消息头来匹配。消息头内容是 key-value 键值对格式,可以指定多个 key-value 键值对,所以需要指定匹配模式是 all 还是 any,即匹配所有的 key-value 键值对,还是只匹配其中一条。

1. 创建 Headers Exchange 配置类

该配置类创建了两个队列 imageQueueA 和 imageQueueB,以及一个 Headers Exchange(headers.image),再将 imageQueueA 和交换机(headers.image)绑定,并且指定匹配模式为 all,再将 imageQueueA 和交换机(headers.image)绑定,并且指定匹配模式为 any

@Configuration public class HeadersConfig { @Bean public Queue imageQueueA() { return new Queue("imageQueueA"); } @Bean public Queue imageQueueB() { return new Queue("imageQueueB"); } @Bean public HeadersExchange headersExchange() { return new HeadersExchange("headers.image"); } /** * 匹配type=jpg且size=12的消息,分发给队列imageQueueA * * @return */ @Bean public Binding bindingHeadersExchangeA() { Map<String, Object> keys = Maps.newHashMap(); keys.put("type", "jpg"); keys.put("size", 12); return BindingBuilder.bind(imageQueueA()).to(headersExchange()).whereAll(keys).match(); } /** * 匹配type=png或者size=6的消息,分发给队列imageQueueB * * @return */ @Bean public Binding bindingHeadersExchangeB() { Map<String, Object> keys = Maps.newHashMap(); keys.put("type", "png"); keys.put("size", 6); return BindingBuilder.bind(imageQueueB()).to(headersExchange()).whereAny(keys).match(); } }
2. 创建生产者类

为了方便传参,我们定义了一个类 Image

@Builder @ToString @Getter public class Image implements Serializable { private static final long serialVersionUID = 8617592564349459927L; private String type; private int size; }

生产者类,根据传入的参数指定消息头内容

@Component public class ImagesProducer { @Autowired private RabbitTemplate template; public void produce(Image image) { MessageProperties properties = new MessageProperties(); properties.setHeader("type", image.getType()); properties.setHeader("size", image.getSize()); template.convertAndSend("headers.image", "", new Message(image.toString().getBytes(), properties)); } }
3. 创建消费者类

消费者 ImageReceiverA 监听队列 imageQueueA

@Component @Slf4j public class ImageReceiverA { @RabbitListener(queues = "imageQueueA") public void receive(byte[] image) { log.info("ImageReceiverA receive : " + new String(image)); } }

消费者 ImageReceiverB 监听队列 imageQueueB

@Component @Slf4j public class ImageReceiverB { @RabbitListener(queues = "imageQueueB") public void receive(byte[] image) { log.info("ImageReceiverB receive : " + new String(image)); } }
4. 测试类
@SpringBootTest public class HeadersTest { @Autowired private ImagesProducer imagesProducer; @Test public void testAll() { imagesProducer.produce(Image.builder().type("jpg").size(12).build()); // ImageReceiverA receive : Image(type=jpg, size=12) } @Test public void testAny() { imagesProducer.produce(Image.builder().type("png").size(12).build()); // ImageReceiverB receive : Image(type=png, size=12) } @Test public void testAny2() { imagesProducer.produce(Image.builder().type("jpg").size(6).build()); // ImageReceiverB receive : Image(type=jpg, size=6) } }

输出结果:

testAll 方法输出:

ImageReceiverA receive : Image(type=jpg, size=12)

testAny 方法输出:

mageReceiverB receive : Image(type=png, size=12)

testAny2 方法输出:

ImageReceiverB receive : Image(type=jpg, size=6)
5. 结论

Headers Exchange 可以灵活增加或减少消息头中的键值对来实现匹配。

传输对象

前面的例子都是传的字符串,在实际的开发中经常需要传输对象,现在我们写一个例子来传输对象

1. 创建一个实体类
@Builder @ToString public class User implements Serializable { private static final long serialVersionUID = 4409039369681054682L; private String name; private int age; }
2. 创建配置类

这里使用的是 Topic Exchange

@Configuration public class UserConfig { @Bean public Queue userInfo() { return new Queue("user.info"); } @Bean public TopicExchange userTopicExchange() { return new TopicExchange("topic.user"); } @Bean public Binding bindingTopicExchange() { return BindingBuilder.bind(userInfo()).to(userTopicExchange()).with("user.info"); } }
3. 创建生产者
@Component public class UserProducer { @Autowired private RabbitTemplate template; public void produce() { template.convertAndSend("topic.user", "user.info", User.builder().name("Tom").age(20).build()); } }
4. 创建消费者
@Component @Slf4j public class UserReceiver { /** * RabbitListener注解可以直接作用于方法 * * @param user */ @RabbitListener(queues = "user.info") public void receive(User user) { log.info("receive user : " + user); } }
5. 测试类
@SpringBootTest public class UserTest { @Autowired private UserProducer userProducer; @Test public void test() { userProducer.produce(); } }

输出结果:

receive user : User(name=Tom, age=20)

最后

RabbitMQ 的基本概念和 SpringBoot 整合 RabbitMQ 的常用模式都总结完毕了,以后项目中需要实际使用 RabbitMQ 时,再去深入研究吧。

本篇文章的完整代码

  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 344 关注
  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    949 引用 • 1460 回帖
1 操作
marshalby2 在 2021-01-20 15:57:41 更新了该帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...