定时任务系列之 Spring Scheduler

本贴最后更新于 1692 天前,其中的信息可能已经时移世异

回顾

我们在之前讲了定时任务的 crontab、timer、以及 ScheduledExecutorService 的实现,今天我们来讲讲 Spring Scheduler 实现。
没看前面文章的同学可以点击定时任务系列之 crontab&Timer 实现 以及 定时任务系列之 ScheduledExecutorService

Spring Scheduler

Spring Scheduler 是 Spring 框架提供的一个简单的定时任务实现。我们使用的时候非常简单,只需要添加几个注解就行。
主要是 org.springframework.scheduling.annotation 包下的类。我们先看一下怎么用,然后再分析一下其源码。

代码示例

可以是 xml 配置,也可以用注解实现。此处选择注解实现。

@Service
@Slf4j
@Data
public class SpringScheduleTest {
 
    private AtomicInteger taskNumber = new AtomicInteger(0);
    private AtomicInteger task1Number = new AtomicInteger(0);
 
    @Scheduled(cron = "*/5 * * * * ? ")
    public void remindTask() throws InterruptedException {
        log.info("每隔5秒执行一次, 当前线程名称{} 当前执行次数{}", Thread.currentThread().getName(), taskNumber.incrementAndGet());
    }
 
    /**
     * 固定频率执行。fixedDelay的单位是ms
     */
    @Scheduled(fixedDelay = 1000)
    public void remindTask2() throws InterruptedException {
        log.info("每隔1s执行一次 当前线程名称{} 当前执行次数{}", Thread.currentThread().getName(), task1Number.incrementAndGet());
    }
}
 
 
// !!!在启动类上面加上这个注解!!!
@SpringBootApplication
@EnableScheduling
public class QuartzApplication {
    public static void main(String[] args) {
        SpringApplication.run(QuartzApplication.class, args);
    }
}
 
// 配置线程池
@Configuration
public class ThreadPoolConfig {
 
    @Bean
    public ScheduledExecutorFactoryBean scheduledExecutorFactoryBean() {
        ScheduledExecutorFactoryBean factoryBean = new ScheduledExecutorFactoryBean();
        factoryBean.setPoolSize(20);
        factoryBean.setThreadNamePrefix("i'm time task thread - ");
        return factoryBean;
    }
 
}

看到使用的代码,优缺点也很明显了。

优点

  • 使用简单。
  • 支持 cron 表达式,支持固定频率执行。
  • 代码侵入性低。

缺点

  • 无法解决分布式调度问题。
  • 无法监控任务状态。
  • 无其他骚功能。如失败提醒,重试等等。

源码

/**
 * 处理带有@Schedule的方法
 **/
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
            bean instanceof ScheduledExecutorService) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }

    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    if (!this.nonAnnotatedClasses.contains(targetClass)) {
        Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                    Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                            method, Scheduled.class, Schedules.class);
                    return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                });
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(targetClass);
            if (logger.isTraceEnabled()) {
                logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
            }
        }
        else {
            // Non-empty set of methods
            annotatedMethods.forEach((method, scheduledMethods) ->
                    // 重点是processScheduled
                    scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
            if (logger.isTraceEnabled()) {
                logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                        "': " + annotatedMethods);
            }
        }
    }
    return bean;
}

/**
 * Process the given {@code @Scheduled} method declaration on the given bean.
 * @param scheduled the @Scheduled annotation
 * @param method the method that the annotation has been declared on
 * @param bean the target bean instance
 * @see #createRunnable(Object, Method)
 */
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    try {
        // 创建执行任务
        Runnable runnable = createRunnable(bean, method);
        boolean processedSchedule = false;
        String errorMessage =
                "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";

        Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

        // Determine initial delay
        // 看看是不是要延迟执行(第一次任务)
        long initialDelay = scheduled.initialDelay();
        String initialDelayString = scheduled.initialDelayString();
        if (StringUtils.hasText(initialDelayString)) {
            Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
            if (this.embeddedValueResolver != null) {
                // 解析值
                initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
            }
            if (StringUtils.hasLength(initialDelayString)) {
                try {
                    initialDelay = parseDelayAsLong(initialDelayString);
                }
                catch (RuntimeException ex) {
                    throw new IllegalArgumentException(
                            "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                }
            }
        }

        // 检查cron表达式的值
        String cron = scheduled.cron();
        if (StringUtils.hasText(cron)) {
            String zone = scheduled.zone();
            if (this.embeddedValueResolver != null) {
                // 解析cron表达式和时区设置
                cron = this.embeddedValueResolver.resolveStringValue(cron);
                zone = this.embeddedValueResolver.resolveStringValue(zone);
            }
            if (StringUtils.hasLength(cron)) {
                Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                processedSchedule = true;
                // 如果cron功能开启 那么准备添加任务
                if (!Scheduled.CRON_DISABLED.equals(cron)) {
                    TimeZone timeZone;
                    if (StringUtils.hasText(zone)) {
                        timeZone = StringUtils.parseTimeZoneString(zone);
                    }
                    else {
                        timeZone = TimeZone.getDefault();
                    }
                    // 添加cron任务
                    tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                }
            }
        }

        // At this point we don't need to differentiate between initial delay set or not anymore
        if (initialDelay < 0) {
            initialDelay = 0;
        }

        // 检查是否是设置fixDelay参数的定时任务
        long fixedDelay = scheduled.fixedDelay();
        if (fixedDelay >= 0) {
            Assert.isTrue(!processedSchedule, errorMessage);
            processedSchedule = true;
            tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
        }
        String fixedDelayString = scheduled.fixedDelayString();
        if (StringUtils.hasText(fixedDelayString)) {
            if (this.embeddedValueResolver != null) {
                // 解析fixDelay的值
                fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
            }

            if (StringUtils.hasLength(fixedDelayString)) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                try {
                    // 解析值变成long
                    fixedDelay = parseDelayAsLong(fixedDelayString);
                }
                catch (RuntimeException ex) {
                    throw new IllegalArgumentException(
                            "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                }
                // 添加fixDelay任务
                tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
            }
        }

        // 检查固定频率的定时任务
        long fixedRate = scheduled.fixedRate();
        if (fixedRate >= 0) {
            Assert.isTrue(!processedSchedule, errorMessage);
            processedSchedule = true;
            // 添加到task中
            tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
        }
        String fixedRateString = scheduled.fixedRateString();
        if (StringUtils.hasText(fixedRateString)) {
            if (this.embeddedValueResolver != null) {
                // 解析注解值 
                fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
            }
            if (StringUtils.hasLength(fixedRateString)) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                try {
                    fixedRate = parseDelayAsLong(fixedRateString);
                }
                catch (RuntimeException ex) {
                    throw new IllegalArgumentException(
                            "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                }
                // 添加任务
                tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
            }
        }

        // Check whether we had any attribute set
        Assert.isTrue(processedSchedule, errorMessage);

        // Finally register the scheduled tasks 注册调度任务
        synchronized (this.scheduledTasks) {
            Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
            regTasks.addAll(tasks);
        }
    }
    catch (IllegalArgumentException ex) {
        throw new IllegalStateException(
                "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
    }
}

// cron调度过程如下。此代码位置在org.springframework.scheduling.concurrent.ConcurrentTaskScheduler
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
    try {
        // 是不是允许并发触发
        if (this.enterpriseConcurrentScheduler) {
            return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
        }
        else {
            ErrorHandler errorHandler =
                    (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
            return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
        }
    }
    catch (RejectedExecutionException ex) {
        throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
    }
}

// 并发调度的调度器
private class EnterpriseConcurrentTriggerScheduler {
    // 并发调度的话,执行的时候是采用了future的形式,获取下次触发时间
    // 用 ManagedScheduledExecutorService 进行定时调度
    public ScheduledFuture<?> schedule(Runnable task, final Trigger trigger) {
        ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) scheduledExecutor;
        return executor.schedule(task, new javax.enterprise.concurrent.Trigger() {
            @Override
            @Nullable
            // 计算触发时间
            public Date getNextRunTime(@Nullable LastExecution le, Date taskScheduledTime) {
                return (trigger.nextExecutionTime(le != null ?
                        new SimpleTriggerContext(le.getScheduledStart(), le.getRunStart(), le.getRunEnd()) :
                        new SimpleTriggerContext()));
            }
            @Override
            public boolean skipRun(LastExecution lastExecution, Date scheduledRunTime) {
                return false;
            }
        });
    }
}

// 非并发的调度方式
@Nullable
public ScheduledFuture<?> schedule() {
    // 先加锁
    synchronized (this.triggerContextMonitor) {
        // 获取调度执行时间
        this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
        if (this.scheduledExecutionTime == null) {
            return null;
        }
        long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
        // 此处是重点,这个 executor 就是我们上一篇讲到的ScheduledExecutorService。
        this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
        return this;
    }
}

fixDelay & fixRate 的区别

英语不太好的朋友可能看代码注释的时候不是很好理解,我第一遍是没看懂(英语菜鸡)...
简单说来,fixDelay 就是在任务执行后的【fixDelay】时间后,执行下次任务。fixRate 就是不管任务执行时间,我就是 fixRate 这个时间一到,我就执行下一个任务。
假设有一个定时任务,执行时间固定是一秒。分别有两个定时任务配置,一个是 A(fixDelay 1000),一个是 B(fixRate 1000)。那么 A、B 分别在 09:00:00 执行第一次任务,那么 A 执行第二次的任务是 9:00:02(执行完第一次花费一秒,再等待 1s),B 执行第二次任务是 09:00:01(等待 1s,无需管任务执行时间)。

总结

因为底层实现原因(也采用了上一篇我们提到的 ScheduledExecutorService),Spring Scheduler 没有给我们提供一个比较完善的定时任务解决方案,尤其是在分布式流行的现在。但是针对于一些小的应用程序还是比较方便的。

下一篇我们讲 Quartz,敬请期待...


  • Spring

    Spring 是一个开源框架,是于 2003 年兴起的一个轻量级的 Java 开发框架,由 Rod Johnson 在其著作《Expert One-On-One J2EE Development and Design》中阐述的部分理念和原型衍生而来。它是为了解决企业应用开发的复杂性而创建的。框架的主要优势之一就是其分层架构,分层架构允许使用者选择使用哪一个组件,同时为 JavaEE 应用程序开发提供集成的框架。

    940 引用 • 1458 回帖 • 158 关注
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3167 引用 • 8207 回帖
  • cron
    11 引用 • 3 回帖
  • 定时任务
    14 引用 • 27 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...