定时任务系列之 Spring Scheduler

本贴最后更新于 2054 天前,其中的信息可能已经时移世异

回顾

我们在之前讲了定时任务的 crontab、timer、以及 ScheduledExecutorService 的实现,今天我们来讲讲 Spring Scheduler 实现。
没看前面文章的同学可以点击定时任务系列之 crontab&Timer 实现 以及 定时任务系列之 ScheduledExecutorService

Spring Scheduler

Spring Scheduler 是 Spring 框架提供的一个简单的定时任务实现。我们使用的时候非常简单,只需要添加几个注解就行。
主要是 org.springframework.scheduling.annotation 包下的类。我们先看一下怎么用,然后再分析一下其源码。

代码示例

可以是 xml 配置,也可以用注解实现。此处选择注解实现。

@Service @Slf4j @Data public class SpringScheduleTest { private AtomicInteger taskNumber = new AtomicInteger(0); private AtomicInteger task1Number = new AtomicInteger(0); @Scheduled(cron = "*/5 * * * * ? ") public void remindTask() throws InterruptedException { log.info("每隔5秒执行一次, 当前线程名称{} 当前执行次数{}", Thread.currentThread().getName(), taskNumber.incrementAndGet()); } /** * 固定频率执行。fixedDelay的单位是ms */ @Scheduled(fixedDelay = 1000) public void remindTask2() throws InterruptedException { log.info("每隔1s执行一次 当前线程名称{} 当前执行次数{}", Thread.currentThread().getName(), task1Number.incrementAndGet()); } } // !!!在启动类上面加上这个注解!!! @SpringBootApplication @EnableScheduling public class QuartzApplication { public static void main(String[] args) { SpringApplication.run(QuartzApplication.class, args); } } // 配置线程池 @Configuration public class ThreadPoolConfig { @Bean public ScheduledExecutorFactoryBean scheduledExecutorFactoryBean() { ScheduledExecutorFactoryBean factoryBean = new ScheduledExecutorFactoryBean(); factoryBean.setPoolSize(20); factoryBean.setThreadNamePrefix("i'm time task thread - "); return factoryBean; } }

看到使用的代码,优缺点也很明显了。

优点

  • 使用简单。
  • 支持 cron 表达式,支持固定频率执行。
  • 代码侵入性低。

缺点

  • 无法解决分布式调度问题。
  • 无法监控任务状态。
  • 无其他骚功能。如失败提醒,重试等等。

源码

/** * 处理带有@Schedule的方法 **/ public Object postProcessAfterInitialization(Object bean, String beanName) { if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler || bean instanceof ScheduledExecutorService) { // Ignore AOP infrastructure such as scoped proxies. return bean; } Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); if (!this.nonAnnotatedClasses.contains(targetClass)) { Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> { Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations( method, Scheduled.class, Schedules.class); return (!scheduledMethods.isEmpty() ? scheduledMethods : null); }); if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace("No @Scheduled annotations found on bean class: " + targetClass); } } else { // Non-empty set of methods annotatedMethods.forEach((method, scheduledMethods) -> // 重点是processScheduled scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean))); if (logger.isTraceEnabled()) { logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods); } } } return bean; } /** * Process the given {@code @Scheduled} method declaration on the given bean. * @param scheduled the @Scheduled annotation * @param method the method that the annotation has been declared on * @param bean the target bean instance * @see #createRunnable(Object, Method) */ protected void processScheduled(Scheduled scheduled, Method method, Object bean) { try { // 创建执行任务 Runnable runnable = createRunnable(bean, method); boolean processedSchedule = false; String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required"; Set<ScheduledTask> tasks = new LinkedHashSet<>(4); // Determine initial delay // 看看是不是要延迟执行(第一次任务) long initialDelay = scheduled.initialDelay(); String initialDelayString = scheduled.initialDelayString(); if (StringUtils.hasText(initialDelayString)) { Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both"); if (this.embeddedValueResolver != null) { // 解析值 initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString); } if (StringUtils.hasLength(initialDelayString)) { try { initialDelay = parseDelayAsLong(initialDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long"); } } } // 检查cron表达式的值 String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { String zone = scheduled.zone(); if (this.embeddedValueResolver != null) { // 解析cron表达式和时区设置 cron = this.embeddedValueResolver.resolveStringValue(cron); zone = this.embeddedValueResolver.resolveStringValue(zone); } if (StringUtils.hasLength(cron)) { Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers"); processedSchedule = true; // 如果cron功能开启 那么准备添加任务 if (!Scheduled.CRON_DISABLED.equals(cron)) { TimeZone timeZone; if (StringUtils.hasText(zone)) { timeZone = StringUtils.parseTimeZoneString(zone); } else { timeZone = TimeZone.getDefault(); } // 添加cron任务 tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } } } // At this point we don't need to differentiate between initial delay set or not anymore if (initialDelay < 0) { initialDelay = 0; } // 检查是否是设置fixDelay参数的定时任务 long fixedDelay = scheduled.fixedDelay(); if (fixedDelay >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } String fixedDelayString = scheduled.fixedDelayString(); if (StringUtils.hasText(fixedDelayString)) { if (this.embeddedValueResolver != null) { // 解析fixDelay的值 fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString); } if (StringUtils.hasLength(fixedDelayString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { // 解析值变成long fixedDelay = parseDelayAsLong(fixedDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long"); } // 添加fixDelay任务 tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } } // 检查固定频率的定时任务 long fixedRate = scheduled.fixedRate(); if (fixedRate >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; // 添加到task中 tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } String fixedRateString = scheduled.fixedRateString(); if (StringUtils.hasText(fixedRateString)) { if (this.embeddedValueResolver != null) { // 解析注解值 fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString); } if (StringUtils.hasLength(fixedRateString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedRate = parseDelayAsLong(fixedRateString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long"); } // 添加任务 tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } } // Check whether we had any attribute set Assert.isTrue(processedSchedule, errorMessage); // Finally register the scheduled tasks 注册调度任务 synchronized (this.scheduledTasks) { Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4)); regTasks.addAll(tasks); } } catch (IllegalArgumentException ex) { throw new IllegalStateException( "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage()); } } // cron调度过程如下。此代码位置在org.springframework.scheduling.concurrent.ConcurrentTaskScheduler public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) { try { // 是不是允许并发触发 if (this.enterpriseConcurrentScheduler) { return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger); } else { ErrorHandler errorHandler = (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true)); return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule(); } } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex); } } // 并发调度的调度器 private class EnterpriseConcurrentTriggerScheduler { // 并发调度的话,执行的时候是采用了future的形式,获取下次触发时间 // 用 ManagedScheduledExecutorService 进行定时调度 public ScheduledFuture<?> schedule(Runnable task, final Trigger trigger) { ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) scheduledExecutor; return executor.schedule(task, new javax.enterprise.concurrent.Trigger() { @Override @Nullable // 计算触发时间 public Date getNextRunTime(@Nullable LastExecution le, Date taskScheduledTime) { return (trigger.nextExecutionTime(le != null ? new SimpleTriggerContext(le.getScheduledStart(), le.getRunStart(), le.getRunEnd()) : new SimpleTriggerContext())); } @Override public boolean skipRun(LastExecution lastExecution, Date scheduledRunTime) { return false; } }); } } // 非并发的调度方式 @Nullable public ScheduledFuture<?> schedule() { // 先加锁 synchronized (this.triggerContextMonitor) { // 获取调度执行时间 this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext); if (this.scheduledExecutionTime == null) { return null; } long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis(); // 此处是重点,这个 executor 就是我们上一篇讲到的ScheduledExecutorService。 this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS); return this; } }

fixDelay & fixRate 的区别

英语不太好的朋友可能看代码注释的时候不是很好理解,我第一遍是没看懂(英语菜鸡)...
简单说来,fixDelay 就是在任务执行后的【fixDelay】时间后,执行下次任务。fixRate 就是不管任务执行时间,我就是 fixRate 这个时间一到,我就执行下一个任务。
假设有一个定时任务,执行时间固定是一秒。分别有两个定时任务配置,一个是 A(fixDelay 1000),一个是 B(fixRate 1000)。那么 A、B 分别在 09:00:00 执行第一次任务,那么 A 执行第二次的任务是 9:00:02(执行完第一次花费一秒,再等待 1s),B 执行第二次任务是 09:00:01(等待 1s,无需管任务执行时间)。

总结

因为底层实现原因(也采用了上一篇我们提到的 ScheduledExecutorService),Spring Scheduler 没有给我们提供一个比较完善的定时任务解决方案,尤其是在分布式流行的现在。但是针对于一些小的应用程序还是比较方便的。

下一篇我们讲 Quartz,敬请期待...


  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    948 引用 • 1460 回帖
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3196 引用 • 8215 回帖
  • cron
    11 引用 • 3 回帖
  • 定时任务
    14 引用 • 27 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...