深入学习 RabbitMQ(一):mandatory 标志的作用

本贴最后更新于 2493 天前,其中的信息可能已经事过境迁

【转】http://blog.csdn.net/hzw19920329/article/details/54311277

在生产者通过 channel 的 basicPublish 方法发布消息时,通常有几个参数需要设置,为此我们有必要了解清楚这些参数代表的具体含义及其作用,查看 Channel 接口,会发现存在 3 个重载的 basicPublish 方法

[java] view plain copy

  1. void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
  2. void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
  3.          throws IOException;  
    
  4. void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
  5.          throws IOException;  
    
     他们共有的参数分别是:
     exchange:交换机名称
     routingKey:路由键
     props:消息属性字段,比如消息头部信息等等
     body:消息主体部分
     除此之外,还有mandatory和immediate这两个参数,鉴于RabbitMQ3.0不再支持immediate标志,因此我们重点讨论mandatory标志
     mandatory的作用:
    
     当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则就将消息return给发送者;
    
    下面我们通过几个实例测试下mandatory标志的作用:
     测试1:设置mandatory标志,且exchange未绑定队列
    

[java] view plain copy

  1. public class ProducerTest {

  2.  public static void main(String[] args) {  
    
  3.      String exchangeName = "confirmExchange";  
    
  4.      String queueName = "confirmQueue";  
    
  5.      String routingKey = "confirmRoutingKey";  
    
  6.      String bindingKey = "confirmBindingKey";  
    
  7.      int count = 3;  
    
  8.      ConnectionFactory factory = new ConnectionFactory();  
    
  9.      factory.setHost("172.16.151.74");  
    
  10.      factory.setUsername("test");  
    
  11.      factory.setPassword("test");  
    
  12.      factory.setPort(5672);  
    
  13.      //创建生产者  
    
  14.      Sender producer = new Sender(factory, count, exchangeName, routingKey);  
    
  15.      producer.run();  
    
  16.  }  
    
  17. }

  18. class Sender

  19. {

  20.  private ConnectionFactory factory;  
    
  21.  private int count;  
    
  22.  private String exchangeName;  
    
  23.  private String routingKey;  
    
  24.  public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {  
    
  25.      this.factory = factory;  
    
  26.      this.count = count;  
    
  27.      this.exchangeName = exchangeName;  
    
  28.      this.routingKey = routingKey;  
    
  29.  }  
    
  30.  public void run() {  
    
  31.      try {  
    
  32.          Connection connection = factory.newConnection();  
    
  33.          Channel channel = connection.createChannel();  
    
  34.          //创建exchange  
    
  35.          channel.exchangeDeclare(exchangeName, "direct", true, false, null);  
    
  36.          //发送持久化消息  
    
  37.          for(int i = 0;i < count;i++)  
    
  38.          {  
    
  39.              //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键  
    
  40.              channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());  
    
  41.          }  
    
  42.      } catch (Exception e) {  
    
  43.          e.printStackTrace();  
    
  44.      }  
    
  45.  }  
    
  46. }

    第 45 行我们将 basicPublish 的第三个参数 mandatory 设置成了 true,表示开启了 mandatory 标志,但我们没有为当前 exchange 绑定任何队列;

    通过 wireshark 抓包看到下面输出:

    可以看到最后执行了 basic.return 方法,将发布者发出的消息返还给了发布者,查看协议的 Arguments 参数部分可以看到,Reply-Text 字段值为:NO_ROUTE,表示消息并没有路由到合适的队列中;

    那么我们该怎么获取到没有被正确路由到合适队列的消息呢?这时候可以通过为 channel 信道设置 ReturnListener 监听器来实现,具体实现代码见下:

[java] view plain copy

  1. public class ProducerTest {

  2.  public static void main(String[] args) {  
    
  3.      String exchangeName = "confirmExchange";  
    
  4.      String queueName = "confirmQueue";  
    
  5.      String routingKey = "confirmRoutingKey";  
    
  6.      String bindingKey = "confirmBindingKey";  
    
  7.      int count = 3;  
    
  8.      ConnectionFactory factory = new ConnectionFactory();  
    
  9.      factory.setHost("172.16.151.74");  
    
  10.      factory.setUsername("test");  
    
  11.      factory.setPassword("test");  
    
  12.      factory.setPort(5672);  
    
  13.      //创建生产者  
    
  14.      Sender producer = new Sender(factory, count, exchangeName, routingKey);  
    
  15.      producer.run();  
    
  16.  }  
    
  17. }

  18. class Sender

  19. {

  20.  private ConnectionFactory factory;  
    
  21.  private int count;  
    
  22.  private String exchangeName;  
    
  23.  private String routingKey;  
    
  24.  public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {  
    
  25.      this.factory = factory;  
    
  26.      this.count = count;  
    
  27.      this.exchangeName = exchangeName;  
    
  28.      this.routingKey = routingKey;  
    
  29.  }  
    
  30.  public void run() {  
    
  31.      try {  
    
  32.          Connection connection = factory.newConnection();  
    
  33.          Channel channel = connection.createChannel();  
    
  34.          //创建exchange  
    
  35.          channel.exchangeDeclare(exchangeName, "direct", true, false, null);  
    
  36.          //发送持久化消息  
    
  37.          for(int i = 0;i < count;i++)  
    
  38.          {  
    
  39.              //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,  
    
  40.              //因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话  
    
  41.              //我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键  
    
  42.              channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());  
    
  43.          }  
    
  44.          channel.addReturnListener(new ReturnListener() {  
    
  45.              @Override  
    
  46.              public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)  
    
  47.                      throws IOException {  
    
  48.                  //此处便是执行Basic.Return之后回调的地方  
    
  49.                  String message = new String(arg5);  
    
  50.                  System.out.println("Basic.Return返回的结果:  "+message);  
    
  51.              }  
    
  52.          });  
    
  53.      } catch (Exception e) {  
    
  54.          e.printStackTrace();  
    
  55.      }  
    
  56.  }  
    
  57. }

    在设置了 ReturnListener 监听器之后,broker(代理服务器)发出 basic.return 方法之后,就会回调第 52 行的 handleReturn 方法,在这个方法里面我们就可以进行消息的重新发布操作啦;

    测试 2:设置 mandatory 标志,且为 exchange 绑定队列(路由键和绑定键一致)

[java] view plain copy

  1. public class ProducerTest {

  2.  public static void main(String[] args) {  
    
  3.      String exchangeName = "confirmExchange";  
    
  4.      String queueName = "confirmQueue";  
    
  5.      String routingKey = "confirmRoutingKey";  
    
  6.      String bindingKey = "confirmRoutingKey";  
    
  7.      //String bindingKey = "confirmBindingKey";  
    
  8.      int count = 3;  
    
  9.      ConnectionFactory factory = new ConnectionFactory();  
    
  10.      factory.setHost("172.16.151.74");  
    
  11.      factory.setUsername("test");  
    
  12.      factory.setPassword("test");  
    
  13.      factory.setPort(5672);  
    
  14.      //创建生产者  
    
  15.      Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);  
    
  16.      producer.run();  
    
  17.  }  
    
  18. }

  19. class Sender

  20. {

  21.  private ConnectionFactory factory;  
    
  22.  private int count;  
    
  23.  private String exchangeName;  
    
  24.  private String  queueName;  
    
  25.  private String routingKey;  
    
  26.  private String bindingKey;  
    
  27.  public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {  
    
  28.      this.factory = factory;  
    
  29.      this.count = count;  
    
  30.      this.exchangeName = exchangeName;  
    
  31.      this.queueName = queueName;  
    
  32.      this.routingKey = routingKey;  
    
  33.      this.bindingKey = bindingKey;  
    
  34.  }  
    
  35.  public void run() {  
    
  36.      try {  
    
  37.          Connection connection = factory.newConnection();  
    
  38.          Channel channel = connection.createChannel();  
    
  39.          //创建exchange  
    
  40.          channel.exchangeDeclare(exchangeName, "direct", true, false, null);  
    
  41.          //创建队列  
    
  42.          channel.queueDeclare(queueName, true, false, false, null);  
    
  43.          //绑定exchange和queue  
    
  44.          channel.queueBind(queueName, exchangeName, bindingKey);  
    
  45.          //发送持久化消息  
    
  46.          for(int i = 0;i < count;i++)  
    
  47.          {  
    
  48.              //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,  
    
  49.              //因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话  
    
  50.              //我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键  
    
  51.              channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());  
    
  52.          }  
    
  53.          channel.addReturnListener(new ReturnListener() {  
    
  54.              @Override  
    
  55.              public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)  
    
  56.                      throws IOException {  
    
  57.                  //此处便是执行Basic.Return之后回调的地方  
    
  58.                  String message = new String(arg5);  
    
  59.                  System.out.println("Basic.Return返回的结果:  "+message);  
    
  60.              }  
    
  61.          });  
    
  62.      } catch (Exception e) {  
    
  63.          e.printStackTrace();  
    
  64.      }  
    
  65.  }  
    
  66. }

    通过抓包发现并不会有 basic.return 方法被调用,查看 RabbitMQ 管理界面发现消息已经到达了队列;

[java] view plain copy

    测试3:设置mandatory标志,且exchange绑定队列(路由键和绑定键不一致)

    代码就是把测试2中第6行注释,第7行注释打开,注意到此时的routingKey和bindingKey是不一致的,此时我们运行程序,同时抓包得到下面截图:

   注意一点,我们发送了三条消息,那么相应的应该执行三次basic.return,其中第一次和第二次basic.return显示在一行上了,第三次是单独一行,不要误认为只执行了两次,从协议的具体返回内容里我们同样看到了Reply-Text字段值是NO_ROUTE,这种现象在测试1中已经见过了;

   到此,我们明白了mandatory标志的作用:在消息没有被路由到合适队列情况下会将消息返还给消息发布者,同时我们测试了哪些情况下消息不会到达合适的队列,测试1演示的是创建了exchange但是没有为他绑定队列导致的消息未到达合适队列,测试3演示的是创建了exchange同时创建了queue,但是在将两者绑定的时候,使用的bindingKey和消息发布者使用的rountingKey不一致导致的消息未到达合适队列;

   参考资料:

   [RabbitMQ(二) AMQP协议mandatory和immediate标志位区别](http://blog.csdn.net/jiao_fuyou/article/details/21594947)

   [RabbitMQ之mandatory](https://my.oschina.net/moooofly/blog/147634)
  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 364 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Swift

    Swift 是苹果于 2014 年 WWDC(苹果开发者大会)发布的开发语言,可与 Objective-C 共同运行于 Mac OS 和 iOS 平台,用于搭建基于苹果平台的应用程序。

    36 引用 • 37 回帖 • 529 关注
  • 百度

    百度(Nasdaq:BIDU)是全球最大的中文搜索引擎、最大的中文网站。2000 年 1 月由李彦宏创立于北京中关村,致力于向人们提供“简单,可依赖”的信息获取方式。“百度”二字源于中国宋朝词人辛弃疾的《青玉案·元夕》词句“众里寻他千百度”,象征着百度对中文信息检索技术的执著追求。

    63 引用 • 785 回帖 • 177 关注
  • 架构

    我们平时所说的“架构”主要是指软件架构,这是有关软件整体结构与组件的抽象描述,用于指导软件系统各个方面的设计。另外还有“业务架构”、“网络架构”、“硬件架构”等细分领域。

    142 引用 • 442 回帖
  • 链书

    链书(Chainbook)是 B3log 开源社区提供的区块链纸质书交易平台,通过 B3T 实现共享激励与价值链。可将你的闲置书籍上架到链书,我们共同构建这个全新的交易平台,让闲置书籍继续发挥它的价值。

    链书社

    链书目前已经下线,也许以后还有计划重制上线。

    14 引用 • 257 回帖
  • Ubuntu

    Ubuntu(友帮拓、优般图、乌班图)是一个以桌面应用为主的 Linux 操作系统,其名称来自非洲南部祖鲁语或豪萨语的“ubuntu”一词,意思是“人性”、“我的存在是因为大家的存在”,是非洲传统的一种价值观,类似华人社会的“仁爱”思想。Ubuntu 的目标在于为一般用户提供一个最新的、同时又相当稳定的主要由自由软件构建而成的操作系统。

    125 引用 • 169 回帖
  • 导航

    各种网址链接、内容导航。

    40 引用 • 173 回帖
  • BND

    BND(Baidu Netdisk Downloader)是一款图形界面的百度网盘不限速下载器,支持 Windows、Linux 和 Mac,详细介绍请看这里

    107 引用 • 1281 回帖 • 27 关注
  • Sphinx

    Sphinx 是一个基于 SQL 的全文检索引擎,可以结合 MySQL、PostgreSQL 做全文搜索,它可以提供比数据库本身更专业的搜索功能,使得应用程序更容易实现专业化的全文检索。

    1 引用 • 211 关注
  • WiFiDog

    WiFiDog 是一套开源的无线热点认证管理工具,主要功能包括:位置相关的内容递送;用户认证和授权;集中式网络监控。

    1 引用 • 7 回帖 • 589 关注
  • 安装

    你若安好,便是晴天。

    132 引用 • 1184 回帖
  • 阿里云

    阿里云是阿里巴巴集团旗下公司,是全球领先的云计算及人工智能科技公司。提供云服务器、云数据库、云安全等云计算服务,以及大数据、人工智能服务、精准定制基于场景的行业解决方案。

    89 引用 • 345 回帖 • 1 关注
  • Lute

    Lute 是一款结构化的 Markdown 引擎,支持 Go 和 JavaScript。

    25 引用 • 191 回帖 • 16 关注
  • SQLite

    SQLite 是一个进程内的库,实现了自给自足的、无服务器的、零配置的、事务性的 SQL 数据库引擎。SQLite 是全世界使用最为广泛的数据库引擎。

    5 引用 • 7 回帖 • 1 关注
  • TextBundle

    TextBundle 文件格式旨在应用程序之间交换 Markdown 或 Fountain 之类的纯文本文件时,提供更无缝的用户体验。

    1 引用 • 2 回帖 • 49 关注
  • 以太坊

    以太坊(Ethereum)并不是一个机构,而是一款能够在区块链上实现智能合约、开源的底层系统。以太坊是一个平台和一种编程语言 Solidity,使开发人员能够建立和发布下一代去中心化应用。 以太坊可以用来编程、分散、担保和交易任何事物:投票、域名、金融交易所、众筹、公司管理、合同和知识产权等等。

    34 引用 • 367 回帖
  • CSDN

    CSDN (Chinese Software Developer Network) 创立于 1999 年,是中国的 IT 社区和服务平台,为中国的软件开发者和 IT 从业者提供知识传播、职业发展、软件开发等全生命周期服务,满足他们在职业发展中学习及共享知识和信息、建立职业发展社交圈、通过软件开发实现技术商业化等刚性需求。

    14 引用 • 155 回帖
  • Unity

    Unity 是由 Unity Technologies 开发的一个让开发者可以轻松创建诸如 2D、3D 多平台的综合型游戏开发工具,是一个全面整合的专业游戏引擎。

    25 引用 • 7 回帖 • 172 关注
  • Sym

    Sym 是一款用 Java 实现的现代化社区(论坛/BBS/社交网络/博客)系统平台。

    下一代的社区系统,为未来而构建

    524 引用 • 4601 回帖 • 699 关注
  • React

    React 是 Facebook 开源的一个用于构建 UI 的 JavaScript 库。

    192 引用 • 291 回帖 • 385 关注
  • JRebel

    JRebel 是一款 Java 虚拟机插件,它使得 Java 程序员能在不进行重部署的情况下,即时看到代码的改变对一个应用程序带来的影响。

    26 引用 • 78 回帖 • 664 关注
  • LaTeX

    LaTeX(音译“拉泰赫”)是一种基于 ΤΕΧ 的排版系统,由美国计算机学家莱斯利·兰伯特(Leslie Lamport)在 20 世纪 80 年代初期开发,利用这种格式,即使使用者没有排版和程序设计的知识也可以充分发挥由 TeX 所提供的强大功能,能在几天,甚至几小时内生成很多具有书籍质量的印刷品。对于生成复杂表格和数学公式,这一点表现得尤为突出。因此它非常适用于生成高印刷质量的科技和数学类文档。

    12 引用 • 54 回帖 • 62 关注
  • BookxNote

    BookxNote 是一款全新的电子书学习工具,助力您的学习与思考,让您的大脑更高效的记忆。

    笔记整理交给我,一心只读圣贤书。

    1 引用 • 1 回帖
  • 快应用

    快应用 是基于手机硬件平台的新型应用形态;标准是由主流手机厂商组成的快应用联盟联合制定;快应用标准的诞生将在研发接口、能力接入、开发者服务等层面建设标准平台;以平台化的生态模式对个人开发者和企业开发者全品类开放。

    15 引用 • 127 回帖 • 1 关注
  • jQuery

    jQuery 是一套跨浏览器的 JavaScript 库,强化 HTML 与 JavaScript 之间的操作。由 John Resig 在 2006 年 1 月的 BarCamp NYC 上释出第一个版本。全球约有 28% 的网站使用 jQuery,是非常受欢迎的 JavaScript 库。

    63 引用 • 134 回帖 • 724 关注
  • abitmean

    有点意思就行了

    29 关注
  • Ruby

    Ruby 是一种开源的面向对象程序设计的服务器端脚本语言,在 20 世纪 90 年代中期由日本的松本行弘(まつもとゆきひろ/Yukihiro Matsumoto)设计并开发。在 Ruby 社区,松本也被称为马茨(Matz)。

    7 引用 • 31 回帖 • 210 关注
  • OpenShift

    红帽提供的 PaaS 云,支持多种编程语言,为开发人员提供了更为灵活的框架、存储选择。

    14 引用 • 20 回帖 • 633 关注