深入了解 ActiveMQ

本贴最后更新于 1570 天前,其中的信息可能已经时过境迁

认识 MQ(Message Queue)

什么是消息队列

image.png

首先我们先从以下几个维度来认识一下消息队列:

  • 消息队列:一般我们会简称它为 MQ(MessageQueue)
  • 消息(Message):传输的数据。
  • 队列(Queue):队列是一种先进先出的数据结构。
  • 消息队列从字面的含义来看就是一个存放消息的容器。
  • 消息队列可以简单理解为:把要传输的数据放在队列中。
  • 把数据放到消息队列叫做生产者。
  • 从消息队列里边取数据叫做消费者。

为什么需要消息队列

使用消息队列主要是基于以下三个主要场景:

  • 解耦
  • 异步
  • 削峰/限流

下面我们分场景来描述下使用消息队列带来的好处

解耦

假设我们有一个用户系统 A,用户系统 A 可以产生一个 userId。

然后,现在有系统 B 和系统 C 都需要这个 userId 去做相关的操作。

image.png

伪码大致如下:

java
public class SystemA {
    // 系统B和系统C的依赖
    SystemB systemB = new SystemB();
    SystemC systemC = new SystemC();
    // 系统A独有的数据userId
    private String userId = "activeMq-1234567890";
    public void doSomething() {
        // 系统B和系统C都需要拿着系统A的userId去操作其他的事
        systemB.SystemBNeed2do(userId);
        systemC.SystemCNeed2do(userId);
    }
}

这样类似的业务场景大家是不是很熟悉,大家是不是这样写很合情合理,也很简单。

某一天,系统 B 的负责人告诉系统 A 的负责人,现在系统 B 的 SystemBNeed2do(String userId)这个接口不再使用了,让系统 A 别去调它了。于是,系统 A 的负责人说"好的,那我就不调用你了。",于是就把调用系统 B 接口的代码给删掉了。代码变成这样了:

java
public void doSomething() {
  // 系统A不再调用系统B的接口了
  //systemB.SystemBNeed2do(userId);
  systemC.SystemCNeed2do(userId);
}

由于业务需要,系统 D 说也需要用到系统 A 的 userId,于是代码改成了这样:

java
public void doSomething() {
        // 已经不再需要系统B的依赖了
        //systemB.SystemBNeed2do(userId);
        // 系统C和系统D都需要拿着系统A的userId去操作其他的事
        systemC.SystemCNeed2do(userId);
        systemD.SystemDNeed2do(userId);

}

当前系统 A、B、C、D 系统的交互是这样子的。

image.png

随着业务需求的变化,代码也要一遍一遍的修改。

还会存在另外一个问题,调用系统 C 的时候,如果系统 C 挂了,系统 A 还要想办法处理。如果调用系统 D 时,由于网络延迟,请求超时了,那系统 A 是反馈 fail 还是重试?

那么怎么去解决这样的现状呢,如何从频繁的修改代码中解脱呢?

这时候我们就引入一层消息队列中间件,交互图如下:

image.png

将系统 A 产生的 userId 写到消息队列中,系统 C 和系统 D 从消息队列中拿数据。

这样有什么好处?

  • 系统 A 只负责把数据写到队列中,谁想要或不想要这个数据(消息),系统 A 一点都不关心。
  • 即便现在系统 D 不想要 userId 这个数据了,系统 B 又突然想要 userId 这个数据了,都跟系统 A 无关,系统 A 一点代码都不用改。
  • 系统 D 拿 userId 不再经过系统 A,而是从消息队列里边拿。系统 D 即便挂了或者请求超时,都跟系统 A 无关,

只跟消息队列有关。这样一来,系统 A 与系统 B、C、D 都解耦了。

异步

系统 A 做的是主要的业务,而系统 B、C、D 是非主要的业务。比如系统 A 处理的是订单下单,而系统 B 是订单下单成功了,那发送一条短信告诉具体的用户此订单已成功,而系统 C 和系统 D 也是处理一些小事而已。那么此时,为了提高用户体验和吞吐量,其实可以异步地调用系统 B、C、D 的接口。

image.png

削峰/限流

我们再来一个场景,现在我们每个月要搞一次大促,大促期间的并发可能会很高的,比如每秒 3000 个请求。假设我们现在有两台机器处理请求,并且每台机器只能每次处理 1000 个请求。

image.png

系统 B 和系统 C 根据自己的能够处理的请求数去消息队列中拿数据,这样即便有每秒有 8000 个请求,那只是把请求放在消息队列中,去拿消息队列的消息由系统自己去控制,这样就不会把整个系统给搞崩。

image.png

什么是 JMS MQ

全称:Java MessageService 中文:Java 消息服务。

JMS 是 Java 的一套 API 标准,最初的目的是为了使应用程序能够访问现有的 MOM 系 统(MOM 是 MessageOriented Middleware 的英文缩写,指的是利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。) 后来被许多现有的 MOM 供应商采用,并实现为 MOM 系统。

常见 MOM 系统包括 Apache 的 ActiveMQ、阿里巴巴的 RocketMQ、IBM 的 MQSeries、Microsoft
的 MSMQ、BEA 的 RabbitMQ 等。 (并非全部的 MOM 系统都遵循 JMS 规范)】

基于 JMS 实现的 MOM,又被称为 JMSProvider。

JMS 中的一些概念

Broker

消息服务器,作为 server 提供消息核心服务

Provider 生产者

消息生产者是由会话创建的一个对象,用于把消息发动到一个目的地

Consumer 消费者

消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。消息的消费可以采用以下两种方法:

同步消费。通过调用消费者的 receive 方法从目的地中显式提取消息。receive 方法可以一直阻塞到消息到达。

异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。

P2P 点对点消息模型

消息生产者生产消息发送到 queue 中,然后消息消费者从 queue 中取出并且消费消息。
消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。 Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费、其它的则不能消费此消息了。当消费者不存在时,消息会一直保存,直到有消费消费。

Pub/Sub 发布订阅消息模型

消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。 和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。当生产者发布消息,不管是否有消费者。都不会保存消息一定要先有消息的消费者,后有消息的生产者。

P2P vs Pub/Sub

image.png

Queue

队列存储,常用与点对点消息模型

默认只能由唯一的一个消费者处理。一旦处理消息删除。

Topic

主题存储,用于订阅/发布消息模型

主题中的消息,会发送给所有的消费者同时处理。只有在消息可以重复处理的业务场景中可使用。

ConnectionFactory

连接工厂,jms 中用它创建连接

连接工厂是客户用来创建连接的对象,例如 ActiveMQ 提供的 ActiveMQConnectionFactory。

Connection

JMS Connection 封装了客户与 JMS 提供者之间的一个虚拟的连接。

Destination 消息的目的地

目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。

订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS 规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。
持久订阅允许消费者消费它在未处于激活状态时发送的消息。 在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。

Session

JMS Session 是生产和消费消息的一个单线程上下文。会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。

消息可靠性机制

确认 JMS 消息

只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。

在事务性会话中,当一个事务被提交的时候,确认自动发生。

在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:

Session.AUTO_ACKNOWLEDGE。当客户成功的从 receive 方法返回的时候,或者从 MessageListener.onMessage 方法成功返回的时候,会话自动确认客户收到的消息。

Session.CLIENT_ACKNOWLEDGE。客户通过消息的 acknowledge 方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了 10 个消息,然后确认第 5 个消息,那么所有 10 个消息都被确认。

Session.DUPS_ACKNOWLEDGE。该选择只是会话迟钝的确认消息的提交。如果 JMS Provider 失败,那么可能会导致一些重复的消息。如果是重复的消息,那么 JMS Provider 必须把消息头的 JMSRedelivered 字段设置为 true。

持久性

JMS 支持以下两种消息提交模式:

PERSISTENT。指示 JMSProvider 持久保存消息,以保证消息不会因为 JMS Provider 的失败而丢失。

NON_PERSISTENT。不要求 JMS Provider 持久保存消息。

优先级

可以使用消息优先级来指示 JMS Provider 首先提交紧急的消息。优先级分 10 个级别,从 0(最低)到 9(最高)。如果不指定优先级,默认级别是 4。需要注意的是,JMSProvider 并不一定保证按照优先级的顺序提交消息。

消息过期

可以设置消息在一定时间后过期,默认是永不过期

临时目的地

可以通过会话上的 createTemporaryQueue 方法和 createTemporaryTopic 方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保持的时间。只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息。

持久订阅

首先消息生产者必须使用 PERSISTENT 提交消息。客户可以通过会话上的 createDurableSubscriber 方法来创建一个持久订阅,该方法的第一个参数必须是一个 topic,第二个参数是订阅的名称。

JMS Provider 会存储发布到持久订阅对应的 topic 上的消息。如果最初创建持久订阅的客户或者任何其它客户使用相同的连接工厂和连接的客户 ID、相同的主题和相同的订阅名再次调用会话上的 createDurableSubscriber 方法,那么该持久订阅就会被激活。

JMS Provider 会象客户发送客户处于非激活状态时所发布的消息。

持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在创建之后会一直保留,直到应用程序调用会话上的 unsubscribe 方法。

本地事务

在一个 JMS 客户端,可以使用本地事务来组合消息的发送和接收。JMS Session 接口提供了 commit 和 rollback 方法。事务提交意味着生产的所有消息被发送,消费的所有消息被确认;事务回滚意味着生产的所有消息被销毁,消费的所有消息被恢复并重新提交,除非它们已经过期。

事务性的会话总是牵涉到事务处理中,commit 或 rollback 方法一旦被调用,一个事务就结束了,而另一个事务被开始。关闭事务性会话将回滚其中的事务。

需要注意的是,如果使用请求/回复机制,即发送一个消息,同时希望在同一个事务中等待接收该消息的回复,那么程序将被挂起,因为知道事务提交,发送操作才会真正执行。
需要注意的还有一个,消息的生产和消费不能包含在同一个事务中。

ActiveMQ

存储

ActiveMQ 支持很多种存储方式,常见的有 KahaDB 存储,AMQ 存储,JDBC 存储,LevelDB 存储,Memory
消息存储。 我们重点介绍一下 KahaDB 和 JDBC 存储方式。

KahaDB 存储

KahaDB 是默认的持久化策略,所有消息顺序添加到一个日志文件中,同时另外有一个索引文件记录指向这些日志的存储地址,还有一个事务日志用于消息回复操作。是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。

在 data/kahadb 这个目录下,会生成四个文件,来完成消息持久化 db.data 它是消息的索引文件,本质上是 B-Tree(B 树),使用 B-Tree 作为索引指向 db-*.log 里面存储的消息 db.redo 用来进行消息恢复 *db-.log 存储消息内容。

image.png

新的数据以 APPEND 的方式追加到日志文件末尾。属于顺序写入,因此消息存储是比较 快的。默认是 32M,达到阀值会自动递增 lock 文件 锁,写入当前获得 kahadb 读写权限的 broker ,用于在集群环境下的竞争处理。

KahaDB 有如下几个特性:

  • 日志形式存储消息;
  • 消息索引以 B-Tree 结构存储,可以快速更新;
  • 完全支持 JMS 事务;
  • 支持多种恢复机制 kahadb 可以限制每个数据文件的大小。不代表总计数据容量。

配置方式如下:

<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

JDBC 存储

支持通过 JDBC 将消息存储到关系数据库,性能上不如文件存储,能通过关系型数据库查询到消息的信息。

MQ 支持的数据库:Apache Derby、MySQL、PostgreSQL、Oracle、SQLServer、Sybase、Informix、MaxDB。使用 JDBC 存储需要用到下面三张数据表。

activemq_acks:用于存储订阅关系。如果是持久化 Topic,订阅者和服务器的订阅关系在这个表保存。 主要的数据库字段如下:

  • container:消息的 destination
  • sub_dest:如果是使用 static 集群,这个字段会有集群其他系统的信息
  • client_id:每个订阅者都必须有一个唯一的客户端 id 用以区分
  • sub_name:订阅者名称
  • selector:选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性 and 和 or 操作
  • last_acked_id:记录消费过的消息的 id。

activemq_lock:在集群环境中才有用,只有一个 Broker 可以获得消息,称为 Master Broker,其他的只能作为备份等待 Master Broker 不可用,才可能成为下一个 Master Broker。这个表用于记录哪个 Broker 是当前的 Master Broker。

activemq_msgs:用于存储消息,Queue 和 Topic 都存储在这个表中。 主要的数据库字段如下

  • id:自增的数据库主键
  • container:消息的 destination
  • msgid_prod:消息发送者客户端的主键
  • msg_seq:是发送消息的顺序,msgid_prod+msg_seq 可以组成 jms 的 messageid
  • expiration:消息的过期时间,存储的是从 1970-01-01 到现在的毫秒数
  • msg:消息本体的 java 序列化对象的二进制数据
  • priority:优先级,从 0-9,数值越大优先级越高
  • xid:topic

配置方式如下:

  1. 配置数据源 conf/acticvemq.xml 文件:
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
   <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
   <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
   <property name="username" value="root"/>
   <property name="password" value="111111"/>
   <property name="maxActive" value="200"/>
   <property name="poolPreparedStatements" value="true"/>
</bean>   
  1. 配置 broke 中的 persistenceAdapter
    dataSource 指定持久化数据库的 bean,createTablesOnStartup 是否在启动的时候创建数据表,默认值是 true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为 true,之后改成 false。
<persistenceAdapter>
  <jdbcPersistenceAdapter dataSource="#mysql-ds"  createTablesOnStartup="false"/>
</persistenceAdapter> 

协议

ActiveMQ 支持的 client-broker 通讯协议有:TCP、NIO、UDP、SSL、Http(s)、VM。

Transmission Control Protocol (TCP)

这是默认的 Broker 配置,TCP 的 Client 监听端口是 61616。

在网络传输数据前,必须要序列化数据,消息是通过一个叫 wire protocol 的来序列化成字节流。默认情况下,ActiveMQ 把 wire protocol 叫做 OpenWire,它的目的是促使网络上的效率和数据快速交互。

TCP 连接的 URI 形式:port?key=value&key=value

TCP 传输的优点: (1)TCP 协议传输可靠性高,稳定性强 (2)高效性:字节流方式传递,效率很高 (3)有效性、可用性:应用广泛,支持任何平台

New I/O API Protocol(NIO)

NIO 协议和 TCP 协议类似,但 NIO 更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的 client 调用和服务端有更多的负载。

适合使用 NIO 协议的场景:
(1)可能有大量的 Client 去链接到 Broker 上一般情况下,大量的 Client 去链接 Broker 是被操作系统的线程数所限制的。因此,NIO 的实现比 TCP 需要更少的线程去运行,所以建议使用 NIO 协议
(2)可能对于 Broker 有一个很迟钝的网络传输 NIO 比 TCP 提供更好的性能

NIO 连接的 URI 形式:port?key=value

Transport Connector 配置示例:

<transportConnectors>
  <transportConnector
    name="nio"
    uri="nio://localhost:61618?trace=true" />
</transportConnectors>

User Datagram Protocol(UDP)

UDP 和 TCP 的区别
(1)TCP 是一个原始流的传递协议,意味着数据包是有保证的,换句话说,数据包是不会被复制和丢失的。UDP,另一方面,它是不会保证数据包的传递的
(2)TCP 也是一个稳定可靠的数据包传递协议,意味着数据在传递的过程中不会被丢失。这样确保了在发送和接收之间能够可靠的传递。相反,UDP 仅仅是一个链接协议,所以它没有可靠性之说

从上面可以得出:TCP 是被用在稳定可靠的场景中使用的;UDP 通常用在快速数据传递和不怕数据丢失的场景中,还有 ActiveMQ 通过防火墙时,只能用 UDP

UDP 连接的 URI 形式:port?key=value

Transport Connector 配置示例:

<transportConnectors>
    <transportConnector
        name="udp"
        uri="udp://localhost:61618?trace=true" />
</transportConnectors>

Active MQ 的安全机制

web 控制台安全
修改 jetty-realm.properties
# username: password [,rolename ...](用户名: 密码 角色)
注意:配置需重启 ActiveMQ 才会生效

消息安全机制
修改 activemq.xml 在中添加如下代码:

<plugins>
      <simpleAuthenticationPlugin>
          <users>
              <authenticationUser username="admin" password="admin" groups="admins,publishers,consumers"/>
              <authenticationUser username="publisher" password="publisher"  groups="publishers,consumers"/>
              <authenticationUser username="consumer" password="consumer" groups="consumers"/>
              <authenticationUser username="guest" password="guest"  groups="guests"/>
          </users>
      </simpleAuthenticationPlugin>
 </plugins>

ActiveMQ 使用

在 java 中使用 ActiveMQ 只需要引入相关依赖

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.11</version>
</dependency>

编写生产者

public class Sender {
	public static void main(String[] args) throws JMSException {
	// 1. 建立工厂对象,
	ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61618");
	//2 从工厂里拿一个连接
	Connection connection = acf.createConnection();
	connection.start();
	//3 从连接中获取Session(会话)
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	//4 从会话中获取目的地(Destination)消费者会从这个目的地取消息
	Queue queue = session.createQueue("mq.test");
	//5 从会话中创建消息提供者
	MessageProducer producer = session.createProducer(queue);
	//6 从会话中创建文本消息(也可以创建其它类型的消息体)
	TextMessage message = session.createTextMessage("msg: hello world");
	//7 通过消息提供者发送消息到ActiveMQ
	producer.send(message);
	//8 关闭连接
	connection.close();
	}
}

编写消费者

public class Receiver {
	public static void main(String[] args) throws JMSException {
	// 1. 建立工厂对象,
	ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61618");
	//2 从工厂里拿一个连接
	Connection connection = acf.createConnection();
	connection.start();
	//3 从连接中获取Session(会话)
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	//4 从会话中获取目的地(Destination)消费者会从这个目的地取消息
	Queue queue = session.createQueue("mq.test");
	//5 从会话中创建消息消费者
	MessageConsumer consumer = session.createConsumer(queue);
	while (true){
		//6 消费者接收消息
		Message msg = consumer.receive();
		TextMessage textMessage = (TextMessage) msg;
		System.out.println("text:"+textMessage.getText());
		}
	}
}

常用 API 及特性

  • 事务消息
    Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    提交事务:session.commit();
    回滚事务:session.rollback();
    开启事务后,只有事务 commit 成功,消息才会发送到 MQ 中
  • 持久化
    默认持久化是开启的;
    开启非持久化示例代码:
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
  • 设置消息优先级
    producer.setPriority();
  • 设置消息超时/过期时间
    producer.setTimeToLive
    设置了消息超时的消息,消费端在超时后无法在消费到此消息。
  • 死信
    此类消息会进入到 ActiveMQ.DLQ 队列且不会自动清除,称为死信,有消息堆积的风险。
  • 签收模式
    签收代表接收端的 session 已收到消息的一次确认,反馈给 broker
    如果 session 带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。
    消息事务是在生产者 producer 到 broker 或 broker 到 consumer 过程中同一个 session 中发生的,保证几条消息在发送过程中的原子性。在支持事务的 session 中,producer 发送 message 时在 message 中带有 transactionID。broker 收到 message 后判断是否有 transactionID,如果有就把 message 保存在 transaction store 中,等待 commit 或者 rollback 消息。
    ActiveMQ 支持自动签收与手动签收
    Session.AUTO_ACKNOWLEDGE
    当客户端从 receiver 或 onMessage 成功返回时,Session 自动签收客户端的这条消息的收条。
    Session.CLIENT_ACKNOWLEDGE
    客户端通过调用消息(Message)的 acknowledge 方法签收消息。在这种情况下,签收发生在 Session 层面:签收一个已经消费的消息会自动地签收这个 Session 所有已消费的收条。
    Session.DUPS_OK_ACKNOWLEDGE
    Session 不必确保对传送消息的签收,这个模式可能会引起消息的重复,但是降低了 Session 的开销,所以只有客户端能容忍重复的消息,才可使用。
  • 独占消费者
    Queue queue = session.createQueue("xxoo?consumer.exclusive=true");
  • 发送异步消息
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				"admin",
				"admin",
				"tcp://localhost:61616"
				);
// 2.获取一个向ActiveMQ的连接
connectionFactory.setUseAsyncSend(true);
ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection();
connection.setUseAsyncSend(true);
  • 消息堆积
    producer 每发送一个消息,统计一下发送的字节数,当字节数达到 ProducerWindowSize 值时,需要等待 broker 的确认,才能继续发送。
    brokerUrl 中设置: tcp://localhost:61616?jms.producerWindowSize=1048576
    destinationUri 中设置: myQueue?producer.windowSize=1048576
  • 延迟消息投递
    首先在配置文件中开启延迟和调度
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

延迟发送示例代码:
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,10*1000);

  • 创建监听器
ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
		ActiveMQConnectionFactory.DEFAULT_PASSWORD,
		"tcp://localhost:61618");
//2 从工厂里拿一个连接
Connection connection = acf.createConnection();
connection.start();
//3 从连接中获取Session(会话)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4 从会话中获取目的地(Destination)消费者会从这个目的地取消息
Queue queue = session.createQueue("mq.test");
//5 从会话中创建消息消费者
MessageConsumer consumer = session.createConsumer(queue);
MyListener myListener = new MyListener();
MessageListener listener = myListener::receiveMessage;
consumer.setMessageListener(listener);

SpringBoot 整合 ActiveMQ

  • 添加依赖

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
  • 配置文件
server:
  port: 80
spring:
  activemq:
    broker-url: tcp://localhost:61618
    user: admin
    password: admin
    pool:
      enabled: true
      #连接池最大连接数
      max-connections: 5
      #空闲的连接过期时间,默认为30秒
      idle-timeout: 0
    packages:
      trust-all: true
  jms:
    pub-sub-domain: true
  • 配置类

@Configuration
@EnableJms
public class ActiveMqConfig {
// topic模式的ListenerContainer
 @Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
	        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
	        bean.setPubSubDomain(true);
	        bean.setConnectionFactory(activeMQConnectionFactory);
	        return bean;
	    }
// queue模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
	        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
	        bean.setConnectionFactory(activeMQConnectionFactory);
	        return bean;
	    }
}

  • 编写生产者
@Service
public class MqProducerService {
	@Autowired
	private JmsMessagingTemplate jmsMessagingTemplate;
  
	public void sendStringQueue(String destination, String msg) {
		System.out.println("send...");
		ActiveMQQueue queue = new ActiveMQQueue(destination);
		jmsMessagingTemplate.afterPropertiesSet();
		ConnectionFactory factory = jmsMessagingTemplate.getConnectionFactory();
		try {
			Connection connection = factory.createConnection();
			connection.start();
		
			Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
			Queue queue2 = session.createQueue(destination);
		
			MessageProducer producer = session.createProducer(queue2);
		
			TextMessage message = session.createTextMessage("hahaha");
		
		
			producer.send(message);
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	
		jmsMessagingTemplate.convertAndSend(queue, msg);
	}
	public void sendStringQueueList(String destination, String msg) {
		System.out.println("xxooq");
		ArrayList<String> list = new ArrayList<>();
		list.add("1");
		list.add("2");
		jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(destination), list);
	}
}
  • 编写消费者
@JmsListener(destination = "user",containerFactory = "jmsListenerContainerQueue")
public void receiveStringQueue(String msg) {
        System.out.println("接收到消息...." + msg);
    }

@JmsListener(destination = "ooo",containerFactory = "jmsListenerContainerTopic")
public void receiveStringTopic(String msg) {
     System.out.println("接收到消息...." + msg);
 }

小结

本文详细介绍了为什么需要引入消息队列,JMS、ActiveMQ 的基础概念以及常用 API,与原生 JAVA 整合及 SpringBoot 整合等知识点,可以让大家更好的了解 ActiveMQ 的使用场景及使用方式。

  • ActiveMQ

    ActiveMQ 是 Apache 旗下的一款开源消息总线系统,它完整实现了 JMS 规范,是一个企业级的消息中间件。

    19 引用 • 13 回帖 • 668 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...