深入学习 RabbitMQ(一):mandatory 标志的作用

本贴最后更新于 2253 天前,其中的信息可能已经事过境迁

【转】http://blog.csdn.net/hzw19920329/article/details/54311277

在生产者通过 channel 的 basicPublish 方法发布消息时,通常有几个参数需要设置,为此我们有必要了解清楚这些参数代表的具体含义及其作用,查看 Channel 接口,会发现存在 3 个重载的 basicPublish 方法

[java] view plain copy

  1. void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
  2. void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
  3.          throws IOException;  
    
  4. void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
  5.          throws IOException;  
    
     他们共有的参数分别是:
     exchange:交换机名称
     routingKey:路由键
     props:消息属性字段,比如消息头部信息等等
     body:消息主体部分
     除此之外,还有mandatory和immediate这两个参数,鉴于RabbitMQ3.0不再支持immediate标志,因此我们重点讨论mandatory标志
     mandatory的作用:
    
     当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则就将消息return给发送者;
    
    下面我们通过几个实例测试下mandatory标志的作用:
     测试1:设置mandatory标志,且exchange未绑定队列
    

[java] view plain copy

  1. public class ProducerTest {

  2.  public static void main(String[] args) {  
    
  3.      String exchangeName = "confirmExchange";  
    
  4.      String queueName = "confirmQueue";  
    
  5.      String routingKey = "confirmRoutingKey";  
    
  6.      String bindingKey = "confirmBindingKey";  
    
  7.      int count = 3;  
    
  8.      ConnectionFactory factory = new ConnectionFactory();  
    
  9.      factory.setHost("172.16.151.74");  
    
  10.      factory.setUsername("test");  
    
  11.      factory.setPassword("test");  
    
  12.      factory.setPort(5672);  
    
  13.      //创建生产者  
    
  14.      Sender producer = new Sender(factory, count, exchangeName, routingKey);  
    
  15.      producer.run();  
    
  16.  }  
    
  17. }

  18. class Sender

  19. {

  20.  private ConnectionFactory factory;  
    
  21.  private int count;  
    
  22.  private String exchangeName;  
    
  23.  private String routingKey;  
    
  24.  public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {  
    
  25.      this.factory = factory;  
    
  26.      this.count = count;  
    
  27.      this.exchangeName = exchangeName;  
    
  28.      this.routingKey = routingKey;  
    
  29.  }  
    
  30.  public void run() {  
    
  31.      try {  
    
  32.          Connection connection = factory.newConnection();  
    
  33.          Channel channel = connection.createChannel();  
    
  34.          //创建exchange  
    
  35.          channel.exchangeDeclare(exchangeName, "direct", true, false, null);  
    
  36.          //发送持久化消息  
    
  37.          for(int i = 0;i < count;i++)  
    
  38.          {  
    
  39.              //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键  
    
  40.              channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());  
    
  41.          }  
    
  42.      } catch (Exception e) {  
    
  43.          e.printStackTrace();  
    
  44.      }  
    
  45.  }  
    
  46. }

    第 45 行我们将 basicPublish 的第三个参数 mandatory 设置成了 true,表示开启了 mandatory 标志,但我们没有为当前 exchange 绑定任何队列;

    通过 wireshark 抓包看到下面输出:

    可以看到最后执行了 basic.return 方法,将发布者发出的消息返还给了发布者,查看协议的 Arguments 参数部分可以看到,Reply-Text 字段值为:NO_ROUTE,表示消息并没有路由到合适的队列中;

    那么我们该怎么获取到没有被正确路由到合适队列的消息呢?这时候可以通过为 channel 信道设置 ReturnListener 监听器来实现,具体实现代码见下:

[java] view plain copy

  1. public class ProducerTest {

  2.  public static void main(String[] args) {  
    
  3.      String exchangeName = "confirmExchange";  
    
  4.      String queueName = "confirmQueue";  
    
  5.      String routingKey = "confirmRoutingKey";  
    
  6.      String bindingKey = "confirmBindingKey";  
    
  7.      int count = 3;  
    
  8.      ConnectionFactory factory = new ConnectionFactory();  
    
  9.      factory.setHost("172.16.151.74");  
    
  10.      factory.setUsername("test");  
    
  11.      factory.setPassword("test");  
    
  12.      factory.setPort(5672);  
    
  13.      //创建生产者  
    
  14.      Sender producer = new Sender(factory, count, exchangeName, routingKey);  
    
  15.      producer.run();  
    
  16.  }  
    
  17. }

  18. class Sender

  19. {

  20.  private ConnectionFactory factory;  
    
  21.  private int count;  
    
  22.  private String exchangeName;  
    
  23.  private String routingKey;  
    
  24.  public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {  
    
  25.      this.factory = factory;  
    
  26.      this.count = count;  
    
  27.      this.exchangeName = exchangeName;  
    
  28.      this.routingKey = routingKey;  
    
  29.  }  
    
  30.  public void run() {  
    
  31.      try {  
    
  32.          Connection connection = factory.newConnection();  
    
  33.          Channel channel = connection.createChannel();  
    
  34.          //创建exchange  
    
  35.          channel.exchangeDeclare(exchangeName, "direct", true, false, null);  
    
  36.          //发送持久化消息  
    
  37.          for(int i = 0;i < count;i++)  
    
  38.          {  
    
  39.              //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,  
    
  40.              //因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话  
    
  41.              //我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键  
    
  42.              channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());  
    
  43.          }  
    
  44.          channel.addReturnListener(new ReturnListener() {  
    
  45.              @Override  
    
  46.              public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)  
    
  47.                      throws IOException {  
    
  48.                  //此处便是执行Basic.Return之后回调的地方  
    
  49.                  String message = new String(arg5);  
    
  50.                  System.out.println("Basic.Return返回的结果:  "+message);  
    
  51.              }  
    
  52.          });  
    
  53.      } catch (Exception e) {  
    
  54.          e.printStackTrace();  
    
  55.      }  
    
  56.  }  
    
  57. }

    在设置了 ReturnListener 监听器之后,broker(代理服务器)发出 basic.return 方法之后,就会回调第 52 行的 handleReturn 方法,在这个方法里面我们就可以进行消息的重新发布操作啦;

    测试 2:设置 mandatory 标志,且为 exchange 绑定队列(路由键和绑定键一致)

[java] view plain copy

  1. public class ProducerTest {

  2.  public static void main(String[] args) {  
    
  3.      String exchangeName = "confirmExchange";  
    
  4.      String queueName = "confirmQueue";  
    
  5.      String routingKey = "confirmRoutingKey";  
    
  6.      String bindingKey = "confirmRoutingKey";  
    
  7.      //String bindingKey = "confirmBindingKey";  
    
  8.      int count = 3;  
    
  9.      ConnectionFactory factory = new ConnectionFactory();  
    
  10.      factory.setHost("172.16.151.74");  
    
  11.      factory.setUsername("test");  
    
  12.      factory.setPassword("test");  
    
  13.      factory.setPort(5672);  
    
  14.      //创建生产者  
    
  15.      Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);  
    
  16.      producer.run();  
    
  17.  }  
    
  18. }

  19. class Sender

  20. {

  21.  private ConnectionFactory factory;  
    
  22.  private int count;  
    
  23.  private String exchangeName;  
    
  24.  private String  queueName;  
    
  25.  private String routingKey;  
    
  26.  private String bindingKey;  
    
  27.  public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {  
    
  28.      this.factory = factory;  
    
  29.      this.count = count;  
    
  30.      this.exchangeName = exchangeName;  
    
  31.      this.queueName = queueName;  
    
  32.      this.routingKey = routingKey;  
    
  33.      this.bindingKey = bindingKey;  
    
  34.  }  
    
  35.  public void run() {  
    
  36.      try {  
    
  37.          Connection connection = factory.newConnection();  
    
  38.          Channel channel = connection.createChannel();  
    
  39.          //创建exchange  
    
  40.          channel.exchangeDeclare(exchangeName, "direct", true, false, null);  
    
  41.          //创建队列  
    
  42.          channel.queueDeclare(queueName, true, false, false, null);  
    
  43.          //绑定exchange和queue  
    
  44.          channel.queueBind(queueName, exchangeName, bindingKey);  
    
  45.          //发送持久化消息  
    
  46.          for(int i = 0;i < count;i++)  
    
  47.          {  
    
  48.              //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,  
    
  49.              //因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话  
    
  50.              //我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键  
    
  51.              channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());  
    
  52.          }  
    
  53.          channel.addReturnListener(new ReturnListener() {  
    
  54.              @Override  
    
  55.              public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)  
    
  56.                      throws IOException {  
    
  57.                  //此处便是执行Basic.Return之后回调的地方  
    
  58.                  String message = new String(arg5);  
    
  59.                  System.out.println("Basic.Return返回的结果:  "+message);  
    
  60.              }  
    
  61.          });  
    
  62.      } catch (Exception e) {  
    
  63.          e.printStackTrace();  
    
  64.      }  
    
  65.  }  
    
  66. }

    通过抓包发现并不会有 basic.return 方法被调用,查看 RabbitMQ 管理界面发现消息已经到达了队列;

[java] view plain copy

    测试3:设置mandatory标志,且exchange绑定队列(路由键和绑定键不一致)

    代码就是把测试2中第6行注释,第7行注释打开,注意到此时的routingKey和bindingKey是不一致的,此时我们运行程序,同时抓包得到下面截图:

   注意一点,我们发送了三条消息,那么相应的应该执行三次basic.return,其中第一次和第二次basic.return显示在一行上了,第三次是单独一行,不要误认为只执行了两次,从协议的具体返回内容里我们同样看到了Reply-Text字段值是NO_ROUTE,这种现象在测试1中已经见过了;

   到此,我们明白了mandatory标志的作用:在消息没有被路由到合适队列情况下会将消息返还给消息发布者,同时我们测试了哪些情况下消息不会到达合适的队列,测试1演示的是创建了exchange但是没有为他绑定队列导致的消息未到达合适队列,测试3演示的是创建了exchange同时创建了queue,但是在将两者绑定的时候,使用的bindingKey和消息发布者使用的rountingKey不一致导致的消息未到达合适队列;

   参考资料:

   [RabbitMQ(二) AMQP协议mandatory和immediate标志位区别](http://blog.csdn.net/jiao_fuyou/article/details/21594947)

   [RabbitMQ之mandatory](https://my.oschina.net/moooofly/blog/147634)
  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 395 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Mac

    Mac 是苹果公司自 1984 年起以“Macintosh”开始开发的个人消费型计算机,如:iMac、Mac mini、Macbook Air、Macbook Pro、Macbook、Mac Pro 等计算机。

    164 引用 • 594 回帖 • 1 关注
  • Firefox

    Mozilla Firefox 中文俗称“火狐”(正式缩写为 Fx 或 fx,非正式缩写为 FF),是一个开源的网页浏览器,使用 Gecko 排版引擎,支持多种操作系统,如 Windows、OSX 及 Linux 等。

    7 引用 • 30 回帖 • 457 关注
  • CentOS

    CentOS(Community Enterprise Operating System)是 Linux 发行版之一,它是来自于 Red Hat Enterprise Linux 依照开放源代码规定释出的源代码所编译而成。由于出自同样的源代码,因此有些要求高度稳定的服务器以 CentOS 替代商业版的 Red Hat Enterprise Linux 使用。两者的不同在于 CentOS 并不包含封闭源代码软件。

    238 引用 • 224 回帖
  • 职场

    找到自己的位置,萌新烦恼少。

    126 引用 • 1699 回帖
  • Sillot

    Sillot (汐洛)孵化自思源笔记,致力于服务智慧新彖乄,具有彖乄驱动、极致优雅、开发者友好的特点
    Github 地址:https://github.com/Hi-Windom/Sillot

    12 引用 • 26 关注
  • RESTful

    一种软件架构设计风格而不是标准,提供了一组设计原则和约束条件,主要用于客户端和服务器交互类的软件。基于这个风格设计的软件可以更简洁,更有层次,更易于实现缓存等机制。

    30 引用 • 114 回帖 • 8 关注
  • Wide

    Wide 是一款基于 Web 的 Go 语言 IDE。通过浏览器就可以进行 Go 开发,并有代码自动完成、查看表达式、编译反馈、Lint、实时结果输出等功能。

    欢迎访问我们运维的实例: https://wide.b3log.org

    30 引用 • 218 回帖 • 594 关注
  • 游戏

    沉迷游戏伤身,强撸灰飞烟灭。

    169 引用 • 799 回帖
  • JRebel

    JRebel 是一款 Java 虚拟机插件,它使得 Java 程序员能在不进行重部署的情况下,即时看到代码的改变对一个应用程序带来的影响。

    26 引用 • 78 回帖 • 618 关注
  • OAuth

    OAuth 协议为用户资源的授权提供了一个安全的、开放而又简易的标准。与以往的授权方式不同之处是 oAuth 的授权不会使第三方触及到用户的帐号信息(如用户名与密码),即第三方无需使用用户的用户名与密码就可以申请获得该用户资源的授权,因此 oAuth 是安全的。oAuth 是 Open Authorization 的简写。

    36 引用 • 103 回帖 • 6 关注
  • RIP

    愿逝者安息!

    8 引用 • 92 回帖 • 286 关注
  • React

    React 是 Facebook 开源的一个用于构建 UI 的 JavaScript 库。

    192 引用 • 291 回帖 • 444 关注
  • MongoDB

    MongoDB(来自于英文单词“Humongous”,中文含义为“庞大”)是一个基于分布式文件存储的数据库,由 C++ 语言编写。旨在为应用提供可扩展的高性能数据存储解决方案。MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。它支持的数据结构非常松散,是类似 JSON 的 BSON 格式,因此可以存储比较复杂的数据类型。

    90 引用 • 59 回帖 • 4 关注
  • NGINX

    NGINX 是一个高性能的 HTTP 和反向代理服务器,也是一个 IMAP/POP3/SMTP 代理服务器。 NGINX 是由 Igor Sysoev 为俄罗斯访问量第二的 Rambler.ru 站点开发的,第一个公开版本 0.1.0 发布于 2004 年 10 月 4 日。

    311 引用 • 546 回帖 • 57 关注
  • GitHub

    GitHub 于 2008 年上线,目前,除了 Git 代码仓库托管及基本的 Web 管理界面以外,还提供了订阅、讨论组、文本渲染、在线文件编辑器、协作图谱(报表)、代码片段分享(Gist)等功能。正因为这些功能所提供的便利,又经过长期的积累,GitHub 的用户活跃度很高,在开源世界里享有深远的声望,并形成了社交化编程文化(Social Coding)。

    207 引用 • 2031 回帖
  • 国际化

    i18n(其来源是英文单词 internationalization 的首末字符 i 和 n,18 为中间的字符数)是“国际化”的简称。对程序来说,国际化是指在不修改代码的情况下,能根据不同语言及地区显示相应的界面。

    7 引用 • 26 回帖 • 1 关注
  • Hprose

    Hprose 是一款先进的轻量级、跨语言、跨平台、无侵入式、高性能动态远程对象调用引擎库。它不仅简单易用,而且功能强大。你无需专门学习,只需看上几眼,就能用它轻松构建分布式应用系统。

    9 引用 • 17 回帖 • 591 关注
  • uTools

    uTools 是一个极简、插件化、跨平台的现代桌面软件。通过自由选配丰富的插件,打造你得心应手的工具集合。

    5 引用 • 13 回帖
  • 工具

    子曰:“工欲善其事,必先利其器。”

    273 引用 • 676 回帖
  • OpenStack

    OpenStack 是一个云操作系统,通过数据中心可控制大型的计算、存储、网络等资源池。所有的管理通过前端界面管理员就可以完成,同样也可以通过 Web 接口让最终用户部署资源。

    10 引用 • 9 关注
  • GitBook

    GitBook 使您的团队可以轻松编写和维护高质量的文档。 分享知识,提高团队的工作效率,让用户满意。

    3 引用 • 8 回帖 • 1 关注
  • iOS

    iOS 是由苹果公司开发的移动操作系统,最早于 2007 年 1 月 9 日的 Macworld 大会上公布这个系统,最初是设计给 iPhone 使用的,后来陆续套用到 iPod touch、iPad 以及 Apple TV 等产品上。iOS 与苹果的 Mac OS X 操作系统一样,属于类 Unix 的商业操作系统。

    84 引用 • 139 回帖
  • Swift

    Swift 是苹果于 2014 年 WWDC(苹果开发者大会)发布的开发语言,可与 Objective-C 共同运行于 Mac OS 和 iOS 平台,用于搭建基于苹果平台的应用程序。

    34 引用 • 37 回帖 • 496 关注
  • Kubernetes

    Kubernetes 是 Google 开源的一个容器编排引擎,它支持自动化部署、大规模可伸缩、应用容器化管理。

    108 引用 • 54 回帖
  • Electron

    Electron 基于 Chromium 和 Node.js,让你可以使用 HTML、CSS 和 JavaScript 构建应用。它是一个由 GitHub 及众多贡献者组成的活跃社区共同维护的开源项目,兼容 Mac、Windows 和 Linux,它构建的应用可在这三个操作系统上面运行。

    15 引用 • 136 回帖 • 1 关注
  • Kotlin

    Kotlin 是一种在 Java 虚拟机上运行的静态类型编程语言,由 JetBrains 设计开发并开源。Kotlin 可以编译成 Java 字节码,也可以编译成 JavaScript,方便在没有 JVM 的设备上运行。在 Google I/O 2017 中,Google 宣布 Kotlin 成为 Android 官方开发语言。

    19 引用 • 33 回帖 • 20 关注
  • Jenkins

    Jenkins 是一套开源的持续集成工具。它提供了非常丰富的插件,让构建、部署、自动化集成项目变得简单易用。

    51 引用 • 37 回帖