【转】http://blog.csdn.net/hzw19920329/article/details/54311277
在生产者通过 channel 的 basicPublish 方法发布消息时,通常有几个参数需要设置,为此我们有必要了解清楚这些参数代表的具体含义及其作用,查看 Channel 接口,会发现存在 3 个重载的 basicPublish 方法
[java] view plain copy
- void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
- void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
-
throws IOException;
- void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
-
throws IOException; 他们共有的参数分别是: exchange:交换机名称 routingKey:路由键 props:消息属性字段,比如消息头部信息等等 body:消息主体部分 除此之外,还有mandatory和immediate这两个参数,鉴于RabbitMQ3.0不再支持immediate标志,因此我们重点讨论mandatory标志 mandatory的作用: 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则就将消息return给发送者; 下面我们通过几个实例测试下mandatory标志的作用: 测试1:设置mandatory标志,且exchange未绑定队列
[java] view plain copy
-
public class ProducerTest {
-
public static void main(String[] args) {
-
String exchangeName = "confirmExchange";
-
String queueName = "confirmQueue";
-
String routingKey = "confirmRoutingKey";
-
String bindingKey = "confirmBindingKey";
-
int count = 3;
-
ConnectionFactory factory = new ConnectionFactory();
-
factory.setHost("172.16.151.74");
-
factory.setUsername("test");
-
factory.setPassword("test");
-
factory.setPort(5672);
-
//创建生产者
-
Sender producer = new Sender(factory, count, exchangeName, routingKey);
-
producer.run();
-
}
-
}
-
class Sender
-
{
-
private ConnectionFactory factory;
-
private int count;
-
private String exchangeName;
-
private String routingKey;
-
public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {
-
this.factory = factory;
-
this.count = count;
-
this.exchangeName = exchangeName;
-
this.routingKey = routingKey;
-
}
-
public void run() {
-
try {
-
Connection connection = factory.newConnection();
-
Channel channel = connection.createChannel();
-
//创建exchange
-
channel.exchangeDeclare(exchangeName, "direct", true, false, null);
-
//发送持久化消息
-
for(int i = 0;i < count;i++)
-
{
-
//第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
-
channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());
-
}
-
} catch (Exception e) {
-
e.printStackTrace();
-
}
-
}
-
}
第 45 行我们将 basicPublish 的第三个参数 mandatory 设置成了 true,表示开启了 mandatory 标志,但我们没有为当前 exchange 绑定任何队列;
通过 wireshark 抓包看到下面输出:
可以看到最后执行了 basic.return 方法,将发布者发出的消息返还给了发布者,查看协议的 Arguments 参数部分可以看到,Reply-Text 字段值为:NO_ROUTE,表示消息并没有路由到合适的队列中;
那么我们该怎么获取到没有被正确路由到合适队列的消息呢?这时候可以通过为 channel 信道设置 ReturnListener 监听器来实现,具体实现代码见下:
[java] view plain copy
-
public class ProducerTest {
-
public static void main(String[] args) {
-
String exchangeName = "confirmExchange";
-
String queueName = "confirmQueue";
-
String routingKey = "confirmRoutingKey";
-
String bindingKey = "confirmBindingKey";
-
int count = 3;
-
ConnectionFactory factory = new ConnectionFactory();
-
factory.setHost("172.16.151.74");
-
factory.setUsername("test");
-
factory.setPassword("test");
-
factory.setPort(5672);
-
//创建生产者
-
Sender producer = new Sender(factory, count, exchangeName, routingKey);
-
producer.run();
-
}
-
}
-
class Sender
-
{
-
private ConnectionFactory factory;
-
private int count;
-
private String exchangeName;
-
private String routingKey;
-
public Sender(ConnectionFactory factory,int count,String exchangeName,String routingKey) {
-
this.factory = factory;
-
this.count = count;
-
this.exchangeName = exchangeName;
-
this.routingKey = routingKey;
-
}
-
public void run() {
-
try {
-
Connection connection = factory.newConnection();
-
Channel channel = connection.createChannel();
-
//创建exchange
-
channel.exchangeDeclare(exchangeName, "direct", true, false, null);
-
//发送持久化消息
-
for(int i = 0;i < count;i++)
-
{
-
//第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
-
//因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话
-
//我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
-
channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());
-
}
-
channel.addReturnListener(new ReturnListener() {
-
@Override
-
public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
-
throws IOException {
-
//此处便是执行Basic.Return之后回调的地方
-
String message = new String(arg5);
-
System.out.println("Basic.Return返回的结果: "+message);
-
}
-
});
-
} catch (Exception e) {
-
e.printStackTrace();
-
}
-
}
-
}
在设置了 ReturnListener 监听器之后,broker(代理服务器)发出 basic.return 方法之后,就会回调第 52 行的 handleReturn 方法,在这个方法里面我们就可以进行消息的重新发布操作啦;
测试 2:设置 mandatory 标志,且为 exchange 绑定队列(路由键和绑定键一致)
[java] view plain copy
-
public class ProducerTest {
-
public static void main(String[] args) {
-
String exchangeName = "confirmExchange";
-
String queueName = "confirmQueue";
-
String routingKey = "confirmRoutingKey";
-
String bindingKey = "confirmRoutingKey";
-
//String bindingKey = "confirmBindingKey";
-
int count = 3;
-
ConnectionFactory factory = new ConnectionFactory();
-
factory.setHost("172.16.151.74");
-
factory.setUsername("test");
-
factory.setPassword("test");
-
factory.setPort(5672);
-
//创建生产者
-
Sender producer = new Sender(factory, count, exchangeName, queueName,routingKey,bindingKey);
-
producer.run();
-
}
-
}
-
class Sender
-
{
-
private ConnectionFactory factory;
-
private int count;
-
private String exchangeName;
-
private String queueName;
-
private String routingKey;
-
private String bindingKey;
-
public Sender(ConnectionFactory factory,int count,String exchangeName,String queueName,String routingKey,String bindingKey) {
-
this.factory = factory;
-
this.count = count;
-
this.exchangeName = exchangeName;
-
this.queueName = queueName;
-
this.routingKey = routingKey;
-
this.bindingKey = bindingKey;
-
}
-
public void run() {
-
try {
-
Connection connection = factory.newConnection();
-
Channel channel = connection.createChannel();
-
//创建exchange
-
channel.exchangeDeclare(exchangeName, "direct", true, false, null);
-
//创建队列
-
channel.queueDeclare(queueName, true, false, false, null);
-
//绑定exchange和queue
-
channel.queueBind(queueName, exchangeName, bindingKey);
-
//发送持久化消息
-
for(int i = 0;i < count;i++)
-
{
-
//第一个参数是exchangeName(默认情况下代理服务器端是存在一个""名字的exchange的,
-
//因此如果不创建exchange的话我们可以直接将该参数设置成"",如果创建了exchange的话
-
//我们需要将该参数设置成创建的exchange的名字),第二个参数是路由键
-
channel.basicPublish(exchangeName, routingKey, true, MessageProperties.PERSISTENT_BASIC, ("第"+(i+1)+"条消息").getBytes());
-
}
-
channel.addReturnListener(new ReturnListener() {
-
@Override
-
public void handleReturn(int arg0, String arg1, String arg2, String arg3, BasicProperties arg4, byte[] arg5)
-
throws IOException {
-
//此处便是执行Basic.Return之后回调的地方
-
String message = new String(arg5);
-
System.out.println("Basic.Return返回的结果: "+message);
-
}
-
});
-
} catch (Exception e) {
-
e.printStackTrace();
-
}
-
}
-
}
通过抓包发现并不会有 basic.return 方法被调用,查看 RabbitMQ 管理界面发现消息已经到达了队列;
[java] view plain copy
测试3:设置mandatory标志,且exchange绑定队列(路由键和绑定键不一致)
代码就是把测试2中第6行注释,第7行注释打开,注意到此时的routingKey和bindingKey是不一致的,此时我们运行程序,同时抓包得到下面截图:
注意一点,我们发送了三条消息,那么相应的应该执行三次basic.return,其中第一次和第二次basic.return显示在一行上了,第三次是单独一行,不要误认为只执行了两次,从协议的具体返回内容里我们同样看到了Reply-Text字段值是NO_ROUTE,这种现象在测试1中已经见过了;
到此,我们明白了mandatory标志的作用:在消息没有被路由到合适队列情况下会将消息返还给消息发布者,同时我们测试了哪些情况下消息不会到达合适的队列,测试1演示的是创建了exchange但是没有为他绑定队列导致的消息未到达合适队列,测试3演示的是创建了exchange同时创建了queue,但是在将两者绑定的时候,使用的bindingKey和消息发布者使用的rountingKey不一致导致的消息未到达合适队列;
参考资料:
[RabbitMQ(二) AMQP协议mandatory和immediate标志位区别](http://blog.csdn.net/jiao_fuyou/article/details/21594947)
[RabbitMQ之mandatory](https://my.oschina.net/moooofly/blog/147634)
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于