1. 导入 rabbitmq
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2. 在 application.yml 中添加 rabbitmq 配置
spring: rabbitmq: host: localhost # rabbitmq的IP地址或对应的域名 username: admin # rabbitmq登录账号 password: 123456 # rabbitmq 登录密码 virtual-host: / publisher-returns: true connection-timeout: 15000 template: mandatory: true listener: simple: concurrency: 5 #监听最小为5 max-concurrency: 10 #监听最大为10
3.注入队列
@Configuration public class RabbitConfig { @Bean public Queue Queue() { return new Queue("zzm-rabbitmq"); } }
注意:zzm-rabbitmq 为队列名称
4.创建消息生产者
@Component public class RabbitProducer { @Autowired private RabbitTemplate rabbitTemplate; @Scheduled(fixedDelay = 10000L) public void send() { rabbitTemplate.convertAndSend("zzm-rabbitmq", "Hello rabbitmq !~"); } }
注意:通过 zzm-rabbitmq 发送一个字符串也可以发送一个 object,在生成环境中,绝大多数是一个 object
5.创建消费者
@Component public class RabbitConsumer { @RabbitHandler @RabbitListener(queues = "zzm-rabbitmq") public void process(@Payload String foo) { System.out.println(new Date() + ": " + foo); } }
@RabbitListener(queues = "zzm-rabbitmq")这里里面要注意一下,这里要写你刚才在 Queue 里面的那个队列名称
6.启动主类
1. 在启动类上面加入@EnableScheduling,表示启动定时任务 2. 修改Producer生产者类中的 @Scheduled(fixedDelay = 10000L)里面的数值 3. 控制台打印:Mon Oct 21 16:55:43 GMT+08:00 2019: Hello rabbitmq !~ 表示成功 4. 到此,一个简单的springboot + rabbitmq整合应用就弄好了; 5. 由于rabbitmq引用了ExChange概念,ExChange有四种类型:Direct、 Topic、 Headers 、Fanout;其中Headers使用是最少的了,Direct是最简单的; 6. 下面我们来介绍一下topic和Fanout。
6.1 简单说一下 ExChange 中的四种类型
direct:exchange在和queue进行binding时会设置routingkey,将消息发送到exchange时会设置对应的routingkey,只有这两个routingkey完全相同,exchange才会选择对应的binging进行消息路由。 fanout:直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。(广播) topic:此类型exchange和direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:'*','#'。 其中'*'表示匹配一个单词, '#'则表示匹配没有或者多个单词。 header:其路由的规则是根据header来判断,其中的header就是binding时的arguments参数:
7. Topic Exchange
7.1 配置 Topic 规则
import com.example.amqbrabbitmq.common.MQConst; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TopicRabbitConfig { @Bean public Queue queueMessage1() { return new Queue(MQConst.TOPIC_QUEUENAME1); } @Bean public Queue queueMessage2() { return new Queue(MQConst.TOPIC_QUEUENAME2); } @Bean TopicExchange exchange() { return new TopicExchange(MQConst.TOPIC_EXCHANGE); } @Bean Binding bindingExchangeMessage(Queue queueMessage1, TopicExchange exchange) { // 将队列1绑定到名为topicKey.A的routingKey return BindingBuilder.bind(queueMessage1).to(exchange).with(MQConst.TOPIC_KEY1); } @Bean Binding bindingExchangeMessages(Queue queueMessage2, TopicExchange exchange) { // 将队列2绑定到所有topicKey.开头的routingKey return BindingBuilder.bind(queueMessage2).to(exchange).with(MQConst.TOPIC_KEYS); } }
7.2 配置消费者
import com.example.amqbrabbitmq.common.MQConst; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class TopicConsumer { @RabbitHandler @RabbitListener(queues = MQConst.TOPIC_QUEUENAME1) public void process1(Object message) { System.out.println("queue:topic.message1,message:" + message); } @RabbitHandler @RabbitListener(queues = MQConst.TOPIC_QUEUENAME2) public void process2(Object message) { System.out.println("queue:topic.message2,message:" + message); } }
7.3 配置生产者
import com.example.amqbrabbitmq.common.MQConst; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Scheduled; @Configuration public class TopicProducer { @Autowired private RabbitTemplate rabbitTemplate; @Scheduled(fixedDelay = 10000L) public void send() { rabbitTemplate.convertAndSend(MQConst.TOPIC_EXCHANGE, MQConst.TOPIC_KEYS, "我是TOPIC_KEYS"); rabbitTemplate.convertAndSend(MQConst.TOPIC_EXCHANGE, MQConst.TOPIC_KEY1, "我是TOPIC_KEY1"); } }
7.4 启动主类,控制台输出:
queue:topic.message2,message:(Body:'我是TOPIC_KEY1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.A, deliveryTag=1, consumerTag=amq.ctag-FFUIrKtXJAJI_ecGEo0NSA, consumerQueue=topic_queuename2]) queue:topic.message2,message:(Body:'我是TOPIC_KEYS' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.#, deliveryTag=1, consumerTag=amq.ctag-1VqDmcyXJi1_ZhD3GJcefA, consumerQueue=topic_queuename2]) queue:topic.message1,message:(Body:'我是TOPIC_KEY1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.A, deliveryTag=1, consumerTag=amq.ctag-dDQ5i2_SYPcAYdCTHmjDxg, consumerQueue=topic_queuename1]) queue:topic.message1,message:(Body:'我是TOPIC_KEY1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.A, deliveryTag=1, consumerTag=amq.ctag-fgs6LkKQo9x2dXw9El6OEQ, consumerQueue=topic_queuename1]) queue:topic.message2,message:(Body:'我是TOPIC_KEYS' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.#, deliveryTag=1, consumerTag=amq.ctag-rX6y-N4JGfg_FB1xzHsBiw, consumerQueue=topic_queuename2]) queue:topic.message2,message:(Body:'我是TOPIC_KEY1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic_exchange, receivedRoutingKey=topicKey.A, deliveryTag=1, consumerTag=amq.ctag-cdPvOG38zLth8qBNxAaDug, consumerQueue=topic_queuename2])
8. Fanout Exchange
8.1 配置 Fanout 规则
import com.example.amqbrabbitmq.common.MQConst; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutRabbitConfig { @Bean public Queue MessageA() { return new Queue(MQConst.FANOUT_QUEUENAME1); } @Bean public Queue MessageB() { return new Queue(MQConst.FANOUT_QUEUENAME2); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(MQConst.FANOUT_EXCHANGE); } @Bean Binding bindingExchangeA(Queue MessageA, FanoutExchange fanoutExchange) { return BindingBuilder.bind(MessageA).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue MessageB, FanoutExchange fanoutExchange) { return BindingBuilder.bind(MessageB).to(fanoutExchange); } }
8.2 配置消费者
import com.example.amqbrabbitmq.common.MQConst; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class FanoutConsumer { @RabbitHandler @RabbitListener(queues = MQConst.FANOUT_QUEUENAME1) public void process1(String message){ System.out.println("queue:fanout.message1,message:" + message); } @RabbitHandler @RabbitListener(queues = MQConst.FANOUT_QUEUENAME2) public void process2(String message){ System.out.println("queue:fanout.message2,message:" + message); } }
8.3 配置生产消息
import com.example.amqbrabbitmq.common.MQConst; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.Scheduled; @Configuration public class FanoutProducer { @Autowired private RabbitTemplate rabbitTemplate; @Scheduled(fixedDelay = 10000L) public void send() { rabbitTemplate.convertAndSend(MQConst.FANOUT_EXCHANGE,"", "我是FANOUT_QUEUENAME2"); } }
8.4 启动主类 控制台输出:
queue:fanout.message2,message:我是FANOUT_QUEUENAME2 queue:fanout.message1,message:我是FANOUT_QUEUENAME2 queue:fanout.message2,message:我是FANOUT_QUEUENAME2
总结:
一般direct和topic用来具体的路由消息,如果要用广播的消息一般用fanout的exchange。header类型用的比较少,但还是知道一点好。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于