RabbitMQ是什么?
RabbitMQ,遵循AMQP协议,由内在高并发的erlanng语言开发,用在实时的对可靠性要求比较高的消息传递上。
学过websocket的来理解rabbitMQ应该是非常简单的了,websocket是基于服务器和页面之间的通信协议,一次握手,多次通信。 而rabbitMQ就像是服务器之间的socket,一个服务器连上MQ监听,而另一个服务器只要通过MQ发送消息就能被监听服务器所接收。
但是MQ和socket还是有区别的,socket相当于是页面直接监听服务器。而MQ就是服务器之间的中转站,例如邮箱,一个人投递信件给邮箱,另一个人去邮箱取,他们中间没有直接的关系,所以耦合度相比socket小了很多。
RabbitMQ的安装
下载:官网地址
- macOS下安装
macOS下安装我采用home brew进行安装,如果没有安装home brew,则打开终端,输入
/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
安装完成之后,依次运行下列命令
brew update
brew install rabbitmq
这种方式安装的话,rabbitmq会被安装到/usr/local/sbin
目录下,使用echo $PATH
命令查看安装目录是否在PATH中。如果不在,那么修改用户家目录下的.bash_profile文件将安装目录添加进PATH,保存后别忘了source一下。
- Windows下安装
Windows下安装需要安装Erlang,下载地址
官网下载rabbitmq安装包,下载地址
首先安装otp文件,再安装rabbitmq-server,安装过程全部默认下一步就好。
安装完成之后可以在开始菜单找到Rabbitmq文件夹。
此时打开浏览器输入localhost:15672
会发现并不能访问
这是因为RabbitMQ安装后默认是不启动管理模块的,所以需要配置将管理模块启动
启动管理模块,首先关闭服务,输入如下命令
rabbitmqctl start_app
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl stop
删除下图中的两个文件夹(一定要在服务是停止的状态下)
重启之后发现能正常访问
启动RabbitMQ服务
rabbitmq-server
启动后可在浏览器输入localhost:15672
进入管理界面,默认超级用户管理员账号密码均为guest
创建User、Vhost
在这里可以新建User、Vhost等。
新建一个user_test,密码设为123456,Tags选择Admin
接着新建一个Vhost,名字为/vhost_test
点击进入vhost_test详情页
设置权限给user_test
到此大致设置完成,接下来开始撸代码了~
RabbitMQ基础(Java实现)
1. “Hello World!” - 最简单的模型
生产者发出消息到队列,消费者从队列中取信息。
代码如下:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zcx</groupId>
<artifactId>myrabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
</dependency>
</dependencies>
</project>
ConnectionUtils.java
package com.zcx.rabbitmq.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtils {
/**
* 获取MQ连接工具类
* @return
*/
public static Connection getConnection() throws IOException, TimeoutException {
// 获取连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 地址
factory.setHost("127.0.0.1");
// 连接端口
factory.setPort(5672);
// 连接的Virtual Hosts
factory.setVirtualHost("/vhost_test");
factory.setUsername("user_test");
factory.setPassword("123456");
return factory.newConnection();
}
}
Producer.java
package com.zcx.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zcx.rabbitmq.utils.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者类,将消息发送到队列
*/
public class Producer {
private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//获得连接
Connection connection = ConnectionUtils.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//被发送的信息
String msg = "hello rabbitmq~600267";
//信息发送
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("---send msg:" + msg);
//关闭资源
channel.close();
connection.close();
}
}
Consumer.java
package com.zcx.rabbitmq.simple;
import com.rabbitmq.client.*;
import com.zcx.rabbitmq.utils.ConnectionUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者类,从队列中取信息
*/
public class Consumer {
private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msgString = new String(body, "utf-8");
System.out.println("---receive msg:" + msgString);
}
};
// 监听
channel.basicConsume(QUEUE_NAME,true,consumer);
}
/**
* 老式方法,已淘汰
* @throws IOException
* @throws TimeoutException
* @throws InterruptedException
*/
public static void oldapi() throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,consumer);
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msgString = new String(delivery.getBody());
System.out.println("---receive msg:"+msgString);
}
}
}
先运行Producer.java发送信息到队列,再运行Consumer.java,发现能够成功取到信息。
2. Work Queues 之轮询分发(Round-robin dispatching)
工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务而且必须等待完成。相反,我们安排稍后去完成任务。
Producer.java
/**
* 生产者类,将消息发送到队列
*/
public class Producer {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获得连接
Connection connection = ConnectionUtils.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 20; i++) {
String msg = "hello rabbitmq" + i;
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("---send msg:" + msg);
// 消息发送速度太快了,得让它睡一会
Thread.sleep(500);
}
//关闭资源
channel.close();
connection.close();
}
}
发送20条消息到消息队列test_work_queue中
Consumer1.java
/**
* 消费者类,从队列中取信息
*/
public class Consumer1 {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明队列,由于生产者那边已经声明过,这里可以省略
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 收到消息会触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msgString = new String(body, "utf-8");
System.out.println("[Consumer1] --- receive msg:" + msgString);
try {
// 我可没睡觉,我是在处理任务,谁都不准说我在睡大觉!
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[Consumer1] --- done");
}
}
};
// 监听
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
Consumer2.java 的代码在1的基础上,将两个打印输出改一下,再将睡眠时间改为1000就行了。
先运行两个消费者,之后运行生产者。我们可以发现,虽然两个消费者因为睡眠时间不同而整体运行的速度不同,可是他们运行的任务总量是一样的。
造成这样的结果是因为轮询分发(Round-robin dispatching),这种分发模式就是不论谁忙谁不忙,任务我就轮着循环发给你们,能者多劳?呵,不存在的!
3.Work Queues 之公平分发(Fair dispatch)
从之前的实验我们可以看出来在有两名消费速度不一样的消费者的情况下,一名工作人员会一直很忙,另一名工作人员完成任务之后就闲置了
发生这种情况是因为RabbitMQ只在消息进入队列时调度消息。它没有考虑消费者未确认消息的数量。它只是盲目地将第n条消息分发给第n个消费者。
为了解决这个问题,我们可以使用basicQos
方法和 prefetch_count = 1
设置。这告诉RabbitMQ一次不要向工作人员(消费者)发送多个消息。或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。相反,它会将其分派给不是仍然忙碌的下一个工作人员。
我们之前的代码可以更改如下:
Producer.java
package com.zcx.rabbitmq.workfair;
/**
* 生产者类,将消息发送到队列
*/
public class Producer {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获得连接
Connection connection = ConnectionUtils.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//队列给消费者发送一条消息后,等到消费者返回确认信息后才会继续发送下一条消息
int prefetch_count = 1;
channel.basicQos(prefetch_count );
for (int i = 0; i < 20; i++) {
String msg = "hello rabbitmq" + i;
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("---send msg:" + msg);
// 消息发送速度太快了,得让它睡一会
Thread.sleep(500);
}
//关闭资源
channel.close();
connection.close();
}
}
在消费者中关闭自动应答
Consumer1.java
package com.zcx.rabbitmq.workfair;
/**
* 消费者类,从队列中取信息
*/
public class Consumer1 {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 创建通道
final Channel channel = connection.createChannel();
// 声明队列,由于生产者那边已经声明过,这里可以省略
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);//保证一次只分发一个
// 定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 收到消息会触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msgString = new String(body, "utf-8");
System.out.println("[Consumer1] --- receive msg:" + msgString);
try {
// 我可没睡觉,我是在处理任务,谁都不准说我在睡大觉!
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[Consumer1] --- done");
// 发送回执,告诉队列我干完了快夸我
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;//关闭自动应答
// 监听
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
Consumer2.java 代码略
运行Consumer1(睡两秒)、Consumer2(睡一秒),再运行Producer,结果如下:
可以发现Consumer2处理的任务多于Consumer1,能者多劳!
4.消息确认与消息持久化
消息确认(Message acknowledgment)
在之前的第一个例子中不知道大家想过没有,如果其中一个消费者开始一项长期任务并且只是部分完成而死亡会发生什么。用我们目前的代码,一旦RabbitMQ将消息传递给客户,它立即将其标记为删除。在这种情况下,如果你干掉一个消费者,我们将失去刚刚处理的信息。我们也会失去所有派发给这个特定消费者但尚未处理的消息。
但我们不想失去任何任务。如果一名消费者死亡,我们希望将任务交付给另一名消费者。
为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发回ack(请求)告诉RabbitMQ已经收到,处理了特定的消息,并且RabbitMQ可以自由删除它。
如果消费者死亡(其通道关闭,连接关闭或TCP连接丢失),RabbitMQ将理解消息未被完全处理,并将重新排队。如果有其他消费者同时在线,它会迅速将其重新发送给另一位消费者。这样,即使消费者偶尔死亡,也可以确保没有任何信息丢失。
boolean autoAck = false;//关闭自动应答
// 监听
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
消息持久化(Message durability)
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ这个爸爸挂了,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非我们告诉它不要。需要做两件事来确保消息不会丢失:我们需要将队列和消息标记为持久。
Boolean durable = true;
//声明队列
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
虽然这个命令本身是正确的,但直接在我们之前的代码上作更改的话是不可行的。那是因为我们已经定义了一个名为test_work_queue
的队列 。RabbitMQ不允许使用不同的参数重新定义现有的队列,并会向任何试图执行该操作的程序返回错误。我们可以尝试定义一个新的队列,或者localhost:15672
删除test_work_queue
5.订阅模式(Publish/Subscribe)
从图中我们可以看出:
- 一个生产者,多个消费者
- 每一个消费者都有自己的队列
- 生产者不会直接将消息发送到队列,而是发到交换机(exchange)上
- 每个队列都要绑定到交换机上
由此可以做到一条消息被多个消费者消费,应用场景比如注册之后,同时有邮件、手机短信通知
废话不多说,上代码
Producer.java
package com.zcx.rabbitmq.ps;
public class Producer {
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获得连接
Connection connection = ConnectionUtils.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//分发
//发送消息
String msg = "hello ps";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
System.out.println("---send msg:" + msg);
//关闭资源
channel.close();
connection.close();
}
}
此时能够运行,可是消息会丢失,因为没有绑定队列去存储它,交换机本身不具备存储功能。
Consumer1.java
package com.zcx.rabbitmq.ps;
public class Consumer1 {
private static final String QUEUE_NAME = "test_queue_fanout_email";
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 创建通道
final Channel channel = connection.createChannel();
// 声明队列,由于生产者那边已经声明过,这里可以省略
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 收到消息会触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msgString = new String(body, "utf-8");
System.out.println("[Consumer1] --- receive msg:" + msgString);
try {
// 我可没睡觉,我是在处理任务,谁都不准说我在睡大觉!
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[Consumer1] --- done");
// 发送回执,告诉队列我干完了快夸我
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;//自动应答
// 监听
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
Consumer2.java
package com.zcx.rabbitmq.ps;
/**
* 消费者类,从队列中取信息
*/
public class Consumer2 {
private static final String QUEUE_NAME = "test_queue_fanout_sms";
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 创建通道
final Channel channel = connection.createChannel();
// 声明队列,由于生产者那边已经声明过,这里可以省略
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 收到消息会触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msgString = new String(body, "utf-8");
System.out.println("[Consumer2] --- receive msg:" + msgString);
try {
// 我可没睡觉,我是在处理任务,谁都不准说我在睡大觉!
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[Consumer2] --- done");
// 发送回执,告诉队列我干完了快夸我
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;//自动应答
// 监听
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
运行后发现两个消费者都能收到消息
7.交换机(Exchange)
交换机一方面是接收生产者的消息,另一方面是向队列推送消息。这里暂时先只放两种
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//分发
交换机在声明的时候可以指定其功能
Fanout(不处理路由键)
Direct(处理路由键)
Topic
# ——匹配一个或多个
* ——匹配一个
8.路由模式(Routing)
路由模式下我们需要设置routingKey,然后交换机根据routingKey去分发。
比如上图中,如果生产者发出的消息routingKey为error,那么交换机会将这条消息分别分发给C1、C2;而如果routingKey为info或者warning,那么消息只会被分发到C2。
Producer.java
package com.zcx.rabbitmq.routing;
public class Producer {
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获得连接
Connection connection = ConnectionUtils.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//发送消息
String msg = "hello direct~~";
String routingKey = "info";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("---send msg:" + msg);
//关闭资源
channel.close();
connection.close();
}
}
Consumer1.java
package com.zcx.rabbitmq.routing;
/**
* 消费者类,从队列中取信息
*/
public class Consumer1 {
private static final String QUEUE_NAME = "test_queue_direct_one";
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 创建通道
final Channel channel = connection.createChannel();
// 声明队列,由于生产者那边已经声明过,这里可以省略
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
//绑定队列到交换机
String routingKey = "error";
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);
// 定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 收到消息会触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msgString = new String(body, "utf-8");
System.out.println("[Consumer1] --- receive msg:" + msgString);
try {
// 我可没睡觉,我是在处理任务,谁都不准说我在睡大觉!
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[Consumer1] --- done");
// 发送回执,告诉队列我干完了快夸我
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
// 监听
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
Consumer2.java
package com.zcx.rabbitmq.routing;
/**
* 消费者类,从队列中取信息
*/
public class Consumer2 {
private static final String QUEUE_NAME = "test_queue_direct_two";
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 创建通道
final Channel channel = connection.createChannel();
// 声明队列,由于生产者那边已经声明过,这里可以省略
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
// 定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 收到消息会触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msgString = new String(body, "utf-8");
System.out.println("[Consumer2] --- receive msg:" + msgString);
try {
// 我可没睡觉,我是在处理任务,谁都不准说我在睡大觉!
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[Consumer2] --- done");
// 发送回执,告诉队列我干完了快夸我
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
// 监听
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
9.主题模式(Topics)
直接上代码
Producer.java
package com.zcx.rabbitmq.topics;
public class Producer {
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获得连接
Connection connection = ConnectionUtils.getConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//发送消息
String routingKey = "lazy.add.test";
String msg = "hello topic~~,I like "+routingKey;
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("---send msg:" + msg);
//关闭资源
channel.close();
connection.close();
}
}
Consumer1.java
package com.zcx.rabbitmq.ps;
public class Consumer1 {
private static final String QUEUE_NAME = "test_queue_topic_one";
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 创建通道
final Channel channel = connection.createChannel();
// 声明队列,由于生产者那边已经声明过,这里可以省略
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");
// 定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 收到消息会触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msgString = new String(body, "utf-8");
System.out.println("[Consumer1] --- receive msg:" + msgString);
try {
// 我可没睡觉,我是在处理任务,谁都不准说我在睡大觉!
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[Consumer1] --- done");
// 发送回执,告诉队列我干完了快夸我
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;//自动应答
// 监听
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
Consumer2.java
package com.zcx.rabbitmq.ps;
public class Consumer2 {
private static final String QUEUE_NAME = "test_queue_topic_two";
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 创建通道
final Channel channel = connection.createChannel();
// 声明队列,由于生产者那边已经声明过,这里可以省略
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
//绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");
// 定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 收到消息会触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msgString = new String(body, "utf-8");
System.out.println("[Consumer2] --- receive msg:" + msgString);
try {
// 我可没睡觉,我是在处理任务,谁都不准说我在睡大觉!
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[Consumer2] --- done");
// 发送回执,告诉队列我干完了快夸我
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;//自动应答
// 监听
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}