回顾
上篇文章我们讲了选择定时任务框架的一个标准。提到了 crontab & Timer 的使用和实现原理。这篇我们讲 ScheduledExecutorService & Spring Scheduler。
ScheduledExecutorService
上一篇我们提到了 Timer。Timer 的一个定时器需要一个线程去处理,如果每个任务都交给一个新的 Timer 处理,线程创建等消耗资源。如果对一个 Timer 设置了多个任务,而有一个任务没有捕获住异常,那么会导致后面的任务都无法执行了。所以推荐使用 ScheduledExecutorService 来替换 Timer。【如果你使用的 IDE 有 Alibaba Java Coding Guidelines,你会发现它会直接给你提示 Timer 需要替换成 ScheduledExecutorService。 墙裂推荐装这个 idea 插件~】
代码示例
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("schedule-thread-%d").setDaemon(false).build();
// 20是核心线程数量,后面参数是一个线程工厂,采用了建造者模式创建。
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(20, threadFactory);
Runnable task = () -> log.warn("ScheduledExecutorService 5s thread-name:{}", Thread.currentThread().getName());
// 调度的方法里面参数,1-实现runnable接口的方法。2-超时时间。3-超时时间单位。
executorService.schedule(task, 5, TimeUnit.SECONDS);
源码解读
首先我们需要搞懂 ScheduledExecutorService 实现类 ScheduledThreadPoolExecutor。
// 其构造就是一般的线程池构造函数,但是里面的queue是自己的内部类,下面会提到。
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
// 调度的方法
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
// 参数校验
if (command == null || unit == null)
throw new NullPointerException();
// 包装一下Runnable类,变成一个Future。
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
// 延时执行 *【重点】*
delayedExecute(t);
// future对象返回,可以异步继续执行其他代码逻辑。
return t;
}
// 延迟执行
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 检查状态
if (isShutdown())
reject(task);
else {
// 正常状态下,先把任务加入queue。【其实没有这么单纯 : )】
super.getQueue().add(task);
// 再次检查状态 状态不对赶紧撤销。你要是正在执行 就执行吧。。。
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 预先 生成执行任务的线程【还是比较单纯的】
ensurePrestart();
}
}
// 都是父类那一套,此处不是重点。
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
// 我们一层层把这个“单纯”的add解开 : )
public boolean add(Runnable e) {
return offer(e);
}
// 一个add 整了这么多代码。我们慢慢看
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
// 处理queue的时候先给加个锁。
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
// 容量不够 扩容
grow();
size = i + 1;
// 添加第一个任务,小伙子 你就是第一个要执行的人了
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// 当不是第一个任务的时候怎么处理呢。。。【重点】
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
// 当一个新的任务在队列的最前面并且可移植性,或者一个新线程可能需要成为leader时,发信号
// 上文 private final Condition available = lock.newCondition();
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
// 如果不是queue里面的唯一任务,就需要这么搞
private void siftUp(int k, RunnableScheduledFuture<?> key) {
// 二话不说 先来一个queue循环,k我们上面看到是queue size。
while (k > 0) {
// 堆排序。compareTo方法肯定重写了!下面我们把这个方法给找出来
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
// 上面介绍过了,不提了。
setIndex(key, k);
}
// Future对象的比较方法
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
// 其实不用看 也知道是用的时间 : )
// 将时间最靠近现在的排在前面,方便线程从前往后拿任务执行。
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
// 下面是DelayedWorkQueue的获取任务实现
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// 任务没有,那么等待
if (first == null)
available.await();
else {
// 查看任务是不是到了执行时间了
long delay = first.getDelay(NANOSECONDS);
// 到时间了,返回任务
if (delay <= 0)
return finishPoll(first);
// 否则,把任务置为null,继续等着吧
first = null; // don't retain ref while waiting
// leader就是在队列头部等待执行的线程 如果为null 那么就等待吧,等待唤醒
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 还是要等待delay的时间才能执行,继续检查状态。
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
OK, 我们终于看完了代码的分析...
优势
- 上面我们已经提到了,比起 Timer,ScheduledExecutorService 可以合理利用线程池资源。在一个任务执行失败时,不会影响其他任务的执行。
- 可以设置周期定时执行,也可以直接设置触发时间。
劣势
- 不支持 cron 这种灵活配置。
- 代码实现不是很灵活,修改需要重启项目。
- 功能单一。可视化、监控等都不支持,也不支持分布式等等功能。
最后
本来这篇文章开始前想着还写个 spring scheduler,但是,越写越长。。。后面我们慢慢学习更成熟的框架吧~
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于