消息中间件产生的背景
1、在客户端与服务器进行通讯时,客户端调用后,必须等待服务对象完成处理返回结果才能继续执行,这个过程是基于请求与响应的同步过程。
2、客户与服务器对象的生命周期紧密耦合,客户进程和服务对象进程都都必须正常运行;如果由于服务对象崩溃或者网络故障导致用户的请求不可达,客户会受到异常。
3、点对点通信: 客户的一次调用只发送给某个单独的目标对象。
同步除了可能会产生阻塞和超时的问题之外,还可能会导致接口重复提交、数据的幂等性问题。
问1:针对同步接口中,接口的重复提交问题该怎么解决? token+验证码
问2:同步过程中,由于外因(客户端网络慢)、外因(服务器压力大响应慢),假设A中的数据需要同步到B,但迟迟未能同步,就产生了数据的幂等性问题。这该怎么解决?
A调用B,如果B没有及时响应。A项目默认有3次重试补偿机制,将该信息存放在日志表(补偿表)中,使用定时任务每天晚上进行健康检查数据,再自己手动补偿,但这种补偿不是实时的。
同步有一个很大的局限性,那就是客户进程和服务对象进程都都必须正常运行,如果在执行定时任务手动补偿时,由于接口B宕机等原因,A的数据仍不能同步到B,那么这种补偿机制也不是那么可靠,根源就是在于同步。
什么是消息中间件
面向消息的中间件(MessageOrlented MiddlewareMOM)较好的解决了以上问题。发送者将消息发送给消息服务器,消息服务器将消息存放在若干队列中,在合适的时候再将消息转发给接收者。
这种模式下,发送和接收是异步的,发送者无需等待;二者的生命周期未必相同:发送消息的时候接收者不一定运行,接收消息的时候发送者也不一定运行,这种异步机制显得更加灵活。同时可以实现一对多通信:对于一个消息可以有多个接收者。
MQ(Message Queue)是一个非常经典的生产者与消费者模型,简单来说,提供接口给别人调用的是生产者,而调用接口的是消费者。 而消费者与生产者之间的通信,是通过队列实现的。
1、生产者向队列发送消息,如果消费者不在,消息缓存在队列中。
2、当消费者从队列中获取到消息后,视为消息被消费,则该消息会从队列中移除,防止重复消费。
思考:如果生产者在某一时候向队列中发送了大量消息,而消费者未能及时消费,会不会导致消费者挂掉?
换个角度想一下,如果使用传统的同步(请求与响应的过程),生产者A向消费者B某一时刻发送大量消息,B可能会直接宕机挂掉,而MQ不一样,MQ是异步处理的,生产者向消费者发送大量消息,都会缓存在队列中,排队消费,并且消息队列是可以持久化的。所以由于消息是可缓存的,消息中间件也是解决高并发的一种手段。
JMS介绍
JMS是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。
通讯模型的分类
1、点对点通信Point-to-Point
概念:一个生产者的消息只能给一个消费者消费,消息缓存在消息队列中,直到消息被消费或过期,才从队列中移除。
生产者和消费者之间在时间上没有依赖性,也就是说当生产者发送了消息之后,不管消费者有没有正在运行,它不会影响到消息被发送到队列
消费者在成功接收消息之后需向队列应答成功
如果你希望发送的每个消息都应该被成功处理的话,那么你需要P2P模式。
2、发布订阅Publish/Subscribe(Pub/Sub)
概念:发布者将消息发送到主题(topic),系统将这些消息传递给多个订阅者。
每个消息可以有多个消费者。
发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
如果你希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。
消息的消费
在JMS中,消息的产生是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。
同步
订阅者或消费者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞
异步
订阅者或消费者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。
消息中间件的应用场景
用户注册、订单修改库存、日志存储
使用ActiveMQ实现点对点通讯模式
1、引入pom文件依赖
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
</dependencies>
2、书写原生的activemq点对点模型——生产者
package cn.itcats.active;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 使用ActiveMQ完成点对点模型
* @author fatah
* 生产者
*/
public class Producer {
public static void main(String[] args) throws JMSException {
//1.创建activemq连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,"tcp://127.0.0.1:61616");
//2.创建连接
Connection conn = connectionFactory.createConnection();
//3.启动连接
conn.start();
//4.创建session工厂
/**
* 是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
* 第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
* Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
* Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
* DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
*/
//不支持事务,自动确认
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//5.创建队列
Destination myQueue = session.createQueue("My Frist Queue");
//6.创建生产者
MessageProducer producer = session.createProducer(myQueue);
//7.是否持久化 不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//8.发送消息
for (int i = 0; i < 5; i++) {
System.out.println(i);
sendMessage(session,producer,"hello world! "+ i);
}
System.out.println("消息发送完毕");
}
public static void sendMessage(Session session, MessageProducer producer,String message) throws JMSException{
TextMessage textMessage = session.createTextMessage(message);
producer.send(textMessage);
}
}
3、书写原生的activemq点对点模型——消费者
package cn.itcats.active;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 使用ActiveMQ完成点对点模型
* @author fatah
* 消费者者
*/
public class Consumer {
public static void main(String[] args) throws JMSException {
//1.创建activemq连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,"tcp://127.0.0.1:61616");
//2.创建连接
Connection conn = connectionFactory.createConnection();
//3.启动连接
conn.start();
//4.创建session工厂
/**
* 是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
* 第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
* Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
* Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
* DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
*/
//不支持事务,自动确认
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//5.创建队列
Destination myQueue = session.createQueue("My Frist Queue");
//6.创建消费者
MessageConsumer consumer = session.createConsumer(myQueue);
while(true){
TextMessage mess = (TextMessage) consumer.receive();
if(mess != null){
String text = mess.getText();
System.out.println("消费者获取到消息" + text);
}else{
break;
}
}
}
}
点对点模式不会造成重复消费,一个生产者的消息只能给一个消费者消费。
使用ActiveMQ实现发布/订阅通讯模式
该模式实际开发过程中用的并不算多
书写原生的activemq发布/订阅模型——发布者
package cn.itcats.active;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 使用ActiveMQ完成发布/订阅模型
* @author fatah
* 发布者
*/
public class Publish {
public static void main(String[] args) throws JMSException {
//1.创建activemq连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,"tcp://127.0.0.1:61616");
//2.创建连接
Connection conn = connectionFactory.createConnection();
//3.启动连接
conn.start();
//4.创建session工厂
/**
* 是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
* 第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
* Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
* Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
* DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
*/
//不支持事务,自动确认
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//5.创建发布者
MessageProducer producer = session.createProducer(null);
//6.是否持久化 不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//7.发送消息
for (int i = 0; i < 5; i++) {
System.out.println(i);
sendMessage(session,producer,"hello world! "+ i);
}
System.out.println("消息发送完毕");
}
public static void sendMessage(Session session, MessageProducer producer,String message) throws JMSException{
TextMessage textMessage = session.createTextMessage(message);
Destination topic = session.createTopic("My Frist Topic");
producer.send(topic,textMessage);
}
}
书写原生的activemq发布/订阅模型——订阅者
package cn.itcats.active;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 使用ActiveMQ完成发布/订阅模型
* @author fatah
* 订阅者
*/
public class Subscribe {
public static void main(String[] args) throws JMSException {
//1.创建activemq连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,"tcp://127.0.0.1:61616");
//2.创建连接
Connection conn = connectionFactory.createConnection();
//3.启动连接
conn.start();
//4.创建session工厂
/**
* 是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
* 第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
* Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
* Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
* DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
*/
//不支持事务,自动确认
Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//5.创建发布者
Destination topic = session.createTopic("My Frist Topic");
MessageConsumer consumer = session.createConsumer(topic);
while(true){
TextMessage mess = (TextMessage) consumer.receive();
if(mess != null){
String text = mess.getText();
System.out.println("消费者获取到消息" + text);
}else{
break;
}
}
System.out.println("消费完毕");
}
}
注意哦,发布订阅模型和点对点模型不一样,若发布消息之前未启动订阅者,则订阅者启动时,订阅者也无法接收到消息(未持久化状态)。另外,发布订阅模型,消费者是可以重新消费的。
ActiveMQ持久化【高可用】
ActiveMQ设置为持久化只需要在生产者端设置一个API即可:
MessageProducer producer = session.createProducer(myQueue);
//DeliveryMode.PERSISTENT为持久化
//DeliveryMode.NON_PERSISTENT为不进行持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
设置了activeMQ持久化后,即使当activeMQ宕机后重启,未被消费的消息依然能够存在于消息队列中,消费者仍能进行消费。
JMS可靠消息【签收多种方式】
1、自动签收,生产者向队列发送消息后,只要消费者监听了消息队列,消费者将立刻获得消息,不管消费者是否成功取得消息,过程是否抛出异常导致消费者无法获得消息,都不会触发重试机制。 缺点:没有事务机制,没有补偿机制
2、事务签收,对于生产者而言,生产者要想向消息队列发送消息,必须提交事务。对于消费者而言,如果消费没有提交事务,则默认表示没有消费,会触发重试机制。 双方事务提交
3、手动签收,需要消费者手动签收,如果消费者没有进行签收,则默认消息没有被消费。 单方事务提交
//场景1
//生产者不开启session,客户端必须有手动签收模式
Session session = createConnection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
//消费者不开启session,客户端必须有手动签收模式
TextMessage textMessage = (TextMessage) createConsumer.receive();
//接受消息
textMessage.acknowledge();
//场景2
//生产者不开启session,客户端自动签收模式
Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//消费者不开启session,自动签收消息
Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//场景3
//事物消息 生产者以事物形式,必须要将消息提交事物,才可以提交到队列中。
Session session = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
session.commit();
//消费者 消费完后必须提交,不提交生产者不知道消费者消费了
Session session = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
session.commit();
使用MQ的注意事项【合理重试及消息幂等性问题】
什么情况下mq会重试?
ActiveMQ在消费者端抛出异常时/网络延迟导致消费者未签收情况,MQ会自动重试
①合适选择重试机制
情况1: 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? 需要重试
情况2: 消费者获取到消息后,抛出数据转换异常,是否需要重试? 不需要重试
总结:对于情况2,如果消费者代码抛出异常是需要发布新版本才能解决的问题,那么不需要重试,重试也无济于事。应该采用日志记录+定时任务job健康检查+人工进行补偿
②消费者如果保证消息幂等性,不被重复消费。
产生原因:网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费。
解决办法:
使用全局MessageID判断消费方使用同一个,解决幂等性。
伪代码:
//activeMQ提供了textMessageId,使用业务ID(订单号)也可以
String jmsMessageId = textMessage.getJMSMessageID();
//网络延迟环境下,第二次请求过来,应该使用全局ID判断该消息是否被使用过
if(jmsMessageId == redis内的id){
//把消息签收掉,否则将继续重试,有些人觉得难理解,我解释一下:
textMessage.acknowledge(); //避免第三次重试
//消息可以重发,但是消息不能做重复操作,如重复向数据库中做插入操作,造成幂等性问题,
//所以当第二次发送过来的时候,就可能造成重复提交问题,我们使用手动提交(业务中一般也是使用手动提交多)
//手动提交可以把重复发送的消息从队列中移除,那么接下来就不会触发重试了。
}
//将拿到的消息做业务处理,如插入修改操作等...
//消费成功,把jmsMessageId放入redis