回顾
我们在之前讲了定时任务的 crontab、timer、以及 ScheduledExecutorService 的实现,今天我们来讲讲 Spring Scheduler 实现。
没看前面文章的同学可以点击定时任务系列之 crontab&Timer 实现 以及 定时任务系列之 ScheduledExecutorService
Spring Scheduler
Spring Scheduler 是 Spring 框架提供的一个简单的定时任务实现。我们使用的时候非常简单,只需要添加几个注解就行。
主要是 org.springframework.scheduling.annotation
包下的类。我们先看一下怎么用,然后再分析一下其源码。
代码示例
可以是 xml 配置,也可以用注解实现。此处选择注解实现。
@Service
@Slf4j
@Data
public class SpringScheduleTest {
private AtomicInteger taskNumber = new AtomicInteger(0);
private AtomicInteger task1Number = new AtomicInteger(0);
@Scheduled(cron = "*/5 * * * * ? ")
public void remindTask() throws InterruptedException {
log.info("每隔5秒执行一次, 当前线程名称{} 当前执行次数{}", Thread.currentThread().getName(), taskNumber.incrementAndGet());
}
/**
* 固定频率执行。fixedDelay的单位是ms
*/
@Scheduled(fixedDelay = 1000)
public void remindTask2() throws InterruptedException {
log.info("每隔1s执行一次 当前线程名称{} 当前执行次数{}", Thread.currentThread().getName(), task1Number.incrementAndGet());
}
}
// !!!在启动类上面加上这个注解!!!
@SpringBootApplication
@EnableScheduling
public class QuartzApplication {
public static void main(String[] args) {
SpringApplication.run(QuartzApplication.class, args);
}
}
// 配置线程池
@Configuration
public class ThreadPoolConfig {
@Bean
public ScheduledExecutorFactoryBean scheduledExecutorFactoryBean() {
ScheduledExecutorFactoryBean factoryBean = new ScheduledExecutorFactoryBean();
factoryBean.setPoolSize(20);
factoryBean.setThreadNamePrefix("i'm time task thread - ");
return factoryBean;
}
}
看到使用的代码,优缺点也很明显了。
优点
- 使用简单。
- 支持 cron 表达式,支持固定频率执行。
- 代码侵入性低。
缺点
- 无法解决分布式调度问题。
- 无法监控任务状态。
- 无其他骚功能。如失败提醒,重试等等。
源码
/**
* 处理带有@Schedule的方法
**/
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass)) {
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
}
}
else {
// Non-empty set of methods
annotatedMethods.forEach((method, scheduledMethods) ->
// 重点是processScheduled
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}
/**
* Process the given {@code @Scheduled} method declaration on the given bean.
* @param scheduled the @Scheduled annotation
* @param method the method that the annotation has been declared on
* @param bean the target bean instance
* @see #createRunnable(Object, Method)
*/
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
// 创建执行任务
Runnable runnable = createRunnable(bean, method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
// Determine initial delay
// 看看是不是要延迟执行(第一次任务)
long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
if (this.embeddedValueResolver != null) {
// 解析值
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {
initialDelay = parseDelayAsLong(initialDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
}
}
}
// 检查cron表达式的值
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
// 解析cron表达式和时区设置
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
// 如果cron功能开启 那么准备添加任务
if (!Scheduled.CRON_DISABLED.equals(cron)) {
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
}
else {
timeZone = TimeZone.getDefault();
}
// 添加cron任务
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
}
// At this point we don't need to differentiate between initial delay set or not anymore
if (initialDelay < 0) {
initialDelay = 0;
}
// 检查是否是设置fixDelay参数的定时任务
long fixedDelay = scheduled.fixedDelay();
if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
if (this.embeddedValueResolver != null) {
// 解析fixDelay的值
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
// 解析值变成long
fixedDelay = parseDelayAsLong(fixedDelayString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
}
// 添加fixDelay任务
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}
// 检查固定频率的定时任务
long fixedRate = scheduled.fixedRate();
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
// 添加到task中
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
if (this.embeddedValueResolver != null) {
// 解析注解值
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedRate = parseDelayAsLong(fixedRateString);
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
}
// 添加任务
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}
// Check whether we had any attribute set
Assert.isTrue(processedSchedule, errorMessage);
// Finally register the scheduled tasks 注册调度任务
synchronized (this.scheduledTasks) {
Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException(
"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
}
// cron调度过程如下。此代码位置在org.springframework.scheduling.concurrent.ConcurrentTaskScheduler
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
try {
// 是不是允许并发触发
if (this.enterpriseConcurrentScheduler) {
return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
}
else {
ErrorHandler errorHandler =
(this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
}
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
}
}
// 并发调度的调度器
private class EnterpriseConcurrentTriggerScheduler {
// 并发调度的话,执行的时候是采用了future的形式,获取下次触发时间
// 用 ManagedScheduledExecutorService 进行定时调度
public ScheduledFuture<?> schedule(Runnable task, final Trigger trigger) {
ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) scheduledExecutor;
return executor.schedule(task, new javax.enterprise.concurrent.Trigger() {
@Override
@Nullable
// 计算触发时间
public Date getNextRunTime(@Nullable LastExecution le, Date taskScheduledTime) {
return (trigger.nextExecutionTime(le != null ?
new SimpleTriggerContext(le.getScheduledStart(), le.getRunStart(), le.getRunEnd()) :
new SimpleTriggerContext()));
}
@Override
public boolean skipRun(LastExecution lastExecution, Date scheduledRunTime) {
return false;
}
});
}
}
// 非并发的调度方式
@Nullable
public ScheduledFuture<?> schedule() {
// 先加锁
synchronized (this.triggerContextMonitor) {
// 获取调度执行时间
this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
if (this.scheduledExecutionTime == null) {
return null;
}
long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
// 此处是重点,这个 executor 就是我们上一篇讲到的ScheduledExecutorService。
this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
return this;
}
}
fixDelay & fixRate 的区别
英语不太好的朋友可能看代码注释的时候不是很好理解,我第一遍是没看懂(英语菜鸡)...
简单说来,fixDelay 就是在任务执行后的【fixDelay】时间后,执行下次任务。fixRate 就是不管任务执行时间,我就是 fixRate 这个时间一到,我就执行下一个任务。
假设有一个定时任务,执行时间固定是一秒。分别有两个定时任务配置,一个是 A(fixDelay 1000),一个是 B(fixRate 1000)。那么 A、B 分别在 09:00:00 执行第一次任务,那么 A 执行第二次的任务是 9:00:02(执行完第一次花费一秒,再等待 1s),B 执行第二次任务是 09:00:01(等待 1s,无需管任务执行时间)。
总结
因为底层实现原因(也采用了上一篇我们提到的 ScheduledExecutorService),Spring Scheduler 没有给我们提供一个比较完善的定时任务解决方案,尤其是在分布式流行的现在。但是针对于一些小的应用程序还是比较方便的。
下一篇我们讲 Quartz,敬请期待...
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于