Java 线程池总结

本贴最后更新于 2269 天前,其中的信息可能已经事过境迁

来源

为了避免系统频繁地创建和销毁线程,我们可以让创建的线程进行复用。

类图

@startuml interface ExecutorService interface Executor interface ScheduledExecutorService abstract class AbstractExecutorService ExecutorService-up-|>Executor ScheduledExecutorService -up-|>ExecutorService AbstractExecutorService .up.|>ExecutorService ForkJoinPool-up-|>AbstractExecutorService ThreadPoolExecutor-up-|>AbstractExecutorService ScheduledThreadPoolExecutor-up-|>ThreadPoolExecutor Executors .up-> ExecutorService Executors .up-> ScheduledExecutorService @enduml

内部实现

ThreadPoolExecutor(corePoolSize,maxmumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler)

  • corePoolSize:暂时理解为线程池的初始线程数量,创建线程池时会同时创建 coreSize 数量的线程,这些线程在等待分配任务。

  • maxmumPoolSize:线程池最大线程数量,超过 coreSize 的 Idle 线程的存活时间为 keepAliveTime。

  • keepAliveTime:因为线程池数量最大可以到 maxPoolSize,当线程池线程数量超过 corePoolSize 时,多余的线程可以存活的时间。

  • unit: keepAliveTime 的单位。

  • workerQueue:是指被提交但未执行的任务队列,它是一个 BlockingQueue 接口的对象。

  • threadFactory:线程工厂,用于创建线程,一般用默认的即可。

  • handler:拒绝策略。当任务太多来不及处理,如何拒绝任务。

ThreadPoolExecutor 的任务调度逻辑

  • coreSize 前一章的 corePoolSize
  • maxSize 前一章的 maxmumPoolSize
  • 等待队列 前一章的 workerQueue
  • 拒绝策略 前一章的 handler
@startuml start :任务提交; if (非Idle的线程数量小于coreSize?) then (是) :分配线程执行; else (否) if(等待队列不满?) then (是) :加入等待队列; else (否) if(总线程数小于maxSize?) then (是) :创建新的线程并执行; else (否) :执行拒绝策略; endif endif endif end @enduml

等待队列类型

SynchronousQueue

容量为 0,提交的任务不会被真实保存,而总是将新任务提供给线程执行。
如果总线程数量大于 maxSize,执行拒绝策略

@startuml start :任务提交; if (非Idle的线程数量小于coreSize?) then (是) :分配线程执行; else (否) if(总线程数小于maxSize?) then (是) :创建新的线程并执行; else (否) :执行拒绝策略; endif endif end @enduml

ArrayBlockingQueue

有界的任务队列,构造函数可以指定容量。,有界队列仅
当在任务队列装满时,才可能将线程数提升到 corePoolSize 以上,换言之,除非系统非
常繁忙,否则确保核心线程数维持在在 corePoolSize。

调度逻辑同普通的调度逻辑一样。

LinkedBlockingQueue

无界,与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。使用无界队列的线程池的线程数量永远不会超过 coreSize

@startuml start :任务提交; if (非Idle的线程数量小于coreSize?) then (是) :分配线程执行; else (否) :加入等待队列; endif end @enduml

PriorityBlockingQueue

优先任务队列是带有执行优先级的队列。它通过 PriorityBlockingQueue 实现,可以控制任务的执行先后顺序。它是一个特殊的 无界队列。无论是有界队列 ArrayBlockingQueue,还是未指定大小的无界队列 LinkedBlockingQueue 都是按照先进先出算法处理任务的。而 PriorityBlockingQueue 则可以根据任务自身的优先级顺序先后执行,在确保系统性能的同时,也能有很好的质量保证(总是确保高优先级的任务先执行)

DelayedWorkQueue

计划任务的延迟队列

工厂方法

newFixedThreadPool

使用无界队列的固定大小的线程池

我觉得固不固定无所谓,用了无界队列,maxSize 也就没用了。

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory);

newSingleThreadExecutor

线程数量为 1 的 FixedThreadPool

return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory));

newCachedThreadPool

使用同步队列,无限线程数量的线程池。

return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());

拒绝策略

内置的策略均实现了 RejectedExecutionHandler 接口,若以上策略仍无法满足实际应用
需要,完全可以自己扩展 RejectedExecutionHandler 接口

AbortPolicy

直接抛出异常,默认策略

CallerRunsPolicy

只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降

DiscardOledestPolicy

该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。

DiscardPolicy

该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,我觉得这可能是最好的一种方案了吧!

计划任务

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }

ScheduledExecutorService 提供了 3 个方法

schedule

在给定时间,对任务进行一次调度

scheduleAtFixedRate

它是以上一个任务开始执行时间为起点,之后的 period 时间,调度下一次任务。

scheduleWithFixedDelay

在上一个任务结束后,再经过 delay 时间进行任务调度。需要配合上一个任务的运行时间。

Demo

public static void main(String[] args) { DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss"); Runnable cmd2Sec = ()->{ try { Thread.sleep(1000); System.out.println("2 second's task: "+LocalDateTime.now().format(dateTimeFormatter)); } catch (InterruptedException e) { e.printStackTrace(); }}; Runnable cmd5Sec = ()->{ try { Thread.sleep(5000); System.out.println("5 second's task: "+LocalDateTime.now().format(dateTimeFormatter)); } catch (InterruptedException e) { e.printStackTrace(); }}; ScheduledExecutorService ses = Executors.newScheduledThreadPool(4); ses.scheduleAtFixedRate(cmd2Sec, 0, 2, TimeUnit.SECONDS); // ses.scheduleWithFixedDelay(cmd2Sec, 0, 2, TimeUnit.SECONDS); // ses.scheduleAtFixedRate(cmd5Sec, 0, 2, TimeUnit.SECONDS); // ses.scheduleWithFixedDelay(cmd5Sec, 0, 2, TimeUnit.SECONDS); }

执行 ses.scheduleAtFixedRate(cmd2Sec, 0, 2, TimeUnit.SECONDS) 的结果

2 second's task: 12:22:07 2 second's task: 12:22:09 2 second's task: 12:22:11

执行 ses.scheduleWithFixedDelay(cmd2Sec, 0, 2, TimeUnit.SECONDS) 的结果

2 second's task: 12:22:43 2 second's task: 12:22:46 2 second's task: 12:22:49

执行 ses.scheduleAtFixedRate(cmd5Sec, 0, 2, TimeUnit.SECONDS) 的结果

5 second's task: 12:23:32 5 second's task: 12:23:37 5 second's task: 12:23:42 5 second's task: 12:23:47

执行 ses.scheduleWithFixedDelay(cmd5Sec, 0, 2, TimeUnit.SECONDS) 的结果

5 second's task: 12:25:13 5 second's task: 12:25:20 5 second's task: 12:25:27

结论

  1. FixedRate 是从任务起始时间计算下一次调度时间,FixedDelay 是从结束时间
  2. 如果任务执行超过调度周期,FixedRate 会等待任务执行完成之后立刻进行下次调度,FixedDelay 还是在结束后等待一个周期。

扩展线程池

ThreadPoolExecutor 也是一个可以扩展的线程池。它提供了 beforeExecute()、
afterExecute()和 terminated()三个接口对线程池进行控制。

这三个方法应用了模板方法模式,提供了默认的空实现,我们只需要覆盖即可。

源码逻辑:

beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); }

terminated()在执行拒绝策略的时候被调用

优化线程池线程数量

在 《Java Concurrency in Practice》一书中给出了一个估算线程池大小的经验公式:

Ncpu = CPU的数量(Runtime.getRuntime().availableProcessors()) Ucpu = 目标CPU的使用率,0 =< Ucpu<= 1 W/C = 等待时间与计算时间的比率 为保持处理器达到期望的使用率,最优的池的大小等于: Nthreads = Ncpu * Ucpu * ( 1 + W/C )

通常没有太多要求用 Runtime.getRuntime().availableProcessors()即可,这是逻辑处理器的数量

submit 和 execute 方法的区别

  1. 接收的参数不同,submit 可以接收 Callable;

  2. submit()可以有返回值,而 execute()没有;

  3. submit()可以进行 Exception 处理,调用 Future.get()就可以看到在哪加入任务的堆栈;而 execute()只能看到执行任务异常的堆栈。

ForkJoinPool

Fork/Join 框架

分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任
务的结果合并起来生成整体结果。它是 ExecutorService 接口的一个实现,它把子任务分配给
线程池(称为 ForkJoinPool )中的工作线程。首先来看看如何定义任务和子任务。

Demo

public class ForkJoinSumCalculator extends RecursiveTask { public static void main(String[] args){ System.out.println("ForkJoin sum done in: " + measurePerf(ForkJoinSumCalculator::forkJoinSum, 10_000_000L) + " msecs" ); } public static <T, R> long measurePerf(Function<T, R> f, T input) { long fastest = Long.MAX_VALUE; for (int i = 0; i < 10; i++) { long start = System.nanoTime(); R result = f.apply(input); long duration = (System.nanoTime() - start) / 1_000_000; System.out.println("Result: " + result); if (duration < fastest) fastest = duration; } return fastest; } public static final long THRESHOLD = 10_000; private final long[] numbers; private final int start; private final int end; public ForkJoinSumCalculator(long[] numbers) { this(numbers, 0, numbers.length); } private ForkJoinSumCalculator(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute() { int length = end - start; if (length <= THRESHOLD) { return computeSequentially(); } ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2); leftTask.fork(); ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end); Long rightResult = rightTask.compute(); Long leftResult = leftTask.join(); return leftResult + rightResult; } private long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } public static long forkJoinSum(long n) { long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinTask task = new ForkJoinSumCalculator(numbers); return FORK_JOIN_POOL.invoke(task); } }

工作窃取

  分出大量的小任务一般来说都是一个好的选择。这是因为,理想情况下,划分并行任务时,
应该让每个任务都用完全相同的时间完成,让所有的 CPU 内核都同样繁忙。不幸的是,实际中,每
个子任务所花的时间可能天差地别,要么是因为划分策略效率低,要么是有不可预知的原因,比如
磁盘访问慢,或是需要和外部服务协调执行。
  分支/合并框架工程用一种称为工作窃取(workstealing)的技术来解决这个问题。在实际应
用中,这意味着这些任务差不多被平均分配到 ForkJoinPool 中的所有线程上。每个线程都为分
配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执
行。基于前面所述的原因,某个线程可能早早完成了分配给它的所有任务,也就是它的队列已经
空了,而其他的线程还很忙。这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队
列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队
列都清空。这就是为什么要划成许多小任务而不是少数几个大任务,这有助于更好地在工作线程
之间平衡负载。
  一般来说,这种工作窃取算法用于在池中的工作线程之间重新分配和平衡任务。下图展示
了这个过程。当工作线程队列中有一个任务被分成两个子任务时,一个子任务就被闲置的工作线
程“偷走”了。如前所述,这个过程可以不断递归,直到规定子任务应顺序执行的条件为真

imagepng

  • B3log

    B3log 是一个开源组织,名字来源于“Bulletin Board Blog”缩写,目标是将独立博客与论坛结合,形成一种新的网络社区体验,详细请看 B3log 构思。目前 B3log 已经开源了多款产品:SymSoloVditor思源笔记

    1063 引用 • 3455 回帖 • 172 关注
  • 线程
    122 引用 • 111 回帖 • 3 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...