线程池使用与原理

本贴最后更新于 1194 天前,其中的信息可能已经时过境迁

概述

在大型项目中,在代码中直接创建线程是不允许的,如果需要使用多线性则必须通过线程池了创建,因而了解线程池的使用规范以及底层实现是非常有必要的。

使用

预想的用法

但说到线程池,我们可能首先会类比连接池这类池化资源,会以为线程池是通过 acquire() 来获取资源,通过 release() 来释放资源,就像下边这样:

//采用一般意义上池化资源的设计方法
class ThreadPool{
  // 获取空闲线程
  Thread acquire() {
  }
  // 释放线程
  void release(Thread t){
  }
} 
//期望的使用
ThreadPool pool
Thread T1=pool.acquire();
//传入Runnable对象
T1.execute(()->{
  //具体业务逻辑
  ......
});

也就是说,我们在使用的时候直接从 pool 中获取一个线程实例 T1,然后直接向 T1 里边传业务逻辑就行了。但实际在使用的时候发现 Thread 类中根本没有类似于 execute(Runnable task) 这样的公共方法,因而线程池是没有办法按照池化的思想来设计的。

实际的用法

业界当前在设计线程池时,普遍采用的都是生产者-消费者模式,类似下面这种形式:

//简化的线程池,仅用来说明工作原理
class MyThreadPool{
  //利用阻塞队列实现生产者-消费者模式
  BlockingQueue<Runnable> workQueue;
  //保存内部工作线程
  List<WorkerThread> threads 
    = new ArrayList<>();
  // 构造方法
  MyThreadPool(int poolSize, 
    BlockingQueue<Runnable> workQueue){
    this.workQueue = workQueue;
    // 创建工作线程
    for(int idx=0; idx<poolSize; idx++){
      WorkerThread work = new WorkerThread();
      work.start();
      threads.add(work);
    }
  }
  // 提交任务
  void execute(Runnable command){
    workQueue.put(command);
  }
  // 工作线程负责消费任务,并执行任务
  class WorkerThread extends Thread{
    public void run() {
      //循环取任务并执行
      while(true){
        Runnable task = workQueue.take();
        task.run();
      } 
    }
  }  
}

/** 下面是使用示例 **/
// 创建有界阻塞队列
BlockingQueue<Runnable> workQueue = 
  new LinkedBlockingQueue<>(2);
// 创建线程池  
MyThreadPool pool = new MyThreadPool(
  10, workQueue);
// 提交任务  
pool.execute(()->{
    System.out.println("hello");
});

在 MyThreadPool 的内部,维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中。MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务。也就是说在使用线程池时,我们编写的代码充当的角色类似于生产者,只要往线程池中放任务即可,线程池自己会获取任务并执行。

底层实现

当然实际线程池的功能比我们前面写的要强大的多,实现也有多种。但无论哪种实现实际上都是在 Executor框架 之下的,什么是 Executor 框架呢?

Executor 框架

实际上它是线程执行规范的抽象,它有三个模块组成:

1. 对任务的抽象(Runnable/Callable)

所要执行的任务都要实现 Runnable 或者 Callable 接口,重写其内部对应的方法,这样才能被线程池接受执行。

其中 Runnable 接口自 Java1.0 以来一直存在,但 Callable 接口仅在 Java1.5 中引入,目的是为了能够返回任务执行之后的结果或者异常的抛出。

具体到代码层面上:

@FunctionalInterface
public interface Runnable {
   /**
    * 被线程执行,没有返回值也无法抛出异常
    */
    public abstract void run();
}
//---------------------------------------//
@FunctionalInterface
public interface Callable<V> {
    /**
     * 计算结果,或在无法这样做时抛出异常。
     * @return 计算得出的结果
     * @throws 如果无法计算结果,则抛出异常
     */
    V call() throws Exception;
}

我们可以看到实现 Ruable 接口的类需要重写 run() 方法,该方法不返回任何结果;而实现 Callable 接口的类则中重写的方法是 call(),该方法会返回一个泛型 V,通过这个返回值我们就可以获取到任务执行的结果。

当然两者也是进行相互转化的,通过工具类 Executors 类可以实现 Runnable 对象和 Callable 对象直接的转换。(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))。

2. 对任务执行的抽象(Executor)

Executor 接口非常简单内部只有一个 void execute(Runnable command) 也就是说所有要接受任务(Runnable)的线程池都必须实现该接口,那 Callable 接口怎么办呢?

我们先看下边这张类关系图:

image.png

从类的继承关系图上可以看到,Executor 下边还有一个子接口 ExecutorService,所有的线程池实现类都实现了该接口,我们看这个接口的结构图:

image.png

可以看到 ExecutorService 接口在实现了 Executor接口 的前提下还定义了许多新的方法,其中 submit(Callable) 就是用来提交 Callable 类型任务的。

3. 异步计算结果的抽象(Future)

此处为什么要强调异步呢?主要原因在于当你提交一个任务到线程池中之后,任务的执行是异步的,不会同步返回给主线程。Future 内部也比较简单,定义了常用对结果操作的方法:

image.png

  • get() 可以阻塞主线程,并获取任务的执行结果
  • get(long,TimeUnit) 方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完
  • cancel(boolean) 可以取消任务的执行

Executor 的执行流程

通过前面 Executor框架 的说明,我们可以看一下下面这张流程示意图:

image.png

  • 主线程可以创建 Ruaable 类型或者 Callable 类型的任务
  • 然后通过 submit() 方法或者 execut() 方法可以将任务放到 ExecutorService 中执行
  • 如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现 Future 接口的对象
  • 最后可以通过 Future.get() 方法来阻塞主线程等待任务执行完成获取任务的返回结果。(如果通过 submit(Runnable) 方法获取到的 Future 对象,其 get() 结果为 null 👌)

ThreadPoolExecutor 类

虽然说 Java 提供了多种线程池的实现方案,但最核心的就是 ThreadPoolExecutor,其他大部分类都是在该类基础上进一步封装的。根据《阿里巴巴编码规范》所有线程池都必须由 ThreadPoolExecutor 类创建,由此可见该类的重要性。

具体创建线程池时,我们需要借助于 ThreadPoolExecutor 的构造方法:

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

构造方法内部的逻辑非常简单,就是初始化各个变量,但每个参数的含义我们必须重点掌握。❗️

  • corePoolSize:线程池保有的最小数量
  • maximumPoolSize:线程池最大的线程数量,就是说在线程池中线程资源不够用时,进行扩充时能够扩充的最大的线程数量
  • workQueue:当核心线程占满后,如果有任务到来需要放到队列中,这个就是提供的线程队列,用来存放待执行的任务,推荐使用阻塞类队列
  • keepAliveTime & unit:当线程池中的线程数量大于 corePoolSize,这些空闲线程不会立即回收而是要等待一段时间,而 keepAliveTime 和 unite 则是对等待时间的定义
  • threadFactory: 线程工程用来创建线程,一般选择默认即可
  • handler:线程阻塞拒绝策略,也称之为饱和策略。如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,线程池会按照定义的饱和策略来拒绝任务
    • AbortPolicy:抛出一个 RejectedExecutionException 异常来拒绝新任务的处理
    • CallerRunsPolicy既不抛弃任务也不抛出异常,直接运行任务的 run 方法,换言之将任务回退给调用者来直接运行。比如在主线程中,通过线程池提交任务,当触发该策略时,主线程会执行任务的 run() 方法
    • DiscardPolicy: 直接丢弃任务,不抛出任何异常
    • DiscardOldestPolicy:丢弃最老的任务,也就是把最先进入阻塞队列的任务丢掉。

具体使用方式其实就是我们前面讲的遵循"Executor 的执行流程",具体来说,我们可以参考下面这段代码:

public class TestThreadPool {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //使用阿里巴巴推荐的创建线程池的方式
        //通过ThreadPoolExecutor构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            KEEP_ALIVE_TIME,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(QUEUE_CAPACITY),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
        Future future = null;
        for (int i = 0; i < 10; i++) {
            Task myTask = new Task("thread" + i);
            executor.execute(myTask);
        }
        //任务提交之后终止线程池,但未运行结束的任务继续执行
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("任务执行完成!!");
    }
}

class Task implements Runnable {
    private String command;

    public Task(String s) {
        this.command = s;
    }
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
        processCommand();
        System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
    }

    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return this.command;
    }
}

执行结果如下:

pool-1-thread-4 Start. Time = Thu Jul 29 16:55:17 CST 2021
pool-1-thread-3 Start. Time = Thu Jul 29 16:55:17 CST 2021
pool-1-thread-1 Start. Time = Thu Jul 29 16:55:17 CST 2021
pool-1-thread-5 Start. Time = Thu Jul 29 16:55:17 CST 2021
pool-1-thread-2 Start. Time = Thu Jul 29 16:55:17 CST 2021
pool-1-thread-3 End. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-4 End. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-5 End. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-2 End. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-4 Start. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-2 Start. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-5 Start. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-3 Start. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-1 End. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-1 Start. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-5 End. Time = Thu Jul 29 16:55:27 CST 2021
pool-1-thread-1 End. Time = Thu Jul 29 16:55:27 CST 2021
pool-1-thread-3 End. Time = Thu Jul 29 16:55:27 CST 2021
pool-1-thread-4 End. Time = Thu Jul 29 16:55:27 CST 2021
pool-1-thread-2 End. Time = Thu Jul 29 16:55:27 CST 2021

从执行的结果上看,虽然我们提交了 10 个任务,但实际在执行的时候,只创建了 5 个线程,这是怎么回事呢?

image.png

为了搞清楚这个问题,我们看一下 execute() 方法内部的原理:

// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
   
    private static int workerCountOf(int c) {
        return c & CAPACITY;
    }
    //任务队列
    private final BlockingQueue<Runnable> workQueue;

    public void execute(Runnable command) {
        // 如果任务为null,则抛出异常。
        if (command == null)
            throw new NullPointerException();
        // ctl 中保存的线程池当前的一些状态信息
        int c = ctl.get();

        //  下面会涉及到 3 步 操作
        // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
        // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
        // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。有点类似于双重检测的思想
            if (!isRunning(recheck) && remove(command))
                reject(command);
                // 如果当前线程池为空就新创建一个线程并执行。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
        else if (!addWorker(command, false))
            reject(command);
    }

也就是说线程池在提交任务时的执行逻辑如下图所示:

image.png

  1. 提交任务之后首先判断核心线程是否已经满了,如果没有的话,创建新线程并执行任务
  2. 如果核心线程满了的话,判断队列是否已经满了,如果没有满的话,先把任务放到队列中
  3. 如果队列也满了,判断线程池是否达到了设定的最大值,如果没有达到的话,创建线程从任务队列中取任务并执行任务放到线程池中执行。
  4. 如果已经达到了最大线程数量,则按照设定的 拒绝策略 拒绝任务

了解了 execute() 方法的执行逻辑之后,我们就理解前边方法的执行结果:因为我们在创建的时候核心线程数量是 5,队列容量是 100。因而在创建 10 个任务往线程池中放的时候,首先会创建五个线程来执行 5 个任务,然后其他 5 个任务会被放到任务队列中。当有线程执行完毕之后,才会从任务队列中取出任务进行执行。

其他线程池方案

当然创建线程池的方法,不止是通过 ThreadPoolExecutor 的构造函数来创建,我们也可以通过 Executors 工具类来创建其他类型的线程池,但大公司的编码规范都不建议这样做,具体原因有两个:

  1. Executors 工具类创建的线程池,其内部本质上还是通过调用 ThreadPoolExecutor 构造方法来创建的
  2. 其他线程池方案都存在一些不足,或者说具有一定的不可控性

FixedThreadPool

FixedThreadPool 被称为可重用固定线程数的线程池,创建方法也非常容易直接调用 Executors.FixedThreadPoll() 方法即可创建,但如果了解其原理,还是要先看一下它的源码:

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
}

从源码中可以看到 FixedThreadPoolcorePoolSize maximumPoolSize 都被设置为 nThreads,也就是 FiexedThreadPool 的线程数是一个固定值,不会再扩充;

不足之处:

需要注意的是它选择的任务队列是 LinkedBlockingQueue 这是一个无界的队列 ❗️,这就可能造成一个问题,如果堆积的任务非常多的情况下,由于任务会一直被放到队列锁,最终可能会导致 OOM。

SingleThreadExecutor

SingleThreadExecutor 是一个只有一个线程池,同样的我们先看它的源码:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

从源码中看,它是把核心线程数量和最大线程数量设置成了 1,这样线程池的数量就会固定只有 1。

不足之处:

它和"FixedThreadPool"一样选择 LinkedBlockingQueue 作为任务队列,最终可能会导致 OOM。

CachedThreadPool

CachedThreadPool 是一个会根据需要创建新线程的线程池。接下来还是看下它的源码:

/**
     * 创建一个线程池,根据需要创建新线程,但会在先前构建的线程可用时重用它。
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

从源码中可以看到,它的最大线程数量被设置成了 Integer.Max_Value,并且任务容器选择了 SynchronousQueue 这个队列比较特殊,它内部没有容量,只是起到一个协调作用,如果放到其中的任务没有被消费,它便会阻塞生产者。因此 CachedThreadPoll 线程在遇到新任务时会不断创建新线程来执行。

不足之处:

这也就意味着如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。

总结

Java 线程池在设计的时候,是按照 生产者-消费者 的模式来设计的,整个过程遵循 Executor框架,用 Runnable/Callable 来作为任务的抽象;用 Executor 接口作为任务执行的抽象,Future 作为异步执行结果的抽象。在工程实践中,我们都推荐使用 ThreadPoolExecutor 的构造方法来创建线程 k 池,而不推荐 Executors 封装的方法去创建其他类型的线程池。

ThreadPoolExecutor 在创建线程时,一共有六个:

  • corePoolSize:线程池保有的最小数量
  • maximumPoolSize:线程池最大的线程数量,就是说在线程池中线程资源不够用时,进行扩充时能够扩充的最大的线程数量
  • workQueue:当核心线程占满后,如果有任务到来需要放到队列中,这个就是提供的线程队列,用来存放待执行的任务,推荐使用阻塞类队列
  • keepAliveTime & unit:当线程池中的线程数量大于 corePoolSize,这些空闲线程不会立即回收而是要等待一段时间,而 keepAliveTime 和 unite 则是对等待时间的定义
  • threadFactory: 线程工厂用来创建线程,一般选择默认即可
  • handler:线程阻塞拒绝策略,也称之为饱和策略。如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,线程池会按照定义的饱和策略来拒绝任务
    • AbortPolicy:抛出一个 RejectedExecutionException 异常来拒绝新任务的处理
    • CallerRunsPolicy既不抛弃任务也不抛出异常,直接运行任务的 run 方法,换言之将任务回退给调用者来直接运行。比如在主线程中,通过线程池提交任务,当触发该策略时,主线程会执行任务的 run() 方法
    • DiscardPolicy: 直接丢弃任务,不抛出任何异常
    • DiscardOldestPolicy:丢弃最老的任务,也就是把最先进入阻塞队列的任务丢掉。

引用

  1. 《java 线程池与五种常用线程池策略使用与解析》
  2. 《Java 并发进阶常见面试题总结》
  3. 《java 线程池学习总结》
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3186 引用 • 8212 回帖 • 1 关注
  • 并发
    75 引用 • 73 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...