项目已经开源 ,源码地址: RedisDelayQueue
一、引入 pom
将项目中的 redis-delay-queue-core 模块打包 推送到自己公司的中央仓库,然后引入 pom 依赖
<dependency> <artifactId>redis-delay-queue-core</artifactId> <groupId>com.shirc</groupId> <version>0.0.1-SNAPSHOT</version> </dependency>
二、将 RedisDelayQueue 被 Spring 管理
/** * @Description 引入 redisdelayqueue * @Author shirenchuang * @Date 2019/8/6 5:58 PM **/ @Component public class DelayConfig { @Autowired private RedisTemplate redisTemplate; @Bean public RedisDelayQueueContext getRdctx(){ /**传入redisTemplate实例, 第二个参数为项目名 projectName ;不同项目需要设置不一样**/ RedisDelayQueueContext context = new RedisDelayQueueContext(redisTemplate,"maybach-go"); return context; } /**加了这个 可以在其他地方直接 @Autowire RedisDelayQueue 使用了**/ @Bean public RedisDelayQueue getRedisOperation(RedisDelayQueueContext context){ return context.getRedisDelayQueue(); } }
三、注册 Topic 任务
/** * @Description 注册延迟队列 Demo * @Author shirenchuang * @Date 2019/8/8 10:07 AM **/ @Service public class DelayQueueDemoJob extends AbstractTopicRegister<DemoArgs> { @Override public String getTopic() { return DelayJobTopicEnums.DEMO_TEST.getTopic(); } @Override public void execute(DemoArgs demoArgs) { // 延迟任务回调接口 //id : 这个Topic下的唯一值 String id = demoArgs.getId(); //重试次数; 如果回调接口超时失败, 调用失败,会自动重试2次; 这个代表重试次数 int retryCount = demoArgs.getRetryCount(); //DemoArgs 是要继承 Args的; 如果没有自己需要定义的回调参数; 泛型那里直接写 Args就行了 System.out.println(demoArgs.getTest()); } /******下面方法可选重写 根据消费情况可以自行调节 超时时间,线程池大小等等******/ /** * 重试2次仍然失败; 通知接口; 可以在这个接口写自己的通知逻辑; 比如发送邮件或者钉钉消息 * @param demoArgs */ @Override public void retryOutTimes(DemoArgs demoArgs) { super.retryOutTimes(demoArgs); } /** * 设置核心线程池数量 默认20 * @return */ @Override public int getCorePoolSize() { return super.getCorePoolSize(); } /** * 设置线程池最大线程数量 默认100 * @return */ @Override public int getMaxPoolSize() { return super.getMaxPoolSize(); } /** * 获取 回调方法的超时时间 默认回调接口超时时间 6秒 * @return */ @Override public int getMethodTimeout() { return super.getMethodTimeout(); } }
上面的 DemoArgs
/** * @Description 回调参数Demo * @Author shirenchuang * @Date 2019/8/8 10:07 AM **/ public class DemoArgs extends Args { private String test; public String getTest() { return test; } public void setTest(String test) { this.test = test; } }
如果没有自己定义的回调属性,或者只需要一个 id ,那么泛型那里传 Args 就行了,这个会返回 id 的;
下面就是已经定义好的 Args
public class Args implements Serializable { private static final long serialVersionUID = 66666l; /**唯一键 不能为空**/ private String id; /** * 已经重试的次数: * 重试机制: 默认重试2次; 总共最多执行3次 * 添加任务的时候可以设置为<0 的值;则表示不希望重试; * 回调接口自己做好幂等 ***/ private int retryCount; /** * 重入次数: * 这里标记的是当前Job某些异常情况导致并没有真正消费到,然后重新放入待消费池的次数; * 比如: BLPOP出来了之后,在去获取Job的时候redis超时了,导致没有正常消费掉; * 重入次数最大 3次; 避免某些不可控因素出现,超过3次则丢弃 */ private int reentry; public Args() { } public Args(String id) { this.id = id; } public Args(String id, int retryCount) { this.id = id; this.retryCount = retryCount; } public String getId() { return id; } public void setId(String id) { this.id = id; } public int getRetryCount() { return retryCount; } public void setRetryCount(int retryCount) { this.retryCount = retryCount; } public int getReentry() { return reentry; } public void setReentry(int reentry) { this.reentry = reentry; } @Override public String toString() { return "Args{" + "id='" + id + '\'' + ", retryCount=" + retryCount + ", reentry=" + reentry + '}'; }
四、建议新建一个 Topic 的枚举类
因为 新增任务的 Topic 和 注册地方的 Topic,还有删除 Topic 要一致,建议用枚举
/** * @Description 所有延迟任务的Topic * @Author shirenchuang **/ public enum DelayJobTopicEnums { DEMO_TEST("DEMO_TEST","测试"), ; private String topic; private String desc; DelayJobTopicEnums(String topic, String desc) { this.topic = topic; this.desc = desc; } public String getTopic() { return topic; } public String getDesc() { return desc; } }
五、如何新增一个延迟任务 、删除一个延迟任务
/** * @Description 新增删除延迟任务 * @Author shirenchuang * @Date 2019/8/8 10:29 AM **/ @Component public class DelayQueueUseDemo { @Autowired RedisDelayQueue redisDelayQueue; private void addDelayQueue(){ //do something //新增一个延迟任务 DemoArgs demoArgs = new DemoArgs(); demoArgs.setId(UUID.randomUUID().toString()); //设置-1 表示我不想要重试 demoArgs.setRetryCount(-1); demoArgs.setTest("我是个Test"); //异步新增一个 一分钟之后执行的延时任务 redisDelayQueue.addAsync(demoArgs,DelayJobTopicEnums.DEMO_TEST.getTopic(),60000); //也可以同步新增延迟任务 redisDelayQueue.add(demoArgs,60000,DelayJobTopicEnums.DEMO_TEST.getTopic(),RunTypeEnum.SYNC); //也可以指定某个时间点执行 redisDelayQueue.add(demoArgs,DelayJobTopicEnums.DEMO_TEST.getTopic(),System.currentTimeMillis()+60000,RunTypeEnum.ASYNC); /**PS:如果同一个ID添加了多次,以最新添加的为准,会覆盖之前的**/ /**如果自己没有需要回调的参数 直接用Args**/ redisDelayQueue.addAsync(new Args(id),DelayJobTopicEnums.DEMO_TEST.getTopic(),60000); } //删除一个之前添加的延迟任务 private void delDelayQueue(String id){ //异步删除 redisDelayQueue.deleteAsync(DelayJobTopicEnums.DEMO_TEST.getTopic(),id); //同步删除 redisDelayQueue.delete(DelayJobTopicEnums.DEMO_TEST.getTopic(),id,RunTypeEnum.SYNC); } }
六、配置日志
在 logback.xml 里面新增如下配置
<!-- 加入 redis-delay-queue的日志配置 --> <appender name="redis_dq_file" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${LOG_HOME}/redis_dq.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!-- 日志文件输出文件名 --> <FileNamePattern>${LOG_HOME}/redis_dq.log.%d{yyyy-MM-dd}</FileNamePattern> <!-- 日志文件保留天数 --> <MaxHistory>30</MaxHistory> </rollingPolicy> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <!-- 格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度,%msg:日志消息,%n是换行符 --> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> </encoder> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <!-- 过滤掉低于INFO级别的日志 --> <level>INFO</level> </filter> </appender> <!-- 延迟任务异常日志 --> <appender name="redis_dq_error_file" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${LOG_HOME}/redis_dq_error.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!-- 日志文件输出文件名 --> <FileNamePattern>${LOG_HOME}/redis_dq_error.log.%d{yyyy-MM-dd}</FileNamePattern> <!-- 日志文件保留天数 --> <MaxHistory>30</MaxHistory> </rollingPolicy> <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <!-- 格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度,%msg:日志消息,%n是换行符 --> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> </encoder> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <!-- 过滤掉低于ERROR级别的日志 --> <level>ERROR</level> </filter> </appender> <!-- redis_delay_queue 日志 --> <logger name="com.shirc.redis.delay.queue" level="INFO" additivity="false"> <appender-ref ref="redis_dq_file" /> </logger> <logger name="com.shirc.redis.delay.queue" level="INFO" additivity="false"> <appender-ref ref="redis_dq_error_file" /> </logger>
然后所有的日志都在 redis_dq.log 中; 所有的异常日志都在 redis_dq_error.log 中;
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于