【转】 RabbitMQ 之 mandatory 和 immediate

本贴最后更新于 2353 天前,其中的信息可能已经时过境迁

原文链接: http://blog.csdn.net/u013256816/article/details/54914525

1. 概述

mandatory 和 immediate 是 AMQP 协议中 basic.publish 方法中的两个标识位,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。对于刚开始接触 RabbitMQ 的朋友特别容易被这两个参数搞混,这里博主整理了写资料,简单讲解下这两个标识位。

mandatory
当 mandatory 标志位设置为 true 时,如果 exchange 根据自身类型和消息 routeKey 无法找到一个符合条件的 queue,那么会调用 basic.return 方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);当 mandatory 设置为 false 时,出现上述情形 broker 会直接将消息扔掉。

immediate
当 immediate 标志位设置为 true 时,如果 exchange 在将消息路由到 queue(s)时发现对于的 queue 上么有消费者,那么这条消息不会放入队列中。当与消息 routeKey 关联的所有 queue(一个或者多个)都没有消费者时,该消息会通过 basic.return 方法返还给生产者。

概括来说,mandatory 标志告诉服务器至少将该消息 route 到一个队列中,否则将消息返还给生产者;immediate 标志告诉服务器如果该消息关联的 queue 上有消费者,则马上将消息投递给它,如果所有 queue 都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。


2. mandatory

在生产者通过 channle 的 basicPublish 方法发布消息时,通常有几个参数需要设置,为此我们有必要了解清楚这些参数代表的具体含义及其作用,查看 channel 接口,会发现存在 3 个重载的 basicPublish 方法:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

mandatory 和 immediate 上面已经解释过了,其余的参数分别是:
exchange:交换机名称
routingkey:路由键
props:消息属性字段,比如消息头部信息等等
body:消息主体部分

本节主要讲述 mandatory, 下面我们写一个 demo,在 RabbitMQ broker 中有:
exchange : exchange.mandatory.test
queue: queue.mandatory.test
exchange 路由到 queue 的 routingkey 是 mandatory
这里先不讲当前的 exchange 绑定到 queue 中,即:

channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());

详细代码如下:

package com.vms.test.zzh.rabbitmq.self;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Created by hidden on 2017/2/7.
 */
public class RBmandatoryTest {
    public static final String ip = "10.198.197.73";
    public static final int port = 5672;
    public static final String username = "root";
    public static final String password = "root";

    public static final String queueName = "queue.mandatory.test";
    public static final String exchangeName = "exchange.mandatory.test";
    public static final String routingKey = "mandatory";
    public static final Boolean mandatory = true;
    public static final Boolean immediate = false;

    public static void main(String[] args) {

        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(ip);
            factory.setPort(port);
            factory.setUsername(username);
            factory.setPassword(password);

            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.basicQos(1);
            channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());
//            channel.close();
//            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

运行,之后通过 wireshark 抓包工具可以看到如下图所示:

这里可以看到最后执行了 basic.return 方法,将发布者发出的消息返回给了发布者,查看协议的 arguments 参数部分可以看到:reply-text 字段值为 NO_ROUTE,表示消息并没有路由到合适的队列中;

那么我们该怎么获取到没有被正确路由到合适队列的消息呢?这时候可以通过为 channel 信道设置 ReturnListener 监听器来实现,具体代码(main 函数部分):

try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(ip);
            factory.setPort(port);
            factory.setUsername(username);
            factory.setPassword(password);

            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.basicQos(1);
            channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());
            channel.addReturnListener(new ReturnListener() {
                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
                    String message = new String(body);
                    System.out.println("Basic.return返回的结果是:"+message);
                }
            });

//            channel.close();
//            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

运行结果:

Basic.return返回的结果是:===mandatory===

下面我们来看一下,设置 mandatory 标志且 exchange 路由到 queue 中,代码部分只需要将:

channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());

改为

channel.basicPublish(exchangeName, routingKey, mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());

即可。
通过 wireshark 抓包如下:

可以看到并不会有 basic.return 方法被调用。查看 RabbitMQ 管理界面发现消息已经到达了队列。


3. immediate

在 RabbitMQ3.0 以后的版本里,去掉了 immediate 参数的支持,发送带 immediate 标记的 publish 会返回如下错误:
“{amqp_error,not_implemented,”immediate=true”,’basic.publish’}”

为什么移除 immediate 标记,参见如下版本变化描述:
Removal of “immediate” flag
What changed? We removed support for the rarely-used “immediate” flag on AMQP’s basic.publish.
Why on earth did you do that? Support for “immediate” made many parts of the codebase more complex, particularly around mirrored queues. It also stood in the way of our being able to deliver substantial performance improvements in mirrored queues.
What do I need to do? If you just want to be able to publish messages that will be dropped if they are not consumed immediately, you can publish to a queue with a TTL of 0.
If you also need your publisher to be able to determine that this has happened, you can also use the DLX feature to route such messages to another queue, from which the publisher can consume them.
这段解释的大概意思是:immediate 标记会影响镜像队列性能,增加代码复杂性,并建议采用“TTL”和“DLX”等方式替代。

  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 394 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...