Python 连接 ActiveMQ
由于项目需求,需要 用 python 连接 ActiveMQ,因为对 python 不是很熟悉,在实现功能的时候遇到了很多问题,在这里记录一下。
stomp 协议
通过搜索引擎搜了一下, python 是如何连接 ActiveMQ 的,大部分都是通过 stomp 协议来实现的,所以这里先了解一下 stomp 协议。
概述
简单(或流) 面向文本的消息协议(STOMP),以前称为 TTMP,是一种简单的基于文本的协议,设计用于与面向消息的中间件(MOM)一起工作。它提供了一种可互操作的有线格式,允许 STOMP 客户机与支持该协议的任何消息代理通信。
该协议与 HTTP 大致相似,并使用一下命令在 TCP 上工作:
- 连接
- 发送
- 订阅
- 取消订阅
- 开始
- 提交
- 中止
- 正确应答(ACK)
- 否订应答(NACK)
- 断开
客户端和服务器之间的通信是通过一个由多行组成的"框架"。第一行包含命令,后跟窗体 : (每行一个),后跟一个空白行,然后是正文内容,以空字符结尾。服务器和客户端之间的通信是通过具有类似格式的标头和正文内容的消息、收据或错误框架。
实现
以下是一些支持 STOMP 的 MOM 产品:
- Apache ActiveMQ,也称为保险丝消息代理
- 黄蜂 Q
- 打开消息队列(OpenMQ)
- RabbitMQ(消息代理,支持 STOMP)
- 系统日志 - ng 通过其 Stomp 目标插件
以上内容摘自维基百科
stomp.py 依赖
简介
stomp.py 是 Python 客户端库,用于使用 STOMP 协议访问消息传递服务器(如 ActiveMQ、Artemis 或 RabbitMQ)。它还可以作为一个独立的命令行客户机运行以进行测试
安装
可以通过 pip 命令直接安装
pip install stomp.py
详细参照 https://pypi.org/project/stomp.py/
Python 代码实现
代码实现的时候遇到了很多问题,比如断连重连、如何让程序一直监听消息等,下面一点点说
推送消息到队列
import stomp
def send_to_queue(msg):
conn = stomp.Connection10([('127.0.0.1',61613)])
conn.connect(username="admin",passcode="admin",wait=True)
conn.send('/queue/test', msg,headers={'persistent':'true'}) # ①
conn.disconnect()
if __name__ == '__main__':
send_to_queue('aaa')
上述代码执行完以后,ActiveMQ 会收到一条消息,如下图
注意: 上面代码块 ① 处,传入参数中有一个 headers={'persistent':'true'}
其中 headers 参数需要传入字典类型(Java 中的 Map 类型), persistent 是 代表 这条消息是否需要 ActiveMQ 持久化。
因为我的项目中是需要进行持久化的,在网上找了好久,可苦了我了。。。。,我不仅查了 stomp.py 的官网,还查了 stomp 协议的官网,都没有找到,最后还是在搜 python 代码实现偶然发现的,因为搜索引擎上大都是一些简单的实现,很少有这种往 headers 中传参数。。。 。
headers 中可以传入哪些参数,以及具体的值官网上都没找到,等以后有机会了再整理一下。
接收消息
接收消息也遇到了很多棘手的问题,先把网上的最简单的实现代码贴出来
import stomp
# 定义一个监听类,里面有一个 on_message 方法
class SampleListener(object):
def on_message(self, headers, message):
print(message)
if __name__ == '__main__':
conn = stomp.Connection10([(mq_url, mq_port)])
conn.set_listener('test', SampleListener()) # 'listenerName' 可以自己随便定义
conn.connect()
conn.subscribe('/queue/test')
time.sleep(2) # 这里主程序睡了两秒 ,不然程序还没走到回调那里就死掉了
控制台会打印,消息队列中的消息。但是有一个问题,怎么让程序一直监听消息队列中的消息呢?
程序一直监听消息队列
# 定义一个连接方法
def connect():
conn = stomp.Connection10([('127.0.0.1', 61613)])
conn.set_listener('listener_name', SampleListener())
conn.connect()
conn.subscribe('test')
return conn
if __name__ == '__main__':
conn = connect() # 连接消息队列
while True: # 一个while 循环
time.sleep(5) # 我担心一直 while 会 耗资源,所以再循环体中睡了 5秒钟
上面代码解决了程序一直或者,并且接收消息的功能。 这个睡眠不会影响消息的实时性,这个睡眠只是针对主线程的。消息过来的时候是另外的线程处理的,所以不需要担心 这个睡眠会影响消息的实时性。
但是我又想到一个问题,如果消息队列挂掉了会怎么样?我发现我把消息队列关掉以后,python 这边是没有任何反应的。然后我又 启动 ActiveMQ,发现这时候发送的消息,python 这边都是接收不到的。
有问题了,这个应该怎么解决呢?我最初的想法是, stomp.py 一定会有一个 断开连接的回调的。所以查了一下 stomp.py 的官网没找到这方面的说明。。。。 嗯。。。 头大
没办法硬着头皮看了 stomp.py 的源码。.... 嗯..... 还是没找到。
就在我万般无奈,没有头绪,我点 conn.
方法的时候,发现了一个 is_connected()
的方法。
抱着试试的态度,找出了解决断连重连的解决方案。
解决断连重连问题
上代码
# 连接消息队列
def connect():
conn = stomp.Connection10([('127.0.0.1', 61613)])
conn.set_listener('listener_name', SampleListener())
conn.connect()
conn.subscribe('test')
return conn
if __name__ == '__main__':
# 连接消息队列
conn = connect()
while True:
# 每 5秒判断一次,是否断开连接(因为没有找到断连得回调,只好自己定时判断)
time.sleep(5)
if not conn.is_connected():
# 断连重连
conn = connect()
conn.disconnect()
断连重连解决了,程序可以检测出断开连接,并主动重连消息队列。
但是问题又来了.... 嗯... stomp.py 连接的时候,如果失败了 3 次 就会主动关闭进程
官网找不到解决方案,只好又看了看源码,stomp 构造方法可以传入一个参数 reconnect_attempts_max
,
该参数代表 连接失败重连的次数,如果设置成 -1 则代表一直重连。 哈哈,解决了,直接上代码
# 连接消息队列
def connect():
# reconnect_attempts_max 设置成 -1 , 连接不成功,一直尝试重连
conn = stomp.Connection10([('127.0.0.1', 61613)],reconnect_attempts_max=-1)
conn.set_listener('listener_name', SampleListener())
conn.connect()
conn.subscribe('test')
return conn
if __name__ == '__main__':
# 连接消息队列
conn = connect()
while True:
# 每 5秒判断一次,是否断开连接(因为没有找到断连得回调,只好自己定时判断)
time.sleep(5)
if not conn.is_connected():
# 断连重连
conn = connect()
conn.disconnect()
断连重连终于解决了,这时候关掉 activeMQ, python 会一直尝试连接消息队列.
再次打开 activeMQ ,新的消息也能接收到.
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于