RabbitMQ- 从基础到实战(2)— 防止消息丢失

本贴最后更新于 2333 天前,其中的信息可能已经时移世异

1.简介

RabbitMQ 中,消息丢失可以简单的分为两种:客户端丢失和服务端丢失。针对这两种消息丢失,RabbitMQ 都给出了相应的解决方案。

2.防止客户端丢失消息

如图,生产者 P 向队列中生产消息,C1 和 C2 消费队列中的消息,默认情况下,RabbitMQ 会平均的分发消费给 C1C2(Round-robin dispatching),假设一个任务的执行时间非常长,在执行过程中,客户端挂了(连接断开),那么,该客户端正在处理且未完成的消息,以及分配给它还没来得及执行的消息,都将丢失。因为默认情况下,RabbitMQ 分发完消息后,就会从内存中把消息删除掉。

3.消息确认(Message acknowledgment)

为了解决上述问题,RabbitMQ 引入了消息确认机制,当消息处理完成后,给 Server 端发送一个确认消息,来告诉服务端可以删除该消息了,如果连接断开的时候,Server 端没有收到消费者发出的确认信息,则会把消息转发给其他保持在线的消费者。

验证上述问题

首先,我们验证上述问题(客户端丢失消息)是否真的存在,对 Consumer 进行如下改造。

先生产两条消息

启动消费者,在消费者接收到消息,还没处理完成的时候,强制关掉

这时,观察控制台,发现两条消息都没有了,1 条是在执行中丢失的,还有 1 条,已经分配给这个 Consumer,还没来得及处理,也丢失了

这证明了上述问题是真的存在的,如果发生在生产环境,将产生难以预料的后果

引入消息确认机制

为了方便观察,我们用 CMD 来运行 Consumer,要通过 maven 打成可执行的 JAR 包,需要在 pom.xml 中增加如下配置

ConsumerfinalName> maven-assembly-pluginartifactId> falseappendAssemblyId> jar-with-dependenciesdescriptorRef> descriptorRefs> com.liyang.ticktock.rabbitmq.AppmainClass> manifest> archive> configuration> make-assemblyid> packagephase> assemblygoal> goals> execution> executions> plugin> org.apache.maven.pluginsgroupId> maven-compiler-pluginartifactId> 1.8source> 1.8target> configuration> plugin>
    plugins>
build>

上述配置描述了最终打包名字、入口类路径、带上依赖包、使用 1.8 版本的 JDK 进行打包,配置完后,就可以通过 maven 的 install 方法,在 target 目录生成可执行的 jar 包,如果包大小很小,应检查配置,是不是没有带上依赖包

再次改造 Consummer 类

install 成可执行 jar 包,通过 cmd 开启两个 consumer

通过 Sender 发送一条消息,然后用 Ctrl+C 结束先收到消息的 Consumer,发现另外一个 Consumer 接收到了未处理完的消息

问题得到了解决,现在消费者在执行过程中死掉也不会丢失消息了

看一下发送确认的方法

1 /**
2 * Acknowledge one or several received
3 * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} 4 * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method 5 * containing the received message being acknowledged.
6 * @see com.rabbitmq.client.AMQP.Basic.Ack 7 * @param deliveryTag the tag from the received 这个是 RabbitMQ 用来区分消息的,文档在这 8 * @param multiple true to acknowledge all messages up to and 为 true 的话,确认所有消息,为 false 只确认当前消息 9 * including the supplied delivery tag; false to acknowledge just 10 * the supplied delivery tag. 11 * @throws java.io.IOException if an error is encountered 12 */
13 void basicAck(long deliveryTag, boolean multiple) throws IOException;

在官方文档中,这样描述 deliveryTag

简单来说,就是 RabbitMQ 内部用来区分消息的一个标签,从 envelope 中获取就行了

忘记确认将引起内存泄漏

RabbitMQ 只有在收到消费者确认后,才会从内存中删除消息,如果消费者忘了确认(更多情况是因为代码问题没有执行到确认的代码),将会导致内存泄漏

验证一下

注释掉 Consumer 中的确认代码

运行 Sender 和 Consumer,不停的生产消费消息,发现消费者在正常的消费消息

查看控制台,发现已经被吃掉了 43KB 的内存,所以,在试用过程中,一定要保证消息确认在任何情况下都可以发出,否则即使消费者处理完成,RabbitMQ 也不会把消息在内存中清除,在该消费者断开连接之后,还会把消息转发给其他消费者重新处理,将引发难以预计的问题

4.消息的持久化

现在,消费者宕机已经无法影响到我们的消息了,但如果 RabbitMQ 重启了,消息依然会丢失。所幸的是,RabbitMQ 提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启 RabbitMQ,消息也不会丢失。但是,仍然有一个非常短暂的时间窗口(RabbitMQ 收到消息还没来得及存到硬盘上)会导致消息丢失,如果需要严格的控制,可以参考官方文档

要使用 RabbitMQ 的消息持久化,在声明队列时设置一个参数即可

注意,RabbitMQ 不允许对一个已经存在的队列用不同的参数重新声明,对于试图这么做的程序,会报错,所以,改动之前代码之前,要在控制台中把原来的队列删除

重新声明队列后,发现 Durable 为 true

重启 RabbitMQ

队列的消息没有丢失

5.结束语

这一章介绍了 RabbitMQ 消息的确认和持久化,后面将会继续深入介绍 RabbitMQ 的其他特性

  • RabbitMQ

    RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种语言客户端,如:Python、Ruby、.NET、Java、C、PHP、ActionScript 等。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    49 引用 • 60 回帖 • 395 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...