写在前面的话
- 本次源码阅读的主要类
- AbstractExecutorService
- ThreadPoolExecutor
- 需要提前了解的相关知识
- 线程池参数,参考我之前的文章关于线程池参数的理解
- 位运算
ThreadPoolExecutor使用位运算来做状态标志
- Unsafe 类的 API
- BlockQueue 类的 API
存取任务都会用到
- Runnable,Future
以上的类都在
java.util.concurrent
包中
AbstractExecutorService
方法名称 | 功能 |
---|---|
invokeAny | 执行队列中的所有任务, 当某一个方法完成时则会立即返回,同时取消其余未完成的任务 |
invokeAll | 执行所有任务,知道全部任务都完成时才返回 |
invokeAny -> doInvokeAny
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { if (tasks == null) throw new NullPointerException(); int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this); try { ExecutionException ee = null; final long deadline = timed ? System.nanoTime() + nanos : 0L; Iterator<? extends Callable<T>> it = tasks.iterator(); futures.add(ecs.submit(it.next())); --ntasks; int active = 1; for (;;) { Future<T> f = ecs.poll(); // #1 if (f == null) { // #1.1 if (ntasks > 0) { --ntasks; futures.add(ecs.submit(it.next())); ++active; } // #1.2 else if (active == 0) break; // #1.3 else if (timed) { f = ecs.poll(nanos, TimeUnit.NANOSECONDS); if (f == null) throw new TimeoutException(); nanos = deadline - System.nanoTime(); } // #1.4 else f = ecs.take(); } // #2 if (f != null) { --active; try { return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } } if (ee == null) ee = new ExecutionException(); // #3 throw ee; } finally { for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }
- 理想情况下的代码流程
- 执行#2
即直接获取到了任务的结果,然后立即返回。
在返回后之前会执行finally里的语句块,取消其余未完成的任务
- 执行#2
- 不能立即得到任务结果
- 首先执行#1
- 若存在待执行的任务,则执行#1.1
添加一个新的任务去执行。意思就是说之前的那个任务没执行完,我们可以认为它工作的比较慢,再上一个新任务试试,指不定这个新任务还会先完成
- 若也没有在执行的任务,则执行#1.2,然后执行#3 抛出异常。
很明显这种情形下是出现了某些不可预知的问题,
毕竟即获取不到结果,当前又没有任务在执行,还没有可用的任务,那妥妥的任务在执行的时候出现了异常
- 若设置了超时等待,则执行#1.3
设置了超时等待的话,就用poll在指定的时间范围了去获取任务结果,
如果时间到了还没获取到,那么按照调用者的意图就应该抛出异常,告诉他了
- 若没有设置超时等待,则执行#1.4
阻塞的等待任务的返回结果
ecs肯定使用了生产者消费者模式,只要有一个任务完成了,肯定就会把数据存到ecs中
- 若存在待执行的任务,则执行#1.1
- 然后执行#2
- 首先执行#1
invokeAll
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); boolean done = false; try { // #1 for (Callable<T> t : tasks) { RunnableFuture<T> f = newTaskFor(t); futures.add(f); execute(f); } // #2 for (int i = 0, size = futures.size(); i < size; i++) { Future<T> f = futures.get(i); if (!f.isDone()) { try { f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } // #3 done = true; return futures; } finally { if (!done) for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }
invokeAll 方法就简单些了,就是单纯的阻塞式的去获取每一个任务的结果然后返回。
- 理想情况下的代码流程
- 执行#1
提交并执行所有的任务
- 执行#2
阻塞式的去获取所有任务的结果
- 执行#3
标志所有的任务已完成
- 执行#1
- 异常情况下的代码流程
- 执行#1
- 执行#2
- 抛出了非
CancellationException
和ExecutionException
的异常 - 不会执行#3 !!!
- 执行 finally,将所有的任务取消掉
ThreadPoolExecutor
主要方法
名称 | 作用 |
---|---|
execute | 提交任务 |
submit | 提交任务,同时能够获取执行结果。 该方法在父类 AbstractExecutorService 中 |
addWorker | 启动一个新线程,并且执行任务。 当线程数量大于线程参数的 coreSize 时就不会再执行了 |
主要属性
变量名 | 作用 | 二进制码 |
---|---|---|
ctl | 该变量是一个原子类的 Integer,二进制码一共 32 位, 高 3 位用来标志线程池状态,剩下的 29 为用来记录线程数量 |
|
RUNNING | 运行中 的二进制标志 |
高 3 位为 111 |
SHUTDOWN | 优雅关闭 的二进制标志 |
高 3 位为 000 |
STOP | 暴力关闭 的二进制标志 |
高 3 位为 001 |
TIDYING | 即将完全关闭 的二进制标志 |
高 3 位为 010 |
TERMINATED | 已完全关闭 的二进制标志 |
高 3 位为 011 |
上方表格中,后面的 5 个变量都是用来标志线程池的状态。他们是有顺序的,越往后说明线程池的活跃程度越低。 并且只有当值为负数时线程池才是 运行中 的状态 |
复习一下各个状态的含义:
- RUNNING
- 线程池处于运行中,可以接受新的任务
- SHUTDOWN
- 线程池开始关闭,不会接受新的任务,但是已经提交却还处于执行中的任务会让它执行完成
- STOP
- 立即关闭线程池,不接受新任务,同时打断还在执行中的任务
- TIDYING
- 所有任务已关闭,
ctl
中记录的线程数也为 0 了,然后会调用terminated()
函数,terminated()
函数在 ThreadPoolExecutor 中并没有实现,我们可以自己去重写它用于完成一些自定义的收尾工作
- 所有任务已关闭,
- TERMINATED
- 线程池彻底停止
辅助方法
名称 | 作用 |
---|---|
runStateOf | 取得 ctl 的高 3 位,即线程池的运行状态 |
workerCountOf | 取得 ctl 的低 29 位,即运行的线程数量 |
ctlOf | 将运行状态和运行线程数量存放在一个变量中 |
execute 方法解析
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * 1. 如果线程池中的运行线程数小于corePoolSize那么就启动 * 一个新的线程去执行任务,如果成功那么就退出该方法,不 * 功的原因是:处于并发的环境下,在判断的时候线程数小于 * corePoolSize,但此时其他地方启动了一个新线程刚好导致 * 运行线程数达到corePoolSize,调用addWorker就会失败 * * 2. 如果任务能存放进队列,那么仍然需要再次检查线程池 * 状态,因为可能在判断的时候线程池还是运行状态,但是 * 进入方法体之后线程池就被关闭了,所以需要再检查一下, * 并且在必要时移除任务或者启动新的线程 * * 3. 如果我们不能将任务放进队列中,说明队列已经满了, * 我们就会尝试新启动一个线程(此时启动的线程就是由 * corePoolSize增加到maxPoolSize的过程)。如果启动失败 * 则拒绝该任务(启动失败的原因可能是线程数已经大于 * maxPoolSize) */ int c = ctl.get(); // #1 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // #2 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // #3 else if (!addWorker(command, false)) reject(command); }
简单来说 execute
方法主要判断是 启动线程执行任务
还是 将任务放进待执行队列
,
任务执行的调用主要还是靠 addWorker
方法来完成
addWorker 方法解析
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // #1 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // #2 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; // #3 try { // #3.1 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // #3.2 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // #3.3 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
该方法是用来创建,运行,清理线程的。其两个参数的作用:
- firstTask
- 该参数用来表示创建线程时首先要执行这个任务,这样可以避免放入到队列中进行排队
- 只有在创建 coreThread 时以及创建 maxThread 时才会传递该参数,其余时候都是传递的 null
举个例子,如果我们在创建 coreThread 时没有传递 firstTask,那么这个新的线程就会先空闲着,然后等待任务队列里有数据时,从任务队列里取一个任务出来再执行。这样这个线程刚创建那会儿就会闲着,浪费资源
我们的目标就是榨干CPU,不能浪费
- core
- 该参数在代码块里只用了一次,就是来判断是创建 coreThread 还是临时的 maxThread,
然后根据不同的类型,来判断他们的数量是否达到上限了
- 该参数在代码块里只用了一次,就是来判断是创建 coreThread 还是临时的 maxThread,
执行流程
- 首先执行#1
- 检查一下线程池的状态,如果是 STOP,TIDYING,TERMINATED 状态的话,则直接返回 false 表明启动线程失败。
- 如果现在状态是 SHUTDOWN,但是 firstTask 不为空或者 workQueue 为空的话,那么也直接返回 false。
因为SHUTDOWN状态允许还在运行中的任务继续执行,但是若还想启动线程并且携带一个任务那就不允许了
- 接着执行#2
- 检查线程数量是否太多了,如果过多则直接返回 false
- 如果线程数量还允许继续增加,那么使用 CAS 添加线程数,添加成功则跳出大循环去执行#3
- 添加失败了,那就在判断一下线程池的状态和之前是否相同,不同的话说明出现了一点点小问题,那么就从头再来,继续执行#1
- 兜兜转转一圈终于申请到了可以添加线程的权限,接下来执行#3,进行真正的创建线程
- #3.1 创建一个 worker
worker内部创建了线程
- #3.2 检查线程池状态,只有当以下两种情况时才可能说明创建线程真正的成功
- 情况 1:线程池是 RUNNING
- 情况 2:线程池是 SHUTDOWN 但是没有携带任务
- #3.3 上面的 2 个步骤都通过后接着启动线程!
终于启动了
- #3.1 创建一个 worker
主流程大体就是这样,还需要去看一看 Worker 的实现才行
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于