springboot 2.1.6 + rabbitmq 整合之道

本贴最后更新于 1988 天前,其中的信息可能已经物是人非

1. 导入 rabbitmq

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

2. 在 application.yml 中添加 rabbitmq 配置

spring: rabbitmq: host: localhost # rabbitmq的IP地址或对应的域名 username: admin # rabbitmq登录账号 password: 123456 # rabbitmq 登录密码 virtual-host: / publisher-returns: true connection-timeout: 15000 template: mandatory: true listener: simple: concurrency: 5 #监听最小为5 max-concurrency: 10 #监听最大为10

3.注入队列

@Configuration public class RabbitConfig { @Bean public Queue Queue() { return new Queue("zzm-rabbitmq"); } }

注意:zzm-rabbitmq 为队列名称

4.创建消息生产者

@Component public class RabbitProducer { @Autowired private RabbitTemplate rabbitTemplate; @Scheduled(fixedDelay = 10000L) public void send() { rabbitTemplate.convertAndSend("zzm-rabbitmq", "Hello rabbitmq !~"); } }

注意:通过 zzm-rabbitmq 发送一个字符串也可以发送一个 object,在生成环境中,绝大多数是一个 object

5.创建消费者

@Component public class RabbitConsumer { @RabbitHandler @RabbitListener(queues = "zzm-rabbitmq") public void process(@Payload String foo) { System.out.println(new Date() + ": " + foo); } }

@RabbitListener(queues = "zzm-rabbitmq")这里里面要注意一下,这里要写你刚才在 Queue 里面的那个队列名称

6.启动主类

1. 在启动类上面加入@EnableScheduling,表示启动定时任务 2. 修改Producer生产者类中的 @Scheduled(fixedDelay = 10000L)里面的数值 3. 控制台打印:Mon Oct 21 16:55:43 GMT+08:00 2019: Hello rabbitmq !~ 表示成功 4. 到此,一个简单的springboot + rabbitmq整合应用就弄好了; 5. 由于rabbitmq引用了ExChange概念,ExChange有四种类型:Direct、 Topic、 Headers 、Fanout;其中Headers使用是最少的了,Direct是最简单的; 6. 下面我们来介绍一下topic和Fanout。

6.1 简单说一下 ExChange 中的四种类型

direct:exchange在和queue进行binding时会设置routingkey,将消息发送到exchange时会设置对应的routingkey,只有这两个routingkey完全相同,exchange才会选择对应的binging进行消息路由。 fanout:直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。(广播) topic:此类型exchange和direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:'*','#'。 其中'*'表示匹配一个单词, '#'则表示匹配没有或者多个单词。 header:其路由的规则是根据header来判断,其中的header就是binding时的arguments参数:

7. Topic Exchange

7.1 配置 Topic 规则

import com.example.amqbrabbitmq.common.MQConst; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TopicRabbitConfig { @Bean public Queue queueMessage1() { return new Queue(MQConst.TOPIC_QUEUENAME1); } @Bean public Queue queueMessage2() { return new Queue(MQConst.TOPIC_QUEUENAME2); } @Bean TopicExchange exchange() { return new TopicExchange(MQConst.TOPIC_EXCHANGE); } @Bean Binding bindingExchangeMessage(Queue queueMessage1, TopicExchange exchange) { // 将队列1绑定到名为topicKey.A的routingKey return BindingBuilder.bind(queueMessage1).to(exchange).with(MQConst.TOPIC_KEY1); } @Bean Binding bindingExchangeMessages(Queue queueMessage2, TopicExchange exchange) { // 将队列2绑定到所有topicKey.开头的routingKey return BindingBuilder.bind(queueMessage2).to(exchange).with(MQConst.TOPIC_KEYS); } }

7.2 配置消费者

import com.example.amqbrabbitmq.common.MQConst; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class TopicConsumer { @RabbitHandler @RabbitListener(queues = MQConst.TOPIC_QUEUENAME1) public void process1(Object message) { System.out.println("queue:topic.message1,message:" + message); } @RabbitHandler @RabbitListener(queues = MQConst.TOPIC_QUEUENAME2) public void process2(Object message) { System.out.println("queue:topic.message2,message:" + message); } }

7.3 配置生产者

import com.example.amqbrabbitmq.common.MQConst; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Scheduled; @Configuration public class TopicProducer { @Autowired private RabbitTemplate rabbitTemplate; @Scheduled(fixedDelay = 10000L) public void send() { rabbitTemplate.convertAndSend(MQConst.TOPIC_EXCHANGE, MQConst.TOPIC_KEYS, "我是TOPIC_KEYS"); rabbitTemplate.convertAndSend(MQConst.TOPIC_EXCHANGE, MQConst.TOPIC_KEY1, "我是TOPIC_KEY1"); } }

7.4 启动主类,控制台输出:

queue:topic.message2,message:(Body:'我是TOPIC_KEY1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.A, deliveryTag=1, consumerTag=amq.ctag-FFUIrKtXJAJI_ecGEo0NSA, consumerQueue=topic_queuename2]) queue:topic.message2,message:(Body:'我是TOPIC_KEYS' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.#, deliveryTag=1, consumerTag=amq.ctag-1VqDmcyXJi1_ZhD3GJcefA, consumerQueue=topic_queuename2]) queue:topic.message1,message:(Body:'我是TOPIC_KEY1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.A, deliveryTag=1, consumerTag=amq.ctag-dDQ5i2_SYPcAYdCTHmjDxg, consumerQueue=topic_queuename1]) queue:topic.message1,message:(Body:'我是TOPIC_KEY1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.A, deliveryTag=1, consumerTag=amq.ctag-fgs6LkKQo9x2dXw9El6OEQ, consumerQueue=topic_queuename1]) queue:topic.message2,message:(Body:'我是TOPIC_KEYS' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.#, deliveryTag=1, consumerTag=amq.ctag-rX6y-N4JGfg_FB1xzHsBiw, consumerQueue=topic_queuename2]) queue:topic.message2,message:(Body:'我是TOPIC_KEY1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.A, deliveryTag=1, consumerTag=amq.ctag-cdPvOG38zLth8qBNxAaDug, consumerQueue=topic_queuename2])

8. Fanout Exchange

8.1 配置 Fanout 规则

import com.example.amqbrabbitmq.common.MQConst; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutRabbitConfig { @Bean public Queue MessageA() { return new Queue(MQConst.FANOUT_QUEUENAME1); } @Bean public Queue MessageB() { return new Queue(MQConst.FANOUT_QUEUENAME2); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(MQConst.FANOUT_EXCHANGE); } @Bean Binding bindingExchangeA(Queue MessageA, FanoutExchange fanoutExchange) { return BindingBuilder.bind(MessageA).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue MessageB, FanoutExchange fanoutExchange) { return BindingBuilder.bind(MessageB).to(fanoutExchange); } }

8.2 配置消费者

import com.example.amqbrabbitmq.common.MQConst; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class FanoutConsumer { @RabbitHandler @RabbitListener(queues = MQConst.FANOUT_QUEUENAME1) public void process1(String message){ System.out.println("queue:fanout.message1,message:" + message); } @RabbitHandler @RabbitListener(queues = MQConst.FANOUT_QUEUENAME2) public void process2(String message){ System.out.println("queue:fanout.message2,message:" + message); } }

8.3 配置生产消息

import com.example.amqbrabbitmq.common.MQConst; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Scheduled; @Configuration public class FanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; @Scheduled(fixedDelay = 10000L) public void send() { rabbitTemplate.convertAndSend(MQConst.FANOUT_EXCHANGE,"", "我是FANOUT_QUEUENAME2"); } }

8.4 启动主类 控制台输出:

queue:fanout.message2,message:我是FANOUT_QUEUENAME2 queue:fanout.message1,message:我是FANOUT_QUEUENAME2 queue:fanout.message2,message:我是FANOUT_QUEUENAME2

总结:

一般direct和topic用来具体的路由消息,如果要用广播的消息一般用fanout的exchange。header类型用的比较少,但还是知道一点好。

源码链接:

https://gitee.com/zzmedu/amqb-rabbitmq.git

  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 347 关注
  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    946 引用 • 1460 回帖 • 1 关注
  • 消息队列
    40 引用 • 52 回帖 • 2 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...