SpringBoot 整合 RabbitMQ,实现消息发送与多个消费者的情况

本贴最后更新于 1250 天前,其中的信息可能已经时移世易

配置

### rabbitmq
spring:
  rabbitmq:
    host: 192.168.2.111
    port: 5673
    username: guest
    password: guest

依赖

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
	<version>2.3.3.RELEASE</version>
</dependency>

配置队列

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 订阅模式
 *
 * @author ming
 * @version 1.0.0
 * @date 2021/2/1 14:49
 **/

@Configuration
public class TopicQueueConfig {

    /**
     * 配置队列
     */
    @Bean(name = "topic_queue")
    public Queue updatePasswordQueue() {
        return new Queue("topic_queue");
    }

    /**
     * 配置交换机
     */
    @Bean(name = "topic_queue_exchange")
    public TopicExchange exchange() {
        return new TopicExchange("topic_queue_exchange");
    }

    /**
     * 将队列按照相应的规则绑定到交换机上
     *
     * @param queue    消息队列
     * @param exchange 交换机
     */
    @Bean
    public Binding bindingExchangeMessages(@Qualifier("topic_queue") Queue queue,
                                           @Qualifier("topic_queue_exchange") TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("topic.queue.#");
    }
}

生产者

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

/**
 * 生产者
 *
 * @author ming
 * @date 2021/6/22
 **/
@Component
@Slf4j
@RequiredArgsConstructor
public class TopicQueueSender {

    private final AmqpTemplate template;

    /**
     * 发送同步修改密码的队列通知
     *
     * @param content 内容
     */
    public void send(String content) {
        template.convertAndSend("topic_queue_exchange", "topic.queue.update", content);
    }
}

消费者

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消费者
 *
 * @author ming
 * @version 1.0.0
 * @date 2021/2/1 11:35
 **/

@Slf4j
@Component
@RequiredArgsConstructor
public class TopicQueueReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "topic_queue_a", durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = "topic_queue_exchange", type = ExchangeTypes.TOPIC),
            key = "topic.queue.update"
    ))
    public void receiverA(String content) {
        log.info("进入监听:{}", content);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "topic_queue_b", durable = "true", autoDelete = "false"),
            exchange = @Exchange(value = "topic_queue_exchange", type = ExchangeTypes.TOPIC),
            key = "topic.queue.update"
    ))
    public void receiverB(String content) {
        log.info("进入监听:{}", content);
    }
}

注意:

  • Exchange 和 RoutingKey 相同、queue 不同时,所有消费者都能消费同样的信息;
  • Exchange 和 RoutingKey、queue 都相同时,消费者(随机一个)中只有一个能消费信息,其他消费者都不能消费该信息。
  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 361 关注
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3190 引用 • 8214 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...