深入学习 RabbitMQ(一):mandatory 标志的作用

本贴最后更新于 2660 天前,其中的信息可能已经事过境迁

【转】http://blog.csdn.net/hzw19920329/article/details/54311277

在生产者通过 channel 的 basicPublish 方法发布消息时,通常有几个参数需要设置,为此我们有必要了解清楚这些参数代表的具体含义及其作用,查看 Channel 接口,会发现存在 3 个重载的 basicPublish 方法

[java] view plain copy

  1. void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
  2. void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
  3. throws IOException;
  4. void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
  5. throws IOException; 他们共有的参数分别是: exchange:交换机名称 routingKey:路由键 props:消息属性字段,比如消息头部信息等等 body:消息主体部分 除此之外,还有mandatory和immediate这两个参数,鉴于RabbitMQ3.0不再支持immediate标志,因此我们重点讨论mandatory标志 mandatory的作用: 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则就将消息return给发送者; 下面我们通过几个实例测试下mandatory标志的作用: 测试1:设置mandatory标志,且exchange未绑定队列

[java] view plain copy

  1. public class ProducerTest {

  2. public static void main(String[] args) {
  3. String exchangeName = "confirmExchange";
  4. String queueName = "confirmQueue";
  5. String routingKey = "confirmRoutingKey";
  6. String bindingKey = "confirmBindingKey";
  7. int count = 3;
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("172.16.151.74");
  10. factory.setUsername("test");
  11. factory.setPassword("test");
  12. factory.setPort(5672);
  13. //创建生产者
  14. Sender producer = new Sender(factory, count, exchangeName, routingKey);
  15. producer.run();
  16. }
  17. }

  18. class Sender

  19. {

  20. private ConnectionFactory factory;
  21. private int count;
  22. private String exchangeName;
  23. private String routingKey;
  24. public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {
  25. this.factory = factory;
  26. this.count = count;
  27. this.exchangeName = exchangeName;
  28. this.routingKey = routingKey;
  29. }
  30. public void run() {
  31. try {
  32. Connection connection = factory.newConnection();
  33. Channel channel = connection.createChannel();
  34. //创建exchange
  35. channel.exchangeDeclare(exchangeName, "direct", true, false, null);
  36. //发送持久化消息
  37. for(int i = 0;i < count;i++)
  38. {
  39. //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
  40. channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());
  41. }
  42. } catch (Exception e) {
  43. e.printStackTrace();
  44. }
  45. }
  46. }

    第 45 行我们将 basicPublish 的第三个参数 mandatory 设置成了 true,表示开启了 mandatory 标志,但我们没有为当前 exchange 绑定任何队列;

    通过 wireshark 抓包看到下面输出:

    可以看到最后执行了 basic.return 方法,将发布者发出的消息返还给了发布者,查看协议的 Arguments 参数部分可以看到,Reply-Text 字段值为:NO_ROUTE,表示消息并没有路由到合适的队列中;

    那么我们该怎么获取到没有被正确路由到合适队列的消息呢?这时候可以通过为 channel 信道设置 ReturnListener 监听器来实现,具体实现代码见下:

[java] view plain copy

  1. public class ProducerTest {

  2. public static void main(String[] args) {
  3. String exchangeName = "confirmExchange";
  4. String queueName = "confirmQueue";
  5. String routingKey = "confirmRoutingKey";
  6. String bindingKey = "confirmBindingKey";
  7. int count = 3;
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("172.16.151.74");
  10. factory.setUsername("test");
  11. factory.setPassword("test");
  12. factory.setPort(5672);
  13. //创建生产者
  14. Sender producer = new Sender(factory, count, exchangeName, routingKey);
  15. producer.run();
  16. }
  17. }

  18. class Sender

  19. {

  20. private ConnectionFactory factory;
  21. private int count;
  22. private String exchangeName;
  23. private String routingKey;
  24. public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {
  25. this.factory = factory;
  26. this.count = count;
  27. this.exchangeName = exchangeName;
  28. this.routingKey = routingKey;
  29. }
  30. public void run() {
  31. try {
  32. Connection connection = factory.newConnection();
  33. Channel channel = connection.createChannel();
  34. //创建exchange
  35. channel.exchangeDeclare(exchangeName, "direct", true, false, null);
  36. //发送持久化消息
  37. for(int i = 0;i < count;i++)
  38. {
  39. //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
  40. //因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话
  41. //我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
  42. channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());
  43. }
  44. channel.addReturnListener(new ReturnListener() {
  45. @Override
  46. public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
  47. throws IOException {
  48. //此处便是执行Basic.Return之后回调的地方
  49. String message = new String(arg5);
  50. System.out.println("Basic.Return返回的结果: "+message);
  51. }
  52. });
  53. } catch (Exception e) {
  54. e.printStackTrace();
  55. }
  56. }
  57. }

    在设置了 ReturnListener 监听器之后,broker(代理服务器)发出 basic.return 方法之后,就会回调第 52 行的 handleReturn 方法,在这个方法里面我们就可以进行消息的重新发布操作啦;

    测试 2:设置 mandatory 标志,且为 exchange 绑定队列(路由键和绑定键一致)

[java] view plain copy

  1. public class ProducerTest {

  2. public static void main(String[] args) {
  3. String exchangeName = "confirmExchange";
  4. String queueName = "confirmQueue";
  5. String routingKey = "confirmRoutingKey";
  6. String bindingKey = "confirmRoutingKey";
  7. //String bindingKey = "confirmBindingKey";
  8. int count = 3;
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("172.16.151.74");
  11. factory.setUsername("test");
  12. factory.setPassword("test");
  13. factory.setPort(5672);
  14. //创建生产者
  15. Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
  16. producer.run();
  17. }
  18. }

  19. class Sender

  20. {

  21. private ConnectionFactory factory;
  22. private int count;
  23. private String exchangeName;
  24. private String queueName;
  25. private String routingKey;
  26. private String bindingKey;
  27. public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
  28. this.factory = factory;
  29. this.count = count;
  30. this.exchangeName = exchangeName;
  31. this.queueName = queueName;
  32. this.routingKey = routingKey;
  33. this.bindingKey = bindingKey;
  34. }
  35. public void run() {
  36. try {
  37. Connection connection = factory.newConnection();
  38. Channel channel = connection.createChannel();
  39. //创建exchange
  40. channel.exchangeDeclare(exchangeName, "direct", true, false, null);
  41. //创建队列
  42. channel.queueDeclare(queueName, true, false, false, null);
  43. //绑定exchange和queue
  44. channel.queueBind(queueName, exchangeName, bindingKey);
  45. //发送持久化消息
  46. for(int i = 0;i < count;i++)
  47. {
  48. //第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
  49. //因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话
  50. //我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
  51. channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());
  52. }
  53. channel.addReturnListener(new ReturnListener() {
  54. @Override
  55. public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
  56. throws IOException {
  57. //此处便是执行Basic.Return之后回调的地方
  58. String message = new String(arg5);
  59. System.out.println("Basic.Return返回的结果: "+message);
  60. }
  61. });
  62. } catch (Exception e) {
  63. e.printStackTrace();
  64. }
  65. }
  66. }

    通过抓包发现并不会有 basic.return 方法被调用,查看 RabbitMQ 管理界面发现消息已经到达了队列;

[java] view plain copy

测试3:设置mandatory标志,且exchange绑定队列(路由键和绑定键不一致) 代码就是把测试2中第6行注释,第7行注释打开,注意到此时的routingKey和bindingKey是不一致的,此时我们运行程序,同时抓包得到下面截图:

注意一点,我们发送了三条消息,那么相应的应该执行三次basic.return,其中第一次和第二次basic.return显示在一行上了,第三次是单独一行,不要误认为只执行了两次,从协议的具体返回内容里我们同样看到了Reply-Text字段值是NO_ROUTE,这种现象在测试1中已经见过了; 到此,我们明白了mandatory标志的作用:在消息没有被路由到合适队列情况下会将消息返还给消息发布者,同时我们测试了哪些情况下消息不会到达合适的队列,测试1演示的是创建了exchange但是没有为他绑定队列导致的消息未到达合适队列,测试3演示的是创建了exchange同时创建了queue,但是在将两者绑定的时候,使用的bindingKey和消息发布者使用的rountingKey不一致导致的消息未到达合适队列; 参考资料: [RabbitMQ(二) AMQP协议mandatory和immediate标志位区别](http://blog.csdn.net/jiao_fuyou/article/details/21594947) [RabbitMQ之mandatory](https://my.oschina.net/moooofly/blog/147634)
  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 345 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • uTools

    uTools 是一个极简、插件化、跨平台的现代桌面软件。通过自由选配丰富的插件,打造你得心应手的工具集合。

    7 引用 • 27 回帖 • 1 关注
  • Dubbo

    Dubbo 是一个分布式服务框架,致力于提供高性能和透明化的 RPC 远程服务调用方案,是 [阿里巴巴] SOA 服务化治理方案的核心框架,每天为 2,000+ 个服务提供 3,000,000,000+ 次访问量支持,并被广泛应用于阿里巴巴集团的各成员站点。

    60 引用 • 82 回帖 • 613 关注
  • CloudFoundry

    Cloud Foundry 是 VMware 推出的业界第一个开源 PaaS 云平台,它支持多种框架、语言、运行时环境、云平台及应用服务,使开发人员能够在几秒钟内进行应用程序的部署和扩展,无需担心任何基础架构的问题。

    5 引用 • 18 回帖 • 181 关注
  • 安装

    你若安好,便是晴天。

    132 引用 • 1184 回帖
  • 工具

    子曰:“工欲善其事,必先利其器。”

    298 引用 • 763 回帖
  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 345 关注
  • ZeroNet

    ZeroNet 是一个基于比特币加密技术和 BT 网络技术的去中心化的、开放开源的网络和交流系统。

    1 引用 • 21 回帖 • 647 关注
  • 以太坊

    以太坊(Ethereum)并不是一个机构,而是一款能够在区块链上实现智能合约、开源的底层系统。以太坊是一个平台和一种编程语言 Solidity,使开发人员能够建立和发布下一代去中心化应用。 以太坊可以用来编程、分散、担保和交易任何事物:投票、域名、金融交易所、众筹、公司管理、合同和知识产权等等。

    34 引用 • 367 回帖
  • Electron

    Electron 基于 Chromium 和 Node.js,让你可以使用 HTML、CSS 和 JavaScript 构建应用。它是一个由 GitHub 及众多贡献者组成的活跃社区共同维护的开源项目,兼容 Mac、Windows 和 Linux,它构建的应用可在这三个操作系统上面运行。

    15 引用 • 136 回帖 • 4 关注
  • Flutter

    Flutter 是谷歌的移动 UI 框架,可以快速在 iOS 和 Android 上构建高质量的原生用户界面。 Flutter 可以与现有的代码一起工作,它正在被越来越多的开发者和组织使用,并且 Flutter 是完全免费、开源的。

    39 引用 • 92 回帖 • 3 关注
  • SpaceVim

    SpaceVim 是一个社区驱动的模块化 vim/neovim 配置集合,以模块的方式组织管理插件以
    及相关配置,为不同的语言开发量身定制了相关的开发模块,该模块提供代码自动补全,
    语法检查、格式化、调试、REPL 等特性。用户仅需载入相关语言的模块即可得到一个开箱
    即用的 Vim-IDE。

    3 引用 • 31 回帖 • 113 关注
  • 服务

    提供一个服务绝不仅仅是简单的把硬件和软件累加在一起,它包括了服务的可靠性、服务的标准化、以及对服务的监控、维护、技术支持等。

    41 引用 • 24 回帖 • 1 关注
  • 音乐

    你听到信仰的声音了么?

    62 引用 • 512 回帖
  • C++

    C++ 是在 C 语言的基础上开发的一种通用编程语言,应用广泛。C++ 支持多种编程范式,面向对象编程、泛型编程和过程化编程。

    107 引用 • 153 回帖
  • 游戏

    沉迷游戏伤身,强撸灰飞烟灭。

    181 引用 • 821 回帖
  • WebClipper

    Web Clipper 是一款浏览器剪藏扩展,它可以帮助你把网页内容剪藏到本地。

    3 引用 • 9 回帖
  • JWT

    JWT(JSON Web Token)是一种用于双方之间传递信息的简洁的、安全的表述性声明规范。JWT 作为一个开放的标准(RFC 7519),定义了一种简洁的,自包含的方法用于通信双方之间以 JSON 的形式安全的传递信息。

    20 引用 • 15 回帖 • 22 关注
  • 禅道

    禅道是一款国产的开源项目管理软件,她的核心管理思想基于敏捷方法 scrum,内置了产品管理和项目管理,同时又根据国内研发现状补充了测试管理、计划管理、发布管理、文档管理、事务管理等功能,在一个软件中就可以将软件研发中的需求、任务、bug、用例、计划、发布等要素有序的跟踪管理起来,完整地覆盖了项目管理的核心流程。

    6 引用 • 15 回帖 • 12 关注
  • 知乎

    知乎是网络问答社区,连接各行各业的用户。用户分享着彼此的知识、经验和见解,为中文互联网源源不断地提供多种多样的信息。

    10 引用 • 66 回帖
  • JetBrains

    JetBrains 是一家捷克的软件开发公司,该公司位于捷克的布拉格,并在俄国的圣彼得堡及美国麻州波士顿都设有办公室,该公司最为人所熟知的产品是 Java 编程语言开发撰写时所用的集成开发环境:IntelliJ IDEA

    18 引用 • 54 回帖 • 1 关注
  • Logseq

    Logseq 是一个隐私优先、开源的知识库工具。

    Logseq is a joyful, open-source outliner that works on top of local plain-text Markdown and Org-mode files. Use it to write, organize and share your thoughts, keep your to-do list, and build your own digital garden.

    7 引用 • 69 回帖 • 1 关注
  • VirtualBox

    VirtualBox 是一款开源虚拟机软件,最早由德国 Innotek 公司开发,由 Sun Microsystems 公司出品的软件,使用 Qt 编写,在 Sun 被 Oracle 收购后正式更名成 Oracle VM VirtualBox。

    10 引用 • 2 回帖 • 18 关注
  • 倾城之链
    23 引用 • 66 回帖 • 168 关注
  • 导航

    各种网址链接、内容导航。

    44 引用 • 177 回帖
  • V2EX

    V2EX 是创意工作者们的社区。这里目前汇聚了超过 400,000 名主要来自互联网行业、游戏行业和媒体行业的创意工作者。V2EX 希望能够成为创意工作者们的生活和事业的一部分。

    16 引用 • 236 回帖 • 262 关注
  • Sillot

    Insights(注意当前设置 master 为默认分支)

    汐洛彖夲肜矩阵(Sillot T☳Converbenk Matrix),致力于服务智慧新彖乄,具有彖乄驱动、极致优雅、开发者友好的特点。其中汐洛绞架(Sillot-Gibbet)基于自思源笔记(siyuan-note),前身是思源笔记汐洛版(更早是思源笔记汐洛分支),是智慧新录乄终端(多端融合,移动端优先)。

    主仓库地址:Hi-Windom/Sillot

    文档地址:sillot.db.sc.cn

    注意事项:

    1. ⚠️ 汐洛仍在早期开发阶段,尚不稳定
    2. ⚠️ 汐洛并非面向普通用户设计,使用前请了解风险
    3. ⚠️ 汐洛绞架基于思源笔记,开发者尽最大努力与思源笔记保持兼容,但无法实现 100% 兼容
    29 引用 • 25 回帖 • 116 关注
  • QQ

    1999 年 2 月腾讯正式推出“腾讯 QQ”,在线用户由 1999 年的 2 人(马化腾和张志东)到现在已经发展到上亿用户了,在线人数超过一亿,是目前使用最广泛的聊天软件之一。

    45 引用 • 557 回帖 • 1 关注