Java 线程池总结

本贴最后更新于 2202 天前,其中的信息可能已经事过境迁

来源

为了避免系统频繁地创建和销毁线程,我们可以让创建的线程进行复用。

类图

@startuml
interface ExecutorService
interface Executor
interface ScheduledExecutorService
abstract class AbstractExecutorService
ExecutorService-up-|>Executor
ScheduledExecutorService -up-|>ExecutorService
AbstractExecutorService .up.|>ExecutorService
ForkJoinPool-up-|>AbstractExecutorService
ThreadPoolExecutor-up-|>AbstractExecutorService
ScheduledThreadPoolExecutor-up-|>ThreadPoolExecutor
Executors .up-> ExecutorService
Executors .up-> ScheduledExecutorService
@enduml

内部实现

ThreadPoolExecutor(corePoolSize,maxmumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler)

  • corePoolSize:暂时理解为线程池的初始线程数量,创建线程池时会同时创建 coreSize 数量的线程,这些线程在等待分配任务。

  • maxmumPoolSize:线程池最大线程数量,超过 coreSize 的 Idle 线程的存活时间为 keepAliveTime。

  • keepAliveTime:因为线程池数量最大可以到 maxPoolSize,当线程池线程数量超过 corePoolSize 时,多余的线程可以存活的时间。

  • unit: keepAliveTime 的单位。

  • workerQueue:是指被提交但未执行的任务队列,它是一个 BlockingQueue 接口的对象。

  • threadFactory:线程工厂,用于创建线程,一般用默认的即可。

  • handler:拒绝策略。当任务太多来不及处理,如何拒绝任务。

ThreadPoolExecutor 的任务调度逻辑

  • coreSize 前一章的 corePoolSize
  • maxSize 前一章的 maxmumPoolSize
  • 等待队列 前一章的 workerQueue
  • 拒绝策略 前一章的 handler
@startuml
start
:任务提交;
if (非Idle的线程数量小于coreSize?) then (是)
:分配线程执行;
else (否)
if(等待队列不满?) then (是)
:加入等待队列;
else (否)
if(总线程数小于maxSize?) then (是)
:创建新的线程并执行;
else (否)
:执行拒绝策略;
endif
endif
endif
end
@enduml

等待队列类型

SynchronousQueue

容量为 0,提交的任务不会被真实保存,而总是将新任务提供给线程执行。
如果总线程数量大于 maxSize,执行拒绝策略

@startuml
start
:任务提交;
if (非Idle的线程数量小于coreSize?) then (是)
:分配线程执行;
else (否)
if(总线程数小于maxSize?) then (是)
:创建新的线程并执行;
else (否)
:执行拒绝策略;
endif
endif
end
@enduml

ArrayBlockingQueue

有界的任务队列,构造函数可以指定容量。,有界队列仅
当在任务队列装满时,才可能将线程数提升到 corePoolSize 以上,换言之,除非系统非
常繁忙,否则确保核心线程数维持在在 corePoolSize。

调度逻辑同普通的调度逻辑一样。

LinkedBlockingQueue

无界,与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。使用无界队列的线程池的线程数量永远不会超过 coreSize

@startuml
start
:任务提交;
if (非Idle的线程数量小于coreSize?) then (是)
:分配线程执行;
else (否)
:加入等待队列;
endif
end
@enduml

PriorityBlockingQueue

优先任务队列是带有执行优先级的队列。它通过 PriorityBlockingQueue 实现,可以控制任务的执行先后顺序。它是一个特殊的 无界队列。无论是有界队列 ArrayBlockingQueue,还是未指定大小的无界队列 LinkedBlockingQueue 都是按照先进先出算法处理任务的。而 PriorityBlockingQueue 则可以根据任务自身的优先级顺序先后执行,在确保系统性能的同时,也能有很好的质量保证(总是确保高优先级的任务先执行)

DelayedWorkQueue

计划任务的延迟队列

工厂方法

newFixedThreadPool

使用无界队列的固定大小的线程池

我觉得固不固定无所谓,用了无界队列,maxSize 也就没用了。

return new ThreadPoolExecutor(nThreads, nThreads,
                              0L, TimeUnit.MILLISECONDS,
                              new LinkedBlockingQueue(),
                              threadFactory);

newSingleThreadExecutor

线程数量为 1 的 FixedThreadPool

return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue(),
                            threadFactory));

newCachedThreadPool

使用同步队列,无限线程数量的线程池。

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                              60L, TimeUnit.SECONDS,
                              new SynchronousQueue());
							  

拒绝策略

内置的策略均实现了 RejectedExecutionHandler 接口,若以上策略仍无法满足实际应用
需要,完全可以自己扩展 RejectedExecutionHandler 接口

AbortPolicy

直接抛出异常,默认策略

CallerRunsPolicy

只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降

DiscardOledestPolicy

该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。

DiscardPolicy

该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,我觉得这可能是最好的一种方案了吧!

计划任务

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

ScheduledExecutorService 提供了 3 个方法

schedule

在给定时间,对任务进行一次调度

scheduleAtFixedRate

它是以上一个任务开始执行时间为起点,之后的 period 时间,调度下一次任务。

scheduleWithFixedDelay

在上一个任务结束后,再经过 delay 时间进行任务调度。需要配合上一个任务的运行时间。

Demo

public static void main(String[] args) {

        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
        Runnable cmd2Sec = ()->{
            try {
                Thread.sleep(1000);
                System.out.println("2 second's task: "+LocalDateTime.now().format(dateTimeFormatter));
            } catch (InterruptedException e) {
                e.printStackTrace();

        }};

        Runnable cmd5Sec = ()->{
            try {
                Thread.sleep(5000);
                System.out.println("5 second's task: "+LocalDateTime.now().format(dateTimeFormatter));
            } catch (InterruptedException e) {
                e.printStackTrace();

            }};

        ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
        ses.scheduleAtFixedRate(cmd2Sec, 0, 2, TimeUnit.SECONDS);
//        ses.scheduleWithFixedDelay(cmd2Sec, 0, 2, TimeUnit.SECONDS);
//        ses.scheduleAtFixedRate(cmd5Sec, 0, 2, TimeUnit.SECONDS);
//        ses.scheduleWithFixedDelay(cmd5Sec, 0, 2, TimeUnit.SECONDS);
  }

执行 ses.scheduleAtFixedRate(cmd2Sec, 0, 2, TimeUnit.SECONDS) 的结果

2 second's task: 12:22:07
2 second's task: 12:22:09
2 second's task: 12:22:11

执行 ses.scheduleWithFixedDelay(cmd2Sec, 0, 2, TimeUnit.SECONDS) 的结果

2 second's task: 12:22:43
2 second's task: 12:22:46
2 second's task: 12:22:49

执行 ses.scheduleAtFixedRate(cmd5Sec, 0, 2, TimeUnit.SECONDS) 的结果

5 second's task: 12:23:32
5 second's task: 12:23:37
5 second's task: 12:23:42
5 second's task: 12:23:47

执行 ses.scheduleWithFixedDelay(cmd5Sec, 0, 2, TimeUnit.SECONDS) 的结果

5 second's task: 12:25:13
5 second's task: 12:25:20
5 second's task: 12:25:27

结论

  1. FixedRate 是从任务起始时间计算下一次调度时间,FixedDelay 是从结束时间
  2. 如果任务执行超过调度周期,FixedRate 会等待任务执行完成之后立刻进行下次调度,FixedDelay 还是在结束后等待一个周期。

扩展线程池

ThreadPoolExecutor 也是一个可以扩展的线程池。它提供了 beforeExecute()、
afterExecute()和 terminated()三个接口对线程池进行控制。

这三个方法应用了模板方法模式,提供了默认的空实现,我们只需要覆盖即可。

源码逻辑:

beforeExecute(wt, task);
Throwable thrown = null;
try {
    task.run();
} catch (RuntimeException x) {
    thrown = x; throw x;
} catch (Error x) {
    thrown = x; throw x;
} catch (Throwable x) {
    thrown = x; throw new Error(x);
} finally {
    afterExecute(task, thrown);
}

terminated()在执行拒绝策略的时候被调用

优化线程池线程数量

在 《Java Concurrency in Practice》一书中给出了一个估算线程池大小的经验公式:

Ncpu = CPU的数量(Runtime.getRuntime().availableProcessors())
Ucpu = 目标CPU的使用率,0 =< Ucpu<= 1 
W/C = 等待时间与计算时间的比率
为保持处理器达到期望的使用率,最优的池的大小等于:
Nthreads = Ncpu * Ucpu * ( 1 + W/C )

通常没有太多要求用 Runtime.getRuntime().availableProcessors()即可,这是逻辑处理器的数量

submit 和 execute 方法的区别

  1. 接收的参数不同,submit 可以接收 Callable;

  2. submit()可以有返回值,而 execute()没有;

  3. submit()可以进行 Exception 处理,调用 Future.get()就可以看到在哪加入任务的堆栈;而 execute()只能看到执行任务异常的堆栈。

ForkJoinPool

Fork/Join 框架

分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任
务的结果合并起来生成整体结果。它是 ExecutorService 接口的一个实现,它把子任务分配给
线程池(称为 ForkJoinPool )中的工作线程。首先来看看如何定义任务和子任务。

Demo

public class ForkJoinSumCalculator extends RecursiveTask {

    public static void main(String[] args){
        System.out.println("ForkJoin sum done in: " + measurePerf(ForkJoinSumCalculator::forkJoinSum, 10_000_000L) + " msecs" );
    }

    public static <T, R> long measurePerf(Function<T, R> f, T input) {
        long fastest = Long.MAX_VALUE;
        for (int i = 0; i < 10; i++) {
            long start = System.nanoTime();
            R result = f.apply(input);
            long duration = (System.nanoTime() - start) / 1_000_000;
            System.out.println("Result: " + result);
            if (duration < fastest) fastest = duration;
        }
        return fastest;
    }

    public static final long THRESHOLD = 10_000;

    private final long[] numbers;
    private final int start;
    private final int end;

    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
  protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
        leftTask.fork();
        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
        Long rightResult = rightTask.compute();
        Long leftResult = leftTask.join();
        return leftResult + rightResult;
    }

    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }

    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask task = new ForkJoinSumCalculator(numbers);
        return FORK_JOIN_POOL.invoke(task);
    }
}

工作窃取

  分出大量的小任务一般来说都是一个好的选择。这是因为,理想情况下,划分并行任务时,
应该让每个任务都用完全相同的时间完成,让所有的 CPU 内核都同样繁忙。不幸的是,实际中,每
个子任务所花的时间可能天差地别,要么是因为划分策略效率低,要么是有不可预知的原因,比如
磁盘访问慢,或是需要和外部服务协调执行。
  分支/合并框架工程用一种称为工作窃取(workstealing)的技术来解决这个问题。在实际应
用中,这意味着这些任务差不多被平均分配到 ForkJoinPool 中的所有线程上。每个线程都为分
配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执
行。基于前面所述的原因,某个线程可能早早完成了分配给它的所有任务,也就是它的队列已经
空了,而其他的线程还很忙。这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队
列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队
列都清空。这就是为什么要划成许多小任务而不是少数几个大任务,这有助于更好地在工作线程
之间平衡负载。
  一般来说,这种工作窃取算法用于在池中的工作线程之间重新分配和平衡任务。下图展示
了这个过程。当工作线程队列中有一个任务被分成两个子任务时,一个子任务就被闲置的工作线
程“偷走”了。如前所述,这个过程可以不断递归,直到规定子任务应顺序执行的条件为真

imagepng

  • B3log

    B3log 是一个开源组织,名字来源于“Bulletin Board Blog”缩写,目标是将独立博客与论坛结合,形成一种新的网络社区体验,详细请看 B3log 构思。目前 B3log 已经开源了多款产品:SymSoloVditor思源笔记

    1063 引用 • 3454 回帖 • 191 关注
  • 线程
    122 引用 • 111 回帖 • 3 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...