
回顾
上篇文章我们讲了选择定时任务框架的一个标准。提到了 crontab & Timer 的使用和实现原理。这篇我们讲 ScheduledExecutorService & Spring Scheduler。
ScheduledExecutorService
上一篇我们提到了 Timer。Timer 的一个定时器需要一个线程去处理,如果每个任务都交给一个新的 Timer 处理,线程创建等消耗资源。如果对一个 Timer 设置了多个任务,而有一个任务没有捕获住异常,那么会导致后面的任务都无法执行了。所以推荐使用 ScheduledExecutorService 来替换 Timer。【如果你使用的 IDE 有 Alibaba Java Coding Guidelines,你会发现它会直接给你提示 Timer 需要替换成 ScheduledExecutorService。 墙裂推荐装这个 idea 插件~】
代码示例
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("schedule-thread-%d").setDaemon(false).build(); // 20是核心线程数量,后面参数是一个线程工厂,采用了建造者模式创建。 ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(20, threadFactory); Runnable task = () -> log.warn("ScheduledExecutorService 5s thread-name:{}", Thread.currentThread().getName()); // 调度的方法里面参数,1-实现runnable接口的方法。2-超时时间。3-超时时间单位。 executorService.schedule(task, 5, TimeUnit.SECONDS);
源码解读
首先我们需要搞懂 ScheduledExecutorService 实现类 ScheduledThreadPoolExecutor。
// 其构造就是一般的线程池构造函数,但是里面的queue是自己的内部类,下面会提到。 public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } // 调度的方法 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { // 参数校验 if (command == null || unit == null) throw new NullPointerException(); // 包装一下Runnable类,变成一个Future。 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); // 延时执行 *【重点】* delayedExecute(t); // future对象返回,可以异步继续执行其他代码逻辑。 return t; } // 延迟执行 private void delayedExecute(RunnableScheduledFuture<?> task) { // 检查状态 if (isShutdown()) reject(task); else { // 正常状态下,先把任务加入queue。【其实没有这么单纯 : )】 super.getQueue().add(task); // 再次检查状态 状态不对赶紧撤销。你要是正在执行 就执行吧。。。 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else // 预先 生成执行任务的线程【还是比较单纯的】 ensurePrestart(); } } // 都是父类那一套,此处不是重点。 void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); } // 我们一层层把这个“单纯”的add解开 : ) public boolean add(Runnable e) { return offer(e); } // 一个add 整了这么多代码。我们慢慢看 public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; // 处理queue的时候先给加个锁。 final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) // 容量不够 扩容 grow(); size = i + 1; // 添加第一个任务,小伙子 你就是第一个要执行的人了 if (i == 0) { queue[0] = e; setIndex(e, 0); } else { // 当不是第一个任务的时候怎么处理呢。。。【重点】 siftUp(i, e); } if (queue[0] == e) { leader = null; // 当一个新的任务在队列的最前面并且可移植性,或者一个新线程可能需要成为leader时,发信号 // 上文 private final Condition available = lock.newCondition(); available.signal(); } } finally { lock.unlock(); } return true; } // 如果不是queue里面的唯一任务,就需要这么搞 private void siftUp(int k, RunnableScheduledFuture<?> key) { // 二话不说 先来一个queue循环,k我们上面看到是queue size。 while (k > 0) { // 堆排序。compareTo方法肯定重写了!下面我们把这个方法给找出来 int parent = (k - 1) >>> 1; RunnableScheduledFuture<?> e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; // 上面介绍过了,不提了。 setIndex(key, k); } // Future对象的比较方法 public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { // 其实不用看 也知道是用的时间 : ) // 将时间最靠近现在的排在前面,方便线程从前往后拿任务执行。 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } // 下面是DelayedWorkQueue的获取任务实现 public RunnableScheduledFuture<?> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture<?> first = queue[0]; // 任务没有,那么等待 if (first == null) available.await(); else { // 查看任务是不是到了执行时间了 long delay = first.getDelay(NANOSECONDS); // 到时间了,返回任务 if (delay <= 0) return finishPoll(first); // 否则,把任务置为null,继续等着吧 first = null; // don't retain ref while waiting // leader就是在队列头部等待执行的线程 如果为null 那么就等待吧,等待唤醒 if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 还是要等待delay的时间才能执行,继续检查状态。 available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
OK, 我们终于看完了代码的分析...
优势
- 上面我们已经提到了,比起 Timer,ScheduledExecutorService 可以合理利用线程池资源。在一个任务执行失败时,不会影响其他任务的执行。
- 可以设置周期定时执行,也可以直接设置触发时间。
劣势
- 不支持 cron 这种灵活配置。
- 代码实现不是很灵活,修改需要重启项目。
- 功能单一。可视化、监控等都不支持,也不支持分布式等等功能。
最后
本来这篇文章开始前想着还写个 spring scheduler,但是,越写越长。。。后面我们慢慢学习更成熟的框架吧~

欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于