一: RabbitMQ
1.由于 RabbitMQ 是由 Elang 语言开发的,所以安装 RabbitMQ 需要首先安装 Elang 环境(需要添加入环境变量才可以正常启动 RabbitMQ)。
2.在安装过程中需要注意 ERlang 和 RabbitMQ 的版本对应,如果版本不正确的话也会导致 RabbitMQ 安装失败。
3.当使用 springboot 去远程连接 rabbitMQ 时,不能使用默认的 guest 用户,官方文档也写到,该用户引用于本地连接使用。
4.RabbitMQ 有 web 的 UI 界面,不过需要启动(rabbitmq-plugins.bat enable rabbitmq_management 命令),windows\linux 下如果启动不成功,可能是没有在管理员的权限下进行运行。开启和关闭 RabbitMQ 的命令分别为 net start\stop RabbitMQ。
二:Springboot 和 RabbitMQ 整合
创建 maven 项目并导入需要的 jar 包
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-test</artifactId> </dependency> </dependencies>
并配置好 application.properties:
spring.application.name=Spring-boot-rabbitmq spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=#username spring.rabbitmq.password=#password
1.Direct 模式:最简单的消息队列模式
RabbitConfig.java:RabbitMQ 的配置类,在这里可以定义一个消息队列。
@Configuration public class RabbitConfig { @Bean public Queue Queue(){ // ("hello") 值得是创建这个队列的名字 return new Queue("hello"); } }
HelloSender.java: 发送端的编写
@Service public class HelloSender { @Autowired private AmqpTemplate rabbitTemplate; public void send(int i ){ String context = "hello" + new Date(); System.out.println("Sender:" + context + "******" + i); // 这里需要指定发送的消息队列 this.rabbitTemplate.convertAndSend("hello",context); } }
HelloReceiver.java: 接收端的编写
@Component @RabbitListener(queues = "hello") public class HelloReceiver { @RabbitHandler public void process1(String hello){ System.out.println("Receiver1:" + hello); } }
TestRabbitMQ:对于队列的发送测试
@SpringBootTest @RunWith(SpringJUnit4ClassRunner.class) public class TestRabbitMQ { @Autowired private HelloSender helloSender; @Test public void testRabbit(){ helloSender.send(1); } }
最后编写 springboot 的项目启动类后启动,会看到发送端和接收端的数据。
2.Topic 模式:可以自定义接收队列的模式,是最灵活的一种。
RabbitConfig 中创建两个消息队列,把其都绑定在同一个交换机中,通过 with 来限定满足什么条件的时候接收器触发。
@Configuration public class RabbitConfig { // 创建 queue @Bean(name = "message") public Queue Queue(){ return new Queue("topic.message"); // topic.message ,是 rounting-key 匹配规则 } @Bean(name = "messages") public Queue queueMessages(){ return new Queue("topic.messages"); } /** * 创建交换机 * @return */ @Bean public TopicExchange exchange(){ return new TopicExchange("exchange"); } // 根据绑定规则将队列绑定到相应的交换机上 @Bean public Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange){ return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean public Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange){ // * 表示一个词, # 表示零个或多个词 return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
当发送的消息是 queueMessage 的时候两个接收器都会接受,而当发送的消息是 queueMessages 的时候只有第二个接收器会接受。
3.Fanout 模式:意为广播模式,可以同时分发到绑定交换机的所有接收器。
RabbitMQ 中编写了 3 个消息队列,把其都绑定在交换机上。
@Configuration /** * 广播模式 Fanout */ public class FanoutRabbitConfig { @Bean public Queue AMessage(){ return new Queue("fanout.A"); } @Bean public Queue BMessage(){ return new Queue("fanout.B"); } @Bean public Queue CMessage(){ return new Queue("fanout.C"); } // 构建交换器 @Bean FanoutExchange fanoutExchange(){ return new FanoutExchange("fanoutExchange"); } // 把 A,B,C都绑到 fanoutExchange 上 @Bean Binding bindingExchangeA(Queue AMessage , FanoutExchange fanoutExchange){ return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage , FanoutExchange fanoutExchange){ return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage , FanoutExchange fanoutExchange){ return BindingBuilder.bind(CMessage).to(fanoutExchange); } }
运行的结果是当发送消息的时候交换器会将其分发到 3 个接收器上
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于