来源
为了避免系统频繁地创建和销毁线程,我们可以让创建的线程进行复用。
类图
@startuml
interface ExecutorService
interface Executor
interface ScheduledExecutorService
abstract class AbstractExecutorService
ExecutorService-up-|>Executor
ScheduledExecutorService -up-|>ExecutorService
AbstractExecutorService .up.|>ExecutorService
ForkJoinPool-up-|>AbstractExecutorService
ThreadPoolExecutor-up-|>AbstractExecutorService
ScheduledThreadPoolExecutor-up-|>ThreadPoolExecutor
Executors .up-> ExecutorService
Executors .up-> ScheduledExecutorService
@enduml
内部实现
ThreadPoolExecutor(corePoolSize,maxmumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler)
-
corePoolSize:暂时理解为线程池的初始线程数量,创建线程池时会同时创建 coreSize 数量的线程,这些线程在等待分配任务。
-
maxmumPoolSize:线程池最大线程数量,超过 coreSize 的 Idle 线程的存活时间为 keepAliveTime。
-
keepAliveTime:因为线程池数量最大可以到 maxPoolSize,当线程池线程数量超过 corePoolSize 时,多余的线程可以存活的时间。
-
unit: keepAliveTime 的单位。
-
workerQueue:是指被提交但未执行的任务队列,它是一个 BlockingQueue 接口的对象。
-
threadFactory:线程工厂,用于创建线程,一般用默认的即可。
-
handler:拒绝策略。当任务太多来不及处理,如何拒绝任务。
ThreadPoolExecutor 的任务调度逻辑
- coreSize 前一章的 corePoolSize
- maxSize 前一章的 maxmumPoolSize
- 等待队列 前一章的 workerQueue
- 拒绝策略 前一章的 handler
@startuml
start
:任务提交;
if (非Idle的线程数量小于coreSize?) then (是)
:分配线程执行;
else (否)
if(等待队列不满?) then (是)
:加入等待队列;
else (否)
if(总线程数小于maxSize?) then (是)
:创建新的线程并执行;
else (否)
:执行拒绝策略;
endif
endif
endif
end
@enduml
等待队列类型
SynchronousQueue
容量为 0,提交的任务不会被真实保存,而总是将新任务提供给线程执行。
如果总线程数量大于 maxSize,执行拒绝策略
@startuml
start
:任务提交;
if (非Idle的线程数量小于coreSize?) then (是)
:分配线程执行;
else (否)
if(总线程数小于maxSize?) then (是)
:创建新的线程并执行;
else (否)
:执行拒绝策略;
endif
endif
end
@enduml
ArrayBlockingQueue
有界的任务队列,构造函数可以指定容量。,有界队列仅
当在任务队列装满时,才可能将线程数提升到 corePoolSize 以上,换言之,除非系统非
常繁忙,否则确保核心线程数维持在在 corePoolSize。
调度逻辑同普通的调度逻辑一样。
LinkedBlockingQueue
无界,与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。使用无界队列的线程池的线程数量永远不会超过 coreSize
@startuml
start
:任务提交;
if (非Idle的线程数量小于coreSize?) then (是)
:分配线程执行;
else (否)
:加入等待队列;
endif
end
@enduml
PriorityBlockingQueue
优先任务队列是带有执行优先级的队列。它通过 PriorityBlockingQueue 实现,可以控制任务的执行先后顺序。它是一个特殊的 无界队列
。无论是有界队列 ArrayBlockingQueue,还是未指定大小的无界队列 LinkedBlockingQueue 都是按照先进先出算法处理任务的。而 PriorityBlockingQueue 则可以根据任务自身的优先级顺序先后执行,在确保系统性能的同时,也能有很好的质量保证(总是确保高优先级的任务先执行)
DelayedWorkQueue
计划任务的延迟队列
工厂方法
newFixedThreadPool
使用无界队列的固定大小的线程池
我觉得固不固定无所谓,用了无界队列,maxSize 也就没用了。
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(),
threadFactory);
newSingleThreadExecutor
线程数量为 1 的 FixedThreadPool
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(),
threadFactory));
newCachedThreadPool
使用同步队列,无限线程数量的线程池。
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
拒绝策略
内置的策略均实现了 RejectedExecutionHandler 接口,若以上策略仍无法满足实际应用
需要,完全可以自己扩展 RejectedExecutionHandler 接口
AbortPolicy
直接抛出异常,默认策略
CallerRunsPolicy
只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降
DiscardOledestPolicy
该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
DiscardPolicy
该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,我觉得这可能是最好的一种方案了吧!
计划任务
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
ScheduledExecutorService 提供了 3 个方法
schedule
在给定时间,对任务进行一次调度
scheduleAtFixedRate
它是以上一个任务开始执行时间为起点,之后的 period 时间,调度下一次任务。
scheduleWithFixedDelay
在上一个任务结束后,再经过 delay 时间进行任务调度。需要配合上一个任务的运行时间。
Demo
public static void main(String[] args) {
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
Runnable cmd2Sec = ()->{
try {
Thread.sleep(1000);
System.out.println("2 second's task: "+LocalDateTime.now().format(dateTimeFormatter));
} catch (InterruptedException e) {
e.printStackTrace();
}};
Runnable cmd5Sec = ()->{
try {
Thread.sleep(5000);
System.out.println("5 second's task: "+LocalDateTime.now().format(dateTimeFormatter));
} catch (InterruptedException e) {
e.printStackTrace();
}};
ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
ses.scheduleAtFixedRate(cmd2Sec, 0, 2, TimeUnit.SECONDS);
// ses.scheduleWithFixedDelay(cmd2Sec, 0, 2, TimeUnit.SECONDS);
// ses.scheduleAtFixedRate(cmd5Sec, 0, 2, TimeUnit.SECONDS);
// ses.scheduleWithFixedDelay(cmd5Sec, 0, 2, TimeUnit.SECONDS);
}
执行 ses.scheduleAtFixedRate(cmd2Sec, 0, 2, TimeUnit.SECONDS)
的结果
2 second's task: 12:22:07
2 second's task: 12:22:09
2 second's task: 12:22:11
执行 ses.scheduleWithFixedDelay(cmd2Sec, 0, 2, TimeUnit.SECONDS)
的结果
2 second's task: 12:22:43
2 second's task: 12:22:46
2 second's task: 12:22:49
执行 ses.scheduleAtFixedRate(cmd5Sec, 0, 2, TimeUnit.SECONDS)
的结果
5 second's task: 12:23:32
5 second's task: 12:23:37
5 second's task: 12:23:42
5 second's task: 12:23:47
执行 ses.scheduleWithFixedDelay(cmd5Sec, 0, 2, TimeUnit.SECONDS)
的结果
5 second's task: 12:25:13
5 second's task: 12:25:20
5 second's task: 12:25:27
结论
- FixedRate 是从任务起始时间计算下一次调度时间,FixedDelay 是从结束时间
- 如果任务执行超过调度周期,FixedRate 会等待任务执行完成之后立刻进行下次调度,FixedDelay 还是在结束后等待一个周期。
扩展线程池
ThreadPoolExecutor 也是一个可以扩展的线程池。它提供了 beforeExecute()、
afterExecute()和 terminated()三个接口对线程池进行控制。
这三个方法应用了模板方法模式,提供了默认的空实现,我们只需要覆盖即可。
源码逻辑:
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
terminated()在执行拒绝策略的时候被调用
优化线程池线程数量
在 《Java Concurrency in Practice》一书中给出了一个估算线程池大小的经验公式:
Ncpu = CPU的数量(Runtime.getRuntime().availableProcessors())
Ucpu = 目标CPU的使用率,0 =< Ucpu<= 1
W/C = 等待时间与计算时间的比率
为保持处理器达到期望的使用率,最优的池的大小等于:
Nthreads = Ncpu * Ucpu * ( 1 + W/C )
通常没有太多要求用 Runtime.getRuntime().availableProcessors()即可,这是逻辑处理器的数量
submit 和 execute 方法的区别
-
接收的参数不同,submit 可以接收 Callable;
-
submit()可以有返回值,而 execute()没有;
-
submit()可以进行 Exception 处理,调用 Future.get()就可以看到在哪加入任务的堆栈;而 execute()只能看到执行任务异常的堆栈。
ForkJoinPool
Fork/Join 框架
分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任
务的结果合并起来生成整体结果。它是 ExecutorService 接口的一个实现,它把子任务分配给
线程池(称为 ForkJoinPool )中的工作线程。首先来看看如何定义任务和子任务。
Demo
public class ForkJoinSumCalculator extends RecursiveTask {
public static void main(String[] args){
System.out.println("ForkJoin sum done in: " + measurePerf(ForkJoinSumCalculator::forkJoinSum, 10_000_000L) + " msecs" );
}
public static <T, R> long measurePerf(Function<T, R> f, T input) {
long fastest = Long.MAX_VALUE;
for (int i = 0; i < 10; i++) {
long start = System.nanoTime();
R result = f.apply(input);
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Result: " + result);
if (duration < fastest) fastest = duration;
}
return fastest;
}
public static final long THRESHOLD = 10_000;
private final long[] numbers;
private final int start;
private final int end;
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
return computeSequentially();
}
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
leftTask.fork();
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
Long rightResult = rightTask.compute();
Long leftResult = leftTask.join();
return leftResult + rightResult;
}
private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask task = new ForkJoinSumCalculator(numbers);
return FORK_JOIN_POOL.invoke(task);
}
}
工作窃取
分出大量的小任务一般来说都是一个好的选择。这是因为,理想情况下,划分并行任务时,
应该让每个任务都用完全相同的时间完成,让所有的 CPU 内核都同样繁忙。不幸的是,实际中,每
个子任务所花的时间可能天差地别,要么是因为划分策略效率低,要么是有不可预知的原因,比如
磁盘访问慢,或是需要和外部服务协调执行。
分支/合并框架工程用一种称为工作窃取(workstealing)的技术来解决这个问题。在实际应
用中,这意味着这些任务差不多被平均分配到 ForkJoinPool 中的所有线程上。每个线程都为分
配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执
行。基于前面所述的原因,某个线程可能早早完成了分配给它的所有任务,也就是它的队列已经
空了,而其他的线程还很忙。这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队
列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队
列都清空。这就是为什么要划成许多小任务而不是少数几个大任务,这有助于更好地在工作线程
之间平衡负载。
一般来说,这种工作窃取算法用于在池中的工作线程之间重新分配和平衡任务。下图展示
了这个过程。当工作线程队列中有一个任务被分成两个子任务时,一个子任务就被闲置的工作线
程“偷走”了。如前所述,这个过程可以不断递归,直到规定子任务应顺序执行的条件为真
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于