消息中间件之ActiveMQ原理及使用教程(持久化、JMS可靠消息、重试机制与幂等性问题)

消息中间件产生的背景

1、在客户端与服务器进行通讯时,客户端调用后,必须等待服务对象完成处理返回结果才能继续执行,这个过程是基于请求与响应的同步过程。

2、客户与服务器对象的生命周期紧密耦合,客户进程和服务对象进程都都必须正常运行如果由于服务对象崩溃或者网络故障导致用户的请求不可达,客户会受到异常。

3、点对点通信: 客户的一次调用只发送给某个单独的目标对象。

 

同步除了可能会产生阻塞和超时的问题之外,还可能会导致接口重复提交、数据的幂等性问题

问1:针对同步接口中,接口的重复提交问题该怎么解决?   token+验证码

问2:同步过程中,由于外因(客户端网络慢)、外因(服务器压力大响应慢),假设A中的数据需要同步到B,但迟迟未能同步,就产生了数据的幂等性问题。这该怎么解决?

A调用B,如果B没有及时响应。A项目默认有3次重试补偿机制,将该信息存放在日志表(补偿表)中,使用定时任务每天晚上进行健康检查数据,再自己手动补偿,但这种补偿不是实时的。

 

同步有一个很大的局限性,那就是客户进程和服务对象进程都都必须正常运行,如果在执行定时任务手动补偿时,由于接口B宕机等原因,A的数据仍不能同步到B,那么这种补偿机制也不是那么可靠,根源就是在于同步。

 

 

什么是消息中间件


面向消息的中间件(MessageOrlented MiddlewareMOM)较好的解决了以上问题。发送者将消息发送给消息服务器,消息服务器将消息存放在若干队列中,在合适的时候再将消息转发给接收者。

这种模式下,发送和接收是异步的,发送者无需等待;二者的生命周期未必相同发送消息的时候接收者不一定运行,接收消息的时候发送者也不一定运行,这种异步机制显得更加灵活。同时可以实现一对多通信:对于一个消息可以有多个接收者

 

MQ(Message Queue)是一个非常经典的生产者与消费者模型,简单来说,提供接口给别人调用的是生产者,而调用接口的是消费者。  而消费者与生产者之间的通信,是通过队列实现的。

1、生产者向队列发送消息,如果消费者不在,消息缓存在队列中

2、当消费者从队列中获取到消息后,视为消息被消费,则该消息会从队列中移除,防止重复消费

 

思考:如果生产者在某一时候向队列中发送了大量消息,而消费者未能及时消费,会不会导致消费者挂掉?

换个角度想一下,如果使用传统的同步(请求与响应的过程),生产者A向消费者B某一时刻发送大量消息,B可能会直接宕机挂掉,而MQ不一样,MQ是异步处理的,生产者向消费者发送大量消息,都会缓存在队列中,排队消费,并且消息队列是可以持久化的。所以由于消息是可缓存的,消息中间件也是解决高并发的一种手段

 

JMS介绍

JMS是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输

 

通讯模型的分类

1、点对点通信Point-to-Point

概念:一个生产者的消息只能给一个消费者消费,消息缓存在消息队列中,直到消息被消费或过期,才从队列中移除

生产者和消费者之间在时间上没有依赖性,也就是说当生产者发送了消息之后,不管消费者有没有正在运行,它不会影响到消息被发送到队列

消费者在成功接收消息之后需向队列应答成功

如果你希望发送的每个消息都应该被成功处理的话,那么你需要P2P模式。

 

2、发布订阅Publish/Subscribe(Pub/Sub)

概念:发布者将消息发送到主题(topic),系统将这些消息传递给多个订阅者。

每个消息可以有多个消费者。

发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。

为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

如果你希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。

消息的消费 
在JMS中,消息的产生是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。 
同步 
订阅者或消费者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞 
异步 
订阅者或消费者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

 

消息中间件的应用场景

用户注册、订单修改库存、日志存储

 

使用ActiveMQ实现点对点通讯模式

1、引入pom文件依赖

  <dependencies>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.7.0</version>
		</dependency>
  </dependencies>

 

2、书写原生的activemq点对点模型——生产者

package cn.itcats.active;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 使用ActiveMQ完成点对点模型
 * @author fatah
 * 生产者
 */
public class Producer {
	 public static void main(String[] args) throws JMSException {
		 //1.创建activemq连接工厂
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD,"tcp://127.0.0.1:61616");
		//2.创建连接
		Connection conn = connectionFactory.createConnection();
		//3.启动连接
		conn.start();
		//4.创建session工厂
		/**
		 * 是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED  
		 * 第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。 
		 * Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
		 * Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
		 * DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
		 */
		//不支持事务,自动确认
		Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		//5.创建队列
		Destination myQueue = session.createQueue("My Frist Queue");
		//6.创建生产者
		MessageProducer producer = session.createProducer(myQueue);
		//7.是否持久化    不持久化
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		//8.发送消息
		for (int i = 0; i < 5; i++) {
			System.out.println(i);
			sendMessage(session,producer,"hello world!  "+ i);
		}
		System.out.println("消息发送完毕");
	}
	 
	 public static void sendMessage(Session session, MessageProducer producer,String message) throws JMSException{
		 TextMessage textMessage = session.createTextMessage(message);
		 producer.send(textMessage);
	 }
	 
}

 

3、书写原生的activemq点对点模型——消费者

package cn.itcats.active;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 使用ActiveMQ完成点对点模型
 * @author fatah
 * 消费者者
 */
public class Consumer {
	 public static void main(String[] args) throws JMSException {
		 //1.创建activemq连接工厂
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD,"tcp://127.0.0.1:61616");
		//2.创建连接
		Connection conn = connectionFactory.createConnection();
		//3.启动连接
		conn.start();
		//4.创建session工厂
		/**
		 * 是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED  
		 * 第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。 
		 * Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
		 * Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
		 * DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
		 */
		//不支持事务,自动确认
		Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		//5.创建队列
		Destination myQueue = session.createQueue("My Frist Queue");
		//6.创建消费者
		MessageConsumer consumer = session.createConsumer(myQueue);
		while(true){
			TextMessage mess = (TextMessage) consumer.receive();
			if(mess != null){
				String text = mess.getText();
				System.out.println("消费者获取到消息" + text);
			}else{
				break;
			}
		}
		
	}
}

点对点模式不会造成重复消费,一个生产者的消息只能给一个消费者消费。

 

 

使用ActiveMQ实现发布/订阅通讯模式

该模式实际开发过程中用的并不算多

书写原生的activemq发布/订阅模型——发布者

package cn.itcats.active;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 使用ActiveMQ完成发布/订阅模型
 * @author fatah
 * 发布者
 */
public class Publish {
	 public static void main(String[] args) throws JMSException {
		 //1.创建activemq连接工厂
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD,"tcp://127.0.0.1:61616");
		//2.创建连接
		Connection conn = connectionFactory.createConnection();
		//3.启动连接
		conn.start();
		//4.创建session工厂
		/**
		 * 是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED  
		 * 第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。 
		 * Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
		 * Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
		 * DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
		 */
		//不支持事务,自动确认
		Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		//5.创建发布者
		MessageProducer producer = session.createProducer(null);
		//6.是否持久化    不持久化
		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		//7.发送消息
		for (int i = 0; i < 5; i++) {
			System.out.println(i);
			sendMessage(session,producer,"hello world!  "+ i);
		}
		System.out.println("消息发送完毕");
	}
	 
	 public static void sendMessage(Session session, MessageProducer producer,String message) throws JMSException{
		 TextMessage textMessage = session.createTextMessage(message);
		 Destination topic = session.createTopic("My Frist Topic");
		 producer.send(topic,textMessage);
	 }
	 
}

 

书写原生的activemq发布/订阅模型——订阅者

package cn.itcats.active;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 使用ActiveMQ完成发布/订阅模型
 * @author fatah
 * 订阅者
 */
public class Subscribe {
	 public static void main(String[] args) throws JMSException {
		 //1.创建activemq连接工厂
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD,"tcp://127.0.0.1:61616");
		//2.创建连接
		Connection conn = connectionFactory.createConnection();
		//3.启动连接
		conn.start();
		//4.创建session工厂
		/**
		 * 是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED  
		 * 第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。 
		 * Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
		 * Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
		 * DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
		 */
		//不支持事务,自动确认
		Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		//5.创建发布者
		Destination topic = session.createTopic("My Frist Topic");
		MessageConsumer consumer = session.createConsumer(topic);
		while(true){
			TextMessage mess = (TextMessage) consumer.receive();
			if(mess != null){
				String text = mess.getText();
				System.out.println("消费者获取到消息" + text);
			}else{
				break;
			}
		}
		System.out.println("消费完毕");
	}
}

 

注意哦,发布订阅模型和点对点模型不一样,若发布消息之前未启动订阅者,则订阅者启动时,订阅者也无法接收到消息(未持久化状态)。另外,发布订阅模型,消费者是可以重新消费的。

 

 

ActiveMQ持久化【高可用】

ActiveMQ设置为持久化只需要在生产者端设置一个API即可:

MessageProducer producer = session.createProducer(myQueue);
//DeliveryMode.PERSISTENT为持久化
//DeliveryMode.NON_PERSISTENT为不进行持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

设置了activeMQ持久化后,即使当activeMQ宕机后重启,未被消费的消息依然能够存在于消息队列中,消费者仍能进行消费。

 

JMS可靠消息【签收多种方式】

1、自动签收,生产者向队列发送消息后,只要消费者监听了消息队列,消费者将立刻获得消息,不管消费者是否成功取得消息,过程是否抛出异常导致消费者无法获得消息,都不会触发重试机制。 缺点:没有事务机制,没有补偿机制

2、事务签收,对于生产者而言,生产者要想向消息队列发送消息,必须提交事务。对于消费者而言,如果消费没有提交事务,则默认表示没有消费,会触发重试机制。                                                                                       双方事务提交

3、手动签收,需要消费者手动签收,如果消费者没有进行签收,则默认消息没有被消费。   单方事务提交

//场景1  
//生产者不开启session,客户端必须有手动签收模式
Session session = createConnection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
//消费者不开启session,客户端必须有手动签收模式
TextMessage textMessage = (TextMessage) createConsumer.receive();
//接受消息
textMessage.acknowledge();


//场景2
//生产者不开启session,客户端自动签收模式 
Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//消费者不开启session,自动签收消息
Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

//场景3
//事物消息 生产者以事物形式,必须要将消息提交事物,才可以提交到队列中。
Session session = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
session.commit();
//消费者 消费完后必须提交,不提交生产者不知道消费者消费了
Session session = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
session.commit();

 

使用MQ的注意事项【合理重试及消息幂等性问题】

什么情况下mq会重试?

ActiveMQ在消费者端抛出异常时/网络延迟导致消费者未签收情况,MQ会自动重试

 

①合适选择重试机制

情况1:  消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试?      需要重试

情况2:  消费者获取到消息后,抛出数据转换异常,是否需要重试?                                      不需要重试

 

总结:对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job健康检查+人工进行补偿

 

 

②消费者如果保证消息幂等性,不被重复消费。

产生原因:网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费。

解决办法:

使用全局MessageID判断消费方使用同一个,解决幂等性。

伪代码:

//activeMQ提供了textMessageId,使用业务ID(订单号)也可以
String jmsMessageId = textMessage.getJMSMessageID();

//网络延迟环境下,第二次请求过来,应该使用全局ID判断该消息是否被使用过
if(jmsMessageId == redis内的id){
    //把消息签收掉,否则将继续重试,有些人觉得难理解,我解释一下:
     textMessage.acknowledge();   //避免第三次重试

    //消息可以重发,但是消息不能做重复操作,如重复向数据库中做插入操作,造成幂等性问题,
    //所以当第二次发送过来的时候,就可能造成重复提交问题,我们使用手动提交(业务中一般也是使用手动提交多)
    //手动提交可以把重复发送的消息从队列中移除,那么接下来就不会触发重试了。
}
    //将拿到的消息做业务处理,如插入修改操作等...
    //消费成功,把jmsMessageId放入redis

    

 

 

  • 6
    点赞
  • 26
    收藏
    觉得还不错? 一键收藏
  • 4
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论 4
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值