关于 ActiveMQ 更多详细的介绍可以参考下面的链接:
https://www.ilanni.com/?p=13543
https://www.cnblogs.com/xinhuaxuan/p/6119912.html
ActiveMQ 必须要引用的 jar 文件,参考下面链接
http://activemq.apache.org/initial-configuration.html
ActiveMQ 的 Queue:
ActiveMQConnectionFactory: 是一个管理对象,通过它来创建连接,这个类实现了 QueueConnectionFactory 和 TopicConnectionFactory。所以可以通过这个对象来创建 Queue 连接和 Topic 连接
Connection:这个是通过 ActiveMQConnectionFactory 创建来的连接,通过这个连接可以创建 Session 对象。无论是 Sender 还是 Receiver 都是通过它来创建。
Session:这个是通过 Connection 创建来的 Session 对象,可以作为 Sender ,也可以作为 Receiver。
Queue :这是一个队列,也是 ActiveMQ 的核心之一,Sender 将数据放到这里,Receiver 也是从这里取出数据。Queue 也是通过 Session 创建,并且 Sender 和 Receiver 都要创建一个相同的 Queue,然后才能协同工作。
MessageProducer:这个是通过 Session 创建来的,并且在创建此对象是要指定它要发送数据的 Queue, 它是数据的生产者(Producer),所以调用 send()方法就可以发送数据。
MessageConsumer: 这个是通过 Session 创建来的,并且在创建此对象是要指定它要读取数据的 Queue, 它是数据的消费者(Consumer),将 Consumer 实现 MessageListener 接口并重写 onMessage()方法,可以更简单完成接收数据的功能。
注意:
Connection 需要通过调用 start()方法来进行启动,启动之后才能接收到消息。
Queue 方式默认会持久化数据,在 data 目录下。通过 MessageProducer 的 setDeliveryMode(DeliveryMode.NON_PERSISTENT); 可以设置是否持久化数据。
由于 Queue 是 Point-to-Point 方式,所以 Queue 中的每个数据只能被一个 Consumer 所接收,
也就是 如果有多个 Consumer 从同一个 Queue 中取数据,那么同一条数据仅能被一个 Consumer 所接收,然后其它 Consumer 读取数据时,只能读取下一条数据。
ActiveMQ 的 Topic:
Topic:是 ActiveMQ 的核心之一,也是通过 Session 对象创建的。
Topic 方式与 Queue 方式的主要区别就是:Broker 中 topic 中的消息是以广播的形式发送出去,也就是说一条消息将被发送给所有连上该 Broker 的 Consumer。
-
持续订阅:
创建方式和 Queue 差不多,只是 TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, "t2");
理论上 Consumer 可以接收到 Topic 中的所有数据。
例如: 如果持久订阅者中途宕机,那么 Broker 会等到持久订阅者再次连上 Broker,然后将错过的消息再发送给持久订阅者。
造成的影响是:当持久订阅者处于 inactive 状态时,Broker 需要为持久订阅者保存消息,如果持久订阅者订阅的消息太多则会溢出。(当消息投递成功之后,Broker 就可以把消息删除了)
但仅限于中途宕机的情况,如果一个新的 Consumer 第一次连上一个已经启动了的 Broker,那么在此 Consumer 连上 Broker 之前的所有消息,Broker 不会再发给这个 Consumer,无论是否设置消息缓存。 -
非持续订阅:
一般方式:
创建一般方式和 Queue 基本一样,只是 MessageConsumer messageConsumer = session.createConsumer(topic);
非持续订阅的一般方式:无论是第一次连上还是中途宕机,只要是在 Consumer 离线过程中错过的消息,当 Consumer 再连上 Broker 时,Broker 都不会重发那些错过的数据,而是从最新的消息开始发。
所以 此种方式是可靠性不高。
提高非持久订阅者的可靠性 以及 订阅恢复策略
Retroactive Consumers:是一种非持续订阅者,创建方式是在创建 Topic 的时候指定 consumer 为 retroactive,例如 String topicName = "simon.topic?consumer.retroactive=true"
由于非持续订阅者可能中途宕机或者比 Broker 运行的要晚,所以可能会漏掉一些消息。此时,可以使用一些 恢复策略 来保证非持续订阅的可靠性。例如:
<subscriptionRecoveryPolicy>
<!-- 缓存最新的100条数据,若中途有Consumer连上Broker,那么Broker将这100条数据发给Consumer. -->
<fixedCountSubscriptionRecoveryPolicy maximumSize="100"/>
</subscriptionRecoveryPolicy>
还有其他一些恢复订阅策略就不一 一介绍了。总之,恢复订阅策略针对的是非持久化的 retroactive consumer 订阅者而言的。它提高了非持久化消息的可靠性。
http://activemq.apache.org/subscription-recovery-policy.html
之前也了解过一点消息中间件,但却没有实际应用过。这次刚好项目需要,趁此机会,仔细了解了下,顺便做点笔记。其实也没啥好记的,官网内容很全,也很容易上手。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于