RabbitMQ- 从基础到实战(1)— Hello RabbitMQ

本贴最后更新于 2545 天前,其中的信息可能已经时移世易

1.简介

本篇博文介绍了在 windows 平台下安装 RabbitMQ Server 端,并用 JAVA 代码实现收发消息

2.安装 RabbitMQ

  1. RabbitMQ 是用 Erlang 开发的,所以需要先安装 Erlang 环境,在这里下载对应系统的 Erlang 安装包进行安装
  2. 点击这里下载对应平台的 RabbitMQ 安装包进行安装

Windows 平台安装完成后如图

3.启用 RabbitMQ Web 控制台

RabbitMQ 提供一个控制台,用于管理和监控 RabbitMQ,默认是不启动的,需要运行以下命令进行启动

  1. 点击上图的 Rabbit Command Prompt,打开 rabbitMQ 控制台

  2. 官方介绍管理控制台的页面,可以看到,输入以下命令启动后台控制插件

    rabbitmq-plugins enable rabbitmq_management

  3. 登录后台页面:http://localhost:15672/ 密码和用户名都是 guest ,界面如下

目前可以先不用理会此界面,后面使用到时会详细介绍,也可以到这里查看官方文档。

4.编写 MessageSender

Spring 对 RabbitMQ 已经进行了封装,正常使用中,会使用 Spring 集成,第一个项目中,我们先不考虑那么多

在 IDE 中新建一个 Maven 项目,并在 pom.xml 中贴入如下依赖,RabbitMQ 的最新版本依赖可以在这里找到

com.rabbitmqgroupId> amqp-clientartifactId> 4.1.0version> dependency>

等待 Maven 下载完成后,就可以在 Maven Dependencies 中看到 RabbitMQ 的 JAR

在这里,我们发现,RabbitMQ 的日志依赖了 slf4j-api 这个包,slf4j-api 并不是一个日志实现,这样子是打不出日志的,所以,我们给 pom 加上一个日志实现,这里用了 logback

ch.qos.logbackgroupId> logback-classicartifactId> 1.2.1version> dependency>

之后 maven 依赖如下,可以放心写代码了

新建一个 MessageSender 类,代码如下

1 import java.io.IOException; 2 import java.util.concurrent.TimeoutException; 3
4 import org.slf4j.Logger; 5 import org.slf4j.LoggerFactory; 6
7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 import com.rabbitmq.client.ConnectionFactory; 10
11 public class MessageSender { 12
13 private Logger logger = LoggerFactory.getLogger(MessageSender.class); 14
15 //声明一个队列名字
16 private final static String QUEUE_NAME = "hello"; 17
18 public boolean sendMessage(String message){ 19 //new 一个 RabbitMQ 的连接工厂
20 ConnectionFactory factory = new ConnectionFactory(); 21 //设置需要连接的 RabbitMQ 地址,这里指向本机
22 factory.setHost("127.0.0.1"); 23 Connection connection = null; 24 Channel channel = null; 25 try { 26 //尝试获取一个连接
27 connection = factory.newConnection(); 28 //尝试创建一个 channel
29 channel = connection.createChannel(); 30 //这里的参数在后面详解
31 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 32 //注意这里调用了 getBytes(),发送的其实是 byte 数组,接收方收到消息后,需要重新组装成 String
33 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 34 logger.info("Sent '" + message + "'"); 35 //关闭 channel 和连接
36 channel.close(); 37 connection.close(); 38 } catch (IOException | TimeoutException e) { 39 //失败后记录日志,返回 false,代表发送失败
40 logger.error("send message faild!",e); 41 return false; 42 } 43 return true; 44 } 45 }

然后在 App 类的 main 方法中调用 sendMessage

1 public class App { 2 public static void main( String[] args ){ 3 MessageSender sender = new MessageSender(); 4 sender.sendMessage("hello RabbitMQ!"); 5 } 6 }

打印日志如下

打开 RabbitMQ 的控制台,可以看到消息已经进到了 RabbitMQ 中

点进去,用控制台自带的 getMessage 功能,可以看到消息已经成功由 RabbitMQ 管理了

至此,MessageSender 已经写好了,在该类的 31 和 33 行,我们分别调用了队列声明和消息发送

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

queueDeclare,有很多参数,我们可以看一下他的源码,注释上有详细的解释,我简单翻译了一下

1 /**
2 * Declare a queue 声明一个队列
3 * @see com.rabbitmq.client.AMQP.Queue.Declare 4 * @see com.rabbitmq.client.AMQP.Queue.DeclareOk 5 * @param queue the name of the queue 队列的名字 6 * @param durable true if we are declaring a durable queue (the queue will survive a server restart)是否持久化,为 true 则在 rabbitMQ 重启后生存 7 * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)是否是排他性队列(别人看不到),只对当前连接有效,当前连接断开后,队列删除(设置了持久化也删除) 8 * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)自动删除,在最后一个连接断开后删除队列 9 * @param arguments other properties (construction arguments) for the queue 其他参数 10 * @return a declaration-confirm method to indicate the queue was successfully declared 11 * @throws java.io.IOException if an error is encountered 12 */
13 Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, 14 Map arguments) throws IOException;

前面 4 个都非常好理解,最后一个“其他参数”,到底是什么其他参数,这个东西真的很难找,用到再解释吧,官方文档如下

basicPublish 的翻译如下

1 /**
2 * Publish a message.发送一条消息
3 *
4 * Publishing to a non-existent exchange will result in a channel-level
5 * protocol exception, which closes the channel.
6 *
7 * Invocations of Channel#basicPublish will eventually block if a
8 * http://www.rabbitmq.com/alarms.html">resource-driven alarm is in effect.
9 * 10 * @see com.rabbitmq.client.AMQP.Basic.Publish 11 * @see http://www.rabbitmq.com/alarms.html">Resource-driven alarms 12 * @param exchange the exchange to publish the message to 交换模式,会在后面讲,官方文档在这里 13 * @param routingKey the routing key 控制消息发送到哪个队列 14 * @param props other properties for the message - routing headers etc 其他参数 15 * @param body the message body 消息,byte 数组 16 * @throws java.io.IOException if an error is encountered 17 */
18 void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

这里又有个其他参数,它的类型是这样的,设置消息的一些详细属性

5.编写 MessageConsumer

为了和 Sender 区分开,新建一个 Maven 项目 MessageConsumer

1 package com.liyang.ticktock.rabbitmq; 2
3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5
6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8
9 import com.rabbitmq.client.AMQP; 10 import com.rabbitmq.client.Channel; 11 import com.rabbitmq.client.Connection; 12 import com.rabbitmq.client.ConnectionFactory; 13 import com.rabbitmq.client.Consumer; 14 import com.rabbitmq.client.DefaultConsumer; 15 import com.rabbitmq.client.Envelope; 16
17 public class MessageConsumer { 18
19 private Logger logger = LoggerFactory.getLogger(MessageConsumer.class); 20
21 public boolean consume(String queueName){ 22 //连接 RabbitMQ
23 ConnectionFactory factory = new ConnectionFactory(); 24 factory.setHost("127.0.0.1"); 25 Connection connection = null; 26 Channel channel = null; 27 try { 28 connection = factory.newConnection(); 29 channel = connection.createChannel(); 30 //这里声明 queue 是为了取消息的时候,queue 肯定会存在 31 //注意,queueDeclare 是幂等的,也就是说,消费者和生产者,不论谁先声明,都只会有一个 queue
32 channel.queueDeclare(queueName, false, false, false, null); 33
34 //这里重写了 DefaultConsumer 的 handleDelivery 方法,因为发送的时候对消息进行了 getByte(),在这里要重新组装成 String
35 Consumer consumer = new DefaultConsumer(channel){ 36 @Override 37 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 38 throws IOException { 39 String message = new String(body, "UTF-8"); 40 logger.info("Received '" + message + "'"); 41 } 42 }; 43 //上面是声明消费者,这里用声明的消费者消费掉队列中的消息
44 channel.basicConsume(queueName, true, consumer); 45
46 //这里不能关闭连接,调用了消费方法后,消费者会一直连接着 rabbitMQ 等待消费
47
48 } catch (IOException | TimeoutException e) { 49 //失败后记录日志,返回 false,代表消费失败
50 logger.error("send message faild!",e); 51 return false; 52 } 53
54
55 return true; 56 } 57 }

然后在 App 的 main 方法中调用 Cunsumer 进行消费

1 public class App 2 {
3 //这个队列名字要和生产者中的名字一样,否则找不到队列
4 private final static String QUEUE_NAME = "hello";
5
6 public static void main( String[] args ) 7 {
8 MessageConsumer consumer = new MessageConsumer(); 9 consumer.consume(QUEUE_NAME); 10 } 11 }

结果如下,消费者一直在等待消息,每次有消息进来,就会立刻消费掉

6.多个消费者同时消费一个队列

改造一下 Consumer

在 App 中 new 多个消费者

改造 Sender,使它不停的往 RabbitMQ 中发送消息

启动 Sender

启动 Consumer,发现消息很平均的发给四个客户端,一人一个,谁也不插队

如果我们把速度加快呢?把 Sender 的休息时间去掉,发现消费开始变得没有规律了,其实呢,它还是有规律的,这个是 RabbitMQ 的特性,称作“Round-robin dispatching”,消息会平均的发送给每一个消费者,可以看第一第二行,消息分别是 56981 和 56985,相应的 82、82、84 都被分给了其他线程,只是在当前线程的时间片内,可以处理这么多任务,所以就一次打印出来了

7.结束语

这一章介绍了从安装到用 JAVA 语言编写生产者与消费者,在这里只是简单的消费消息并打印日志,如果一个消息需要处理的时间很长,而处理的过程中,这个消费者挂掉了,那消息会不会丢失呢?答案是肯定的,而且已经分配给这个消费者,但还没来得及处理的消息也会一并丢失掉,这个问题,RabbitMQ 早就考虑到了,并且提供了解决方案,下一篇博文将进行详细介绍

  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 364 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...