深入学习 RabbitMQ(一):mandatory 标志的作用

本贴最后更新于 2616 天前,其中的信息可能已经事过境迁

【转】http://blog.csdn.net/hzw19920329/article/details/54311277

在生产者通过 channel 的 basicPublish 方法发布消息时,通常有几个参数需要设置,为此我们有必要了解清楚这些参数代表的具体含义及其作用,查看 Channel 接口,会发现存在 3 个重载的 basicPublish 方法

[java] view plain copy

  1. void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
  2. void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
  3. throws IOException;
  4. void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
  5. throws IOException; 他们共有的参数分别是: exchange:交换机名称 routingKey:路由键 props:消息属性字段,比如消息头部信息等等 body:消息主体部分 除此之外,还有mandatory和immediate这两个参数,鉴于RabbitMQ3.0不再支持immediate标志,因此我们重点讨论mandatory标志 mandatory的作用: 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则就将消息return给发送者; 下面我们通过几个实例测试下mandatory标志的作用: 测试1:设置mandatory标志,且exchange未绑定队列

[java] view plain copy

  1. public class ProducerTest {

  2. public static void main(String[] args) {
  3. String exchangeName = "confirmExchange";
  4. String queueName = "confirmQueue";
  5. String routingKey = "confirmRoutingKey";
  6. String bindingKey = "confirmBindingKey";
  7. int count = 3;
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("172.16.151.74");
  10. factory.setUsername("test");
  11. factory.setPassword("test");
  12. factory.setPort(5672);
  13. //创建生产者
  14. Sender producer = new Sender(factory, count, exchangeName, routingKey);
  15. producer.run();
  16. }
  17. }

  18. class Sender

  19. {

  20. private ConnectionFactory factory;
  21. private int count;
  22. private String exchangeName;
  23. private String routingKey;
  24. public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {
  25. this.factory = factory;
  26. this.count = count;
  27. this.exchangeName = exchangeName;
  28. this.routingKey = routingKey;
  29. }
  30. public void run() {
  31. try {
  32. Connection connection = factory.newConnection();
  33. Channel channel = connection.createChannel();
  34. //创建exchange
  35. channel.exchangeDeclare(exchangeName, "direct", true, false, null);
  36. //发送持久化消息
  37. for(int i = 0;i < count;i++)
  38. {
  39. //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
  40. channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());
  41. }
  42. } catch (Exception e) {
  43. e.printStackTrace();
  44. }
  45. }
  46. }

    第 45 行我们将 basicPublish 的第三个参数 mandatory 设置成了 true,表示开启了 mandatory 标志,但我们没有为当前 exchange 绑定任何队列;

    通过 wireshark 抓包看到下面输出:

    可以看到最后执行了 basic.return 方法,将发布者发出的消息返还给了发布者,查看协议的 Arguments 参数部分可以看到,Reply-Text 字段值为:NO_ROUTE,表示消息并没有路由到合适的队列中;

    那么我们该怎么获取到没有被正确路由到合适队列的消息呢?这时候可以通过为 channel 信道设置 ReturnListener 监听器来实现,具体实现代码见下:

[java] view plain copy

  1. public class ProducerTest {

  2. public static void main(String[] args) {
  3. String exchangeName = "confirmExchange";
  4. String queueName = "confirmQueue";
  5. String routingKey = "confirmRoutingKey";
  6. String bindingKey = "confirmBindingKey";
  7. int count = 3;
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("172.16.151.74");
  10. factory.setUsername("test");
  11. factory.setPassword("test");
  12. factory.setPort(5672);
  13. //创建生产者
  14. Sender producer = new Sender(factory, count, exchangeName, routingKey);
  15. producer.run();
  16. }
  17. }

  18. class Sender

  19. {

  20. private ConnectionFactory factory;
  21. private int count;
  22. private String exchangeName;
  23. private String routingKey;
  24. public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {
  25. this.factory = factory;
  26. this.count = count;
  27. this.exchangeName = exchangeName;
  28. this.routingKey = routingKey;
  29. }
  30. public void run() {
  31. try {
  32. Connection connection = factory.newConnection();
  33. Channel channel = connection.createChannel();
  34. //创建exchange
  35. channel.exchangeDeclare(exchangeName, "direct", true, false, null);
  36. //发送持久化消息
  37. for(int i = 0;i < count;i++)
  38. {
  39. //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
  40. //因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话
  41. //我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
  42. channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());
  43. }
  44. channel.addReturnListener(new ReturnListener() {
  45. @Override
  46. public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
  47. throws IOException {
  48. //此处便是执行Basic.Return之后回调的地方
  49. String message = new String(arg5);
  50. System.out.println("Basic.Return返回的结果: "+message);
  51. }
  52. });
  53. } catch (Exception e) {
  54. e.printStackTrace();
  55. }
  56. }
  57. }

    在设置了 ReturnListener 监听器之后,broker(代理服务器)发出 basic.return 方法之后,就会回调第 52 行的 handleReturn 方法,在这个方法里面我们就可以进行消息的重新发布操作啦;

    测试 2:设置 mandatory 标志,且为 exchange 绑定队列(路由键和绑定键一致)

[java] view plain copy

  1. public class ProducerTest {

  2. public static void main(String[] args) {
  3. String exchangeName = "confirmExchange";
  4. String queueName = "confirmQueue";
  5. String routingKey = "confirmRoutingKey";
  6. String bindingKey = "confirmRoutingKey";
  7. //String bindingKey = "confirmBindingKey";
  8. int count = 3;
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("172.16.151.74");
  11. factory.setUsername("test");
  12. factory.setPassword("test");
  13. factory.setPort(5672);
  14. //创建生产者
  15. Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
  16. producer.run();
  17. }
  18. }

  19. class Sender

  20. {

  21. private ConnectionFactory factory;
  22. private int count;
  23. private String exchangeName;
  24. private String queueName;
  25. private String routingKey;
  26. private String bindingKey;
  27. public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
  28. this.factory = factory;
  29. this.count = count;
  30. this.exchangeName = exchangeName;
  31. this.queueName = queueName;
  32. this.routingKey = routingKey;
  33. this.bindingKey = bindingKey;
  34. }
  35. public void run() {
  36. try {
  37. Connection connection = factory.newConnection();
  38. Channel channel = connection.createChannel();
  39. //创建exchange
  40. channel.exchangeDeclare(exchangeName, "direct", true, false, null);
  41. //创建队列
  42. channel.queueDeclare(queueName, true, false, false, null);
  43. //绑定exchange和queue
  44. channel.queueBind(queueName, exchangeName, bindingKey);
  45. //发送持久化消息
  46. for(int i = 0;i < count;i++)
  47. {
  48. //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
  49. //因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话
  50. //我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
  51. channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());
  52. }
  53. channel.addReturnListener(new ReturnListener() {
  54. @Override
  55. public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
  56. throws IOException {
  57. //此处便是执行Basic.Return之后回调的地方
  58. String message = new String(arg5);
  59. System.out.println("Basic.Return返回的结果: "+message);
  60. }
  61. });
  62. } catch (Exception e) {
  63. e.printStackTrace();
  64. }
  65. }
  66. }

    通过抓包发现并不会有 basic.return 方法被调用,查看 RabbitMQ 管理界面发现消息已经到达了队列;

[java] view plain copy

测试3:设置mandatory标志,且exchange绑定队列(路由键和绑定键不一致) 代码就是把测试2中第6行注释,第7行注释打开,注意到此时的routingKey和bindingKey是不一致的,此时我们运行程序,同时抓包得到下面截图:

注意一点,我们发送了三条消息,那么相应的应该执行三次basic.return,其中第一次和第二次basic.return显示在一行上了,第三次是单独一行,不要误认为只执行了两次,从协议的具体返回内容里我们同样看到了Reply-Text字段值是NO_ROUTE,这种现象在测试1中已经见过了; 到此,我们明白了mandatory标志的作用:在消息没有被路由到合适队列情况下会将消息返还给消息发布者,同时我们测试了哪些情况下消息不会到达合适的队列,测试1演示的是创建了exchange但是没有为他绑定队列导致的消息未到达合适队列,测试3演示的是创建了exchange同时创建了queue,但是在将两者绑定的时候,使用的bindingKey和消息发布者使用的rountingKey不一致导致的消息未到达合适队列; 参考资料: [RabbitMQ(二) AMQP协议mandatory和immediate标志位区别](http://blog.csdn.net/jiao_fuyou/article/details/21594947) [RabbitMQ之mandatory](https://my.oschina.net/moooofly/blog/147634)
  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 352 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • NetBeans

    NetBeans 是一个始于 1997 年的 Xelfi 计划,本身是捷克布拉格查理大学的数学及物理学院的学生计划。此计划延伸而成立了一家公司进而发展这个商用版本的 NetBeans IDE,直到 1999 年 Sun 买下此公司。Sun 于次年(2000 年)六月将 NetBeans IDE 开源,直到现在 NetBeans 的社群依然持续增长。

    78 引用 • 102 回帖 • 701 关注
  • FreeMarker

    FreeMarker 是一款好用且功能强大的 Java 模版引擎。

    23 引用 • 20 回帖 • 458 关注
  • 酷鸟浏览器

    安全 · 稳定 · 快速
    为跨境从业人员提供专业的跨境浏览器

    3 引用 • 59 回帖 • 45 关注
  • sts
    2 引用 • 2 回帖 • 224 关注
  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 567 关注
  • Eclipse

    Eclipse 是一个开放源代码的、基于 Java 的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。

    76 引用 • 258 回帖 • 631 关注
  • jsDelivr

    jsDelivr 是一个开源的 CDN 服务,可为 npm 包、GitHub 仓库提供免费、快速并且可靠的全球 CDN 加速服务。

    5 引用 • 31 回帖 • 94 关注
  • 脑图

    脑图又叫思维导图,是表达发散性思维的有效图形思维工具 ,它简单却又很有效,是一种实用性的思维工具。

    31 引用 • 96 回帖
  • Solo

    Solo 是一款小而美的开源博客系统,专为程序员设计。Solo 有着非常活跃的社区,可将文章作为帖子推送到社区,来自社区的回帖将作为博客评论进行联动(具体细节请浏览 B3log 构思 - 分布式社区网络)。

    这是一种全新的网络社区体验,让热爱记录和分享的你不再感到孤单!

    1440 引用 • 10067 回帖 • 489 关注
  • Hibernate

    Hibernate 是一个开放源代码的对象关系映射框架,它对 JDBC 进行了非常轻量级的对象封装,使得 Java 程序员可以随心所欲的使用对象编程思维来操纵数据库。

    39 引用 • 103 回帖 • 718 关注
  • MongoDB

    MongoDB(来自于英文单词“Humongous”,中文含义为“庞大”)是一个基于分布式文件存储的数据库,由 C++ 语言编写。旨在为应用提供可扩展的高性能数据存储解决方案。MongoDB 是一个介于关系数据库和非关系数据库之间的产品,是非关系数据库当中功能最丰富,最像关系数据库的。它支持的数据结构非常松散,是类似 JSON 的 BSON 格式,因此可以存储比较复杂的数据类型。

    90 引用 • 59 回帖 • 6 关注
  • Redis

    Redis 是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。从 2010 年 3 月 15 日起,Redis 的开发工作由 VMware 主持。从 2013 年 5 月开始,Redis 的开发由 Pivotal 赞助。

    286 引用 • 248 回帖 • 14 关注
  • GitHub

    GitHub 于 2008 年上线,目前,除了 Git 代码仓库托管及基本的 Web 管理界面以外,还提供了订阅、讨论组、文本渲染、在线文件编辑器、协作图谱(报表)、代码片段分享(Gist)等功能。正因为这些功能所提供的便利,又经过长期的积累,GitHub 的用户活跃度很高,在开源世界里享有深远的声望,并形成了社交化编程文化(Social Coding)。

    210 引用 • 2040 回帖
  • 知乎

    知乎是网络问答社区,连接各行各业的用户。用户分享着彼此的知识、经验和见解,为中文互联网源源不断地提供多种多样的信息。

    10 引用 • 66 回帖
  • flomo

    flomo 是新一代 「卡片笔记」 ,专注在碎片化时代,促进你的记录,帮你积累更多知识资产。

    6 引用 • 140 回帖
  • JSON

    JSON (JavaScript Object Notation)是一种轻量级的数据交换格式。易于人类阅读和编写。同时也易于机器解析和生成。

    52 引用 • 190 回帖
  • 区块链

    区块链是分布式数据存储、点对点传输、共识机制、加密算法等计算机技术的新型应用模式。所谓共识机制是区块链系统中实现不同节点之间建立信任、获取权益的数学算法 。

    92 引用 • 752 回帖
  • ActiveMQ

    ActiveMQ 是 Apache 旗下的一款开源消息总线系统,它完整实现了 JMS 规范,是一个企业级的消息中间件。

    19 引用 • 13 回帖 • 679 关注
  • OnlyOffice
    4 引用 • 22 关注
  • 微软

    微软是一家美国跨国科技公司,也是世界 PC 软件开发的先导,由比尔·盖茨与保罗·艾伦创办于 1975 年,公司总部设立在华盛顿州的雷德蒙德(Redmond,邻近西雅图)。以研发、制造、授权和提供广泛的电脑软件服务业务为主。

    8 引用 • 44 回帖 • 1 关注
  • Tomcat

    Tomcat 最早是由 Sun Microsystems 开发的一个 Servlet 容器,在 1999 年被捐献给 ASF(Apache Software Foundation),隶属于 Jakarta 项目,现在已经独立为一个顶级项目。Tomcat 主要实现了 JavaEE 中的 Servlet、JSP 规范,同时也提供 HTTP 服务,是市场上非常流行的 Java Web 容器。

    162 引用 • 529 回帖
  • H2

    H2 是一个开源的嵌入式数据库引擎,采用 Java 语言编写,不受平台的限制,同时 H2 提供了一个十分方便的 web 控制台用于操作和管理数据库内容。H2 还提供兼容模式,可以兼容一些主流的数据库,因此采用 H2 作为开发期的数据库非常方便。

    11 引用 • 54 回帖 • 666 关注
  • Dubbo

    Dubbo 是一个分布式服务框架,致力于提供高性能和透明化的 RPC 远程服务调用方案,是 [阿里巴巴] SOA 服务化治理方案的核心框架,每天为 2,000+ 个服务提供 3,000,000,000+ 次访问量支持,并被广泛应用于阿里巴巴集团的各成员站点。

    60 引用 • 82 回帖 • 609 关注
  • 博客

    记录并分享人生的经历。

    273 引用 • 2388 回帖
  • Postman

    Postman 是一款简单好用的 HTTP API 调试工具。

    4 引用 • 3 回帖 • 1 关注
  • API

    应用程序编程接口(Application Programming Interface)是一些预先定义的函数,目的是提供应用程序与开发人员基于某软件或硬件得以访问一组例程的能力,而又无需访问源码,或理解内部工作机制的细节。

    79 引用 • 431 回帖
  • SendCloud

    SendCloud 由搜狐武汉研发中心孵化的项目,是致力于为开发者提供高质量的触发邮件服务的云端邮件发送平台,为开发者提供便利的 API 接口来调用服务,让邮件准确迅速到达用户收件箱并获得强大的追踪数据。

    2 引用 • 8 回帖 • 486 关注