python 连接 ActiveMQ

本贴最后更新于 1505 天前,其中的信息可能已经时移世改

Python 连接 ActiveMQ

由于项目需求,需要 用 python 连接 ActiveMQ,因为对 python 不是很熟悉,在实现功能的时候遇到了很多问题,在这里记录一下。

stomp 协议

通过搜索引擎搜了一下, python 是如何连接 ActiveMQ 的,大部分都是通过 stomp 协议来实现的,所以这里先了解一下 stomp 协议。

概述

简单(或流) 面向文本的消息协议(STOMP),以前称为 TTMP,是一种简单的基于文本的协议,设计用于与面向消息的中间件(MOM)一起工作。它提供了一种可互操作的有线格式,允许 STOMP 客户机与支持该协议的任何消息代理通信。

该协议与 HTTP 大致相似,并使用一下命令在 TCP 上工作

  • 连接
  • 发送
  • 订阅
  • 取消订阅
  • 开始
  • 提交
  • 中止
  • 正确应答(ACK)
  • 否订应答(NACK)
  • 断开

客户端和服务器之间的通信是通过一个由多行组成的"框架"。第一行包含命令,后跟窗体 : (每行一个),后跟一个空白行,然后是正文内容,以空字符结尾。服务器和客户端之间的通信是通过具有类似格式的标头和正文内容的消息、收据或错误框架。

实现

以下是一些支持 STOMP 的 MOM 产品:

  • Apache ActiveMQ,也称为保险丝消息代理
  • 黄蜂 Q
  • 打开消息队列(OpenMQ)
  • RabbitMQ(消息代理,支持 STOMP)
  • 系统日志 - ng 通过其 Stomp 目标插件

以上内容摘自维基百科

stomp.py 依赖

简介

stomp.py 是 Python 客户端库,用于使用 STOMP 协议访问消息传递服务器(如 ActiveMQ、Artemis 或 RabbitMQ)。它还可以作为一个独立的命令行客户机运行以进行测试

安装

可以通过 pip 命令直接安装

pip install stomp.py

详细参照 https://pypi.org/project/stomp.py/

Python 代码实现

代码实现的时候遇到了很多问题,比如断连重连、如何让程序一直监听消息等,下面一点点说

推送消息到队列

import stomp
def send_to_queue(msg):
    conn = stomp.Connection10([('127.0.0.1',61613)])
    conn.connect(username="admin",passcode="admin",wait=True)
    conn.send('/queue/test', msg,headers={'persistent':'true'})   # ① 
    conn.disconnect()

if __name__ == '__main__':
   send_to_queue('aaa')

上述代码执行完以后,ActiveMQ 会收到一条消息,如下图

image.png

注意: 上面代码块 ① 处,传入参数中有一个 headers={'persistent':'true'} 其中 headers 参数需要传入字典类型(Java 中的 Map 类型), persistent 是 代表 这条消息是否需要 ActiveMQ 持久化

因为我的项目中是需要进行持久化的,在网上找了好久,可苦了我了。。。。,我不仅查了 stomp.py 的官网,还查了 stomp 协议的官网,都没有找到,最后还是在搜 python 代码实现偶然发现的,因为搜索引擎上大都是一些简单的实现,很少有这种往 headers 中传参数。。。 。

headers 中可以传入哪些参数,以及具体的值官网上都没找到,等以后有机会了再整理一下。

接收消息

接收消息也遇到了很多棘手的问题,先把网上的最简单的实现代码贴出来

 import stomp

# 定义一个监听类,里面有一个 on_message 方法
class SampleListener(object):
    def on_message(self, headers, message):
        print(message)

if __name__ == '__main__':
  
   conn = stomp.Connection10([(mq_url, mq_port)])
   conn.set_listener('test', SampleListener())  # 'listenerName' 可以自己随便定义
   conn.connect()
   conn.subscribe('/queue/test')
   time.sleep(2)   # 这里主程序睡了两秒 ,不然程序还没走到回调那里就死掉了


控制台会打印,消息队列中的消息。但是有一个问题,怎么让程序一直监听消息队列中的消息呢?

程序一直监听消息队列

# 定义一个连接方法
def connect():
    conn = stomp.Connection10([('127.0.0.1', 61613)])
    conn.set_listener('listener_name', SampleListener())
    conn.connect()
    conn.subscribe('test')
    return conn

if __name__ == '__main__':
  conn = connect()   # 连接消息队列
  while True:        # 一个while 循环
      time.sleep(5)  # 我担心一直 while 会 耗资源,所以再循环体中睡了 5秒钟

上面代码解决了程序一直或者,并且接收消息的功能。 这个睡眠不会影响消息的实时性,这个睡眠只是针对主线程的。消息过来的时候是另外的线程处理的,所以不需要担心 这个睡眠会影响消息的实时性。

但是我又想到一个问题,如果消息队列挂掉了会怎么样?我发现我把消息队列关掉以后,python 这边是没有任何反应的。然后我又 启动 ActiveMQ,发现这时候发送的消息,python 这边都是接收不到的。

有问题了,这个应该怎么解决呢?我最初的想法是, stomp.py 一定会有一个 断开连接的回调的。所以查了一下 stomp.py 的官网没找到这方面的说明。。。。 嗯。。。 头大

没办法硬着头皮看了 stomp.py 的源码。.... 嗯..... 还是没找到。

就在我万般无奈,没有头绪,我点 conn. 方法的时候,发现了一个 is_connected() 的方法。

抱着试试的态度,找出了解决断连重连的解决方案。

解决断连重连问题

上代码

# 连接消息队列
def connect():
    conn = stomp.Connection10([('127.0.0.1', 61613)])
    conn.set_listener('listener_name', SampleListener())
    conn.connect()
    conn.subscribe('test')
    return conn

if __name__ == '__main__':
   # 连接消息队列
   conn = connect()
   while True:
       # 每 5秒判断一次,是否断开连接(因为没有找到断连得回调,只好自己定时判断)
       time.sleep(5)
       if not conn.is_connected():
           # 断连重连
           conn = connect()
   conn.disconnect()

断连重连解决了,程序可以检测出断开连接,并主动重连消息队列。

但是问题又来了.... 嗯... stomp.py 连接的时候,如果失败了 3 次 就会主动关闭进程

官网找不到解决方案,只好又看了看源码,stomp 构造方法可以传入一个参数 reconnect_attempts_max

该参数代表 连接失败重连的次数,如果设置成 -1 则代表一直重连。 哈哈,解决了,直接上代码

# 连接消息队列
def connect():
    # reconnect_attempts_max 设置成 -1 , 连接不成功,一直尝试重连
    conn = stomp.Connection10([('127.0.0.1', 61613)],reconnect_attempts_max=-1)
    conn.set_listener('listener_name', SampleListener())
    conn.connect()
    conn.subscribe('test')
    return conn

if __name__ == '__main__':
   # 连接消息队列
   conn = connect()
   while True:
       # 每 5秒判断一次,是否断开连接(因为没有找到断连得回调,只好自己定时判断)
       time.sleep(5)
       if not conn.is_connected():
           # 断连重连
           conn = connect()
   conn.disconnect()

断连重连终于解决了,这时候关掉 activeMQ, python 会一直尝试连接消息队列.

再次打开 activeMQ ,新的消息也能接收到.

  • 消息队列
    40 引用 • 52 回帖 • 2 关注
  • Python

    Python 是一种面向对象、直译式电脑编程语言,具有近二十年的发展历史,成熟且稳定。它包含了一组完善而且容易理解的标准库,能够轻松完成很多常见的任务。它的语法简捷和清晰,尽量使用无异义的英语单词,与其它大多数程序设计语言使用大括号不一样,它使用缩进来定义语句块。

    545 引用 • 672 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
  • martin1qi

    hello, 有问题请教一下。stomp 我没有找到消息重新交付的功能,如果 ack 设置了 client,但是消费失败了,怎么尝试重新消费 3 次停止呢?现在代码是尝试一直消费。。

    # -*-coding:utf-8-*-
    import stomp
    import time
    
    __listener_name = 'SampleListener'
    __topic_name1 = 'que_test1'
    __host = '127.0.0.1'
    __port = 61613
    __user = 'admin'
    __password = 'admin'
    
    
    class Queue_Demo1(stomp.ConnectionListener):
        def __init__(self, conn):
            self.conn = conn
    
        def on_message(self, headers, body):
            print('Queue_Demo1 start')
            print('headers: %s' % headers['destination'])
            print(headers)
            print('message1111: %s\n' % body)
            # try:
            print(1/0)
            self.conn.ack(id=headers['message-id'], subscription=headers['subscription'])
            # except:
            #     pass
                # self.conn.nack(id=headers['message-id'], subscription=headers['subscription'])
    
    
    def connect_listen(host, port, topic_name, func_id):
        connect = stomp.Connection([(host, port)], reconnect_sleep_max=30, reconnect_attempts_max=1)
        func_dict = {'1': Queue_Demo1}
        connect.set_listener(topic_name, func_dict[func_id](connect))
        connect.connect(__user, __password, wait=True)
        connect.subscribe(topic_name, id='1', ack='client')
        return connect
    
    
    def receive_from_topic():
        conn = connect_listen(__host, __port, __topic_name1, '1')
    
        while True:
            try:
                if not conn.is_connected():
                    conn = connect_listen(__host, __port, __topic_name1, '1')
                    receive_from_topic()
            except Exception as e:
                time.sleep(15)
    
    
    if __name__ == '__main__':
        receive_from_topic()
    
    
    4 回复
  • zh847707713
    作者

    不好意思,最近太忙了没看到提问,现在问题解决了么?😄

  • zh847707713
    作者

    代码可能写得不是很标准,但是达到了你想要的效果,可以参考下

  • zh847707713
    作者

    代码贴不进来,过两天再贴上来

  • zh847707713
    作者

    image.png