RocketMQ 进阶 - 延时消息

本贴最后更新于 1646 天前,其中的信息可能已经天翻地覆

前言

在开发中经常会遇到延时任务的需求,例如在 12306 购买车票,若生成订单 30 分钟未支付则自动取消;还有在线商城完成订单后 48 小时不评价 ,自动 5 星好评。像这类在某事件触发后一段时间内执行的需求任务我们称之为 延时任务

那么如何实现延迟任务呢?

第一反应是利用 cron 方案来实现:
image.png

启动一个 cron 定时任务,每隔一段时间执行一次,比如 30 分钟,找到那些超时的数据,直接更新状态,或者拿出来执行一些操作。如果数据量比较大,需要分页查询,分页 update,这将是一个 for 循环更新操作。

cron 方案是很常见的一种方案,但是常见的不一定是最好的,主要有以下几个问题:

  • 当数据量大的时候轮询效率低;
  • 时效性不够好,如果每小时轮询一次,最差的情况时间误差会达到 1 小时;
  • 如果通过增加 cron 轮询频率来减少时间误差,则会出现轮询低效和重复计算的问题;

既然 cron 方案不是很理想,那就请出我们今天的主角,使用 RocketMQ 的延时消息解决。
在创建订单的时候发送一条延时消息到 RocketMQ,30 分钟后消费者消费消息去检查订单的状态,如果发现订单未支付则取消订单释放库存。

实现

RocketMQ 延迟队列的核心思路是:所有的延迟消息由 producer 发出之后,都会存放到同一个 topic(SCHEDULE_TOPIC_XXXX)下,不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由定时线程读取转换为普通的消息存的真实指定的 topic 下,此时对于 consumer 端此消息才可见,从而被 consumer 消费。

注意:RocketMQ 不支持任意时间的延时,只支持以下几个固定的延时等级
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

下面我们结合 SprintBoot 利用 RocketMQ 发送延时消息

  • 引入 RocketMQ 组件
<dependency>
	<groupId>org.apache.rocketmq</groupId>
	<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
  • 增加 RocketMQ 的配置
rocketmq:
  name-server: 172.31.0.44:9876
  producer:
    group: delay-group
  • 编写生产者
@Component
@Slf4j
public class DelayProduce {
    @Autowired
    private RocketMQTemplate rocketMQTemplatet;

    public void sendDelayMessage(String topic,String message,int delayLevel){
       SendResult sendResult = rocketMQTemplatet.syncSend(topic, MessageBuilder.withPayload(message).build(), 2000, delayLevel);
        log.info("sendtime is {}", DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss").format(LocalDateTime.now()));
        log.info("sendResult is{}",sendResult);
    }
}
  • 编写消费者
@Slf4j
@Component
@RocketMQMessageListener(
        topic = "delay-topic",
        consumerGroup = "delay-group"
)
public class DelayConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("received message time is {}", DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss").format(LocalDateTime.now()));
        log.info("received message is {}",message);
    }
}
  • 测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayProduceTest {
    @Autowired
    private DelayProduce delayProduce;

    @Test
    public void sendDelayMessage() {
        delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",5);
    }
}

这里 delayLevel 设置成 5,对应 RocketMQ 的延时等级就是 1 分钟后投递消息。

  • 运行结果
    image.png
    发送时间
    image.png
    消费时间

修改延时级别

RocketMQ 的延迟等级可以进行修改,以满足自己的业务需求,可以修改/添加新的 level。例如:你想支持 1 天的延迟,修改最后一个 level 的值为 1d,这个时候依然是 18 个 level;也可以增加一个 1d,这个时候总共就有 19 个 level。

  • 打开 RocketMQ 的配置文件,修改 messageDelayLevel 属性
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
storePathRootDir = /app/rocketmq/data
messageDelayLevel=90s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

这次将延时等级 1 修改成了 90s,生产者发送消息后需要 90s 后再进行消息投递。修改完成后重启 RocketMQ。 nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &

  • 使用延时等级 1 发送消息
public void sendDelayMessage() {
	delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",1);
}
  • 测试
    image.png
    发送时间
    image.png
    消费时间

通过比对发送时间与消费时间证明延时等级修改生效。

RocketMQ 相关文章

  • RocketMQ
    22 引用 • 10 回帖
  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    944 引用 • 1459 回帖 • 17 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...