Skip to content

Latest commit

 

History

History

rabbitmq-springboot-reflect

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 

correlationId: 应用程序使用-关联标识符

messageId: 应用程序使用-消息标识符

Timestamp: 应用程序使用-消息时间戳

type: 应用程序使用-消息类型名称

AppId: 应用程序使用-创建应用程序id

首先我们创建了RabbitMqListenerAware类并实现了ChannelAwareMessageListenerApplicationContextAware

ChannelAwareMessageListener: 作用主要是接听消息,实现了其中的onMessage方法,当生产者成功成功发送消息时这个回调 方法会执行。

ApplicationContextAware: 作用是用来获取容器中的Bean,之后根据反射执行这个Bean中的方法。并实现了其中的setApplicationContext方法 用来给application赋值。

我们在RabbitmqConfig中配置了:

 @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange(topic_exchange);
        rabbitTemplate.setRoutingKey(routing_message);
        return rabbitTemplate;
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory,Queue... queues){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueues(queues);
        MessageListenerAdapter adapter = new MessageListenerAdapter(new RabbitMqListenerAware());
        //设置处理器的消费消息的默认方法,如果没有设置,那么默认的处理器中的默认方式是handleMessage方法
        adapter.setDefaultListenerMethod("onMessage");
        container.setMessageListener(adapter);
        return container;
    }

这里主要说一下MessageListenerAdapter,我们上面实现的ChannelAwareMessageListener接口,同时MessageListenerAdapter也实现了该接口

我们在这里设置了消息默认的监听方法onMessage所以上面说到的onMessage回调方法才会执行,如果不设置会走默认的handleMessage方法

为什么这么说? 我们看源码可以发现:

public MessageListenerAdapter() {
        this.queueOrTagToMethodName = new HashMap();
        this.defaultListenerMethod = "handleMessage";
        this.delegate = this;
    }

所以说如果不设置默认方法会执行handleMessage方法。

BasicProducer所有生产者的父类,在这里我们将实现发送消息以及消息发送成功和失败的回调,下面贴出发送消息的方法:

public void sendMessage(String serviceName, String serviceMethodName, String correlationId, Object msg) {
        this.rabbitTemplate.setCorrelationKey(correlationId);
        //参数: String routingKey, Object message, MessagePostProcessor messagePostProcessor
        this.rabbitTemplate.convertAndSend(routing_message, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                long tag = new Random().nextLong();
                message.getMessageProperties().setDeliveryTag(tag);
                message.getMessageProperties().setTimestamp(new Date());
                message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
                message.getMessageProperties().setCorrelationId(correlationId);
                message.getMessageProperties().setHeader("serviceName",serviceName);
                message.getMessageProperties().setHeader("serviceMethodName",serviceMethodName);
                System.out.println("Random 随机数:" + tag);
                return message;
            }
        }, new CorrelationData(correlationId));

    }

当然我们也是调用rabbitTemplate进行发送消息,其中参数有: 路由键,消息内容以及消息属性,在消息属性中我们的重点是:

message.getMessageProperties().setHeader("serviceName",serviceName);

message.getMessageProperties().setHeader("serviceMethodName",serviceMethodName);

serviceName的值是消费者的类名, serviceMethodName:消费者的方法名

这个消息发送成功之后就会执行上面说到的消息监听方法onMessage方法,在其中我们通过反射执行消息者方法。

消费者接收到消息,处理完成之后并ack