线程池使用与原理

本贴最后更新于 182 天前,其中的信息可能已经时过境迁

概述

在大型项目中,在代码中直接创建线程是不允许的,如果需要使用多线性则必须通过线程池了创建,因而了解线程池的使用规范以及底层实现是非常有必要的。

使用

预想的用法

但说到线程池,我们可能首先会类比连接池这类池化资源,会以为线程池是通过 acquire() 来获取资源,通过 release() 来释放资源,就像下边这样:

//采用一般意义上池化资源的设计方法
class ThreadPool{
  // 获取空闲线程
  Thread acquire() {
  }
  // 释放线程
  void release(Thread t){
  }
} 
//期望的使用
ThreadPool pool
Thread T1=pool.acquire();
//传入Runnable对象
T1.execute(()->{
  //具体业务逻辑
  ......
});

也就是说,我们在使用的时候直接从 pool 中获取一个线程实例 T1,然后直接向 T1 里边传业务逻辑就行了。但实际在使用的时候发现 Thread 类中根本没有类似于 execute(Runnable task) 这样的公共方法,因而线程池是没有办法按照池化的思想来设计的。

实际的用法

业界当前在设计线程池时,普遍采用的都是生产者-消费者模式,类似下面这种形式:

//简化的线程池,仅用来说明工作原理
class MyThreadPool{
  //利用阻塞队列实现生产者-消费者模式
  BlockingQueue<Runnable> workQueue;
  //保存内部工作线程
  List<WorkerThread> threads 
    = new ArrayList<>();
  // 构造方法
  MyThreadPool(int poolSize, 
    BlockingQueue<Runnable> workQueue){
    this.workQueue = workQueue;
    // 创建工作线程
    for(int idx=0; idx<poolSize; idx++){
      WorkerThread work = new WorkerThread();
      work.start();
      threads.add(work);
    }
  }
  // 提交任务
  void execute(Runnable command){
    workQueue.put(command);
  }
  // 工作线程负责消费任务,并执行任务
  class WorkerThread extends Thread{
    public void run() {
      //循环取任务并执行
      while(true){
        Runnable task = workQueue.take();
        task.run();
      } 
    }
  }  
}

/** 下面是使用示例 **/
// 创建有界阻塞队列
BlockingQueue<Runnable> workQueue = 
  new LinkedBlockingQueue<>(2);
// 创建线程池  
MyThreadPool pool = new MyThreadPool(
  10, workQueue);
// 提交任务  
pool.execute(()->{
    System.out.println("hello");
});

在 MyThreadPool 的内部,维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中。MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务。也就是说在使用线程池时,我们编写的代码充当的角色类似于生产者,只要往线程池中放任务即可,线程池自己会获取任务并执行。

底层实现

当然实际线程池的功能比我们前面写的要强大的多,实现也有多种。但无论哪种实现实际上都是在 Executor框架 之下的,什么是 Executor 框架呢?

Executor 框架

实际上它是线程执行规范的抽象,它有三个模块组成:

1. 对任务的抽象(Runnable/Callable)

所要执行的任务都要实现 Runnable 或者 Callable 接口,重写其内部对应的方法,这样才能被线程池接受执行。

其中 Runnable 接口自 Java1.0 以来一直存在,但 Callable 接口仅在 Java1.5 中引入,目的是为了能够返回任务执行之后的结果或者异常的抛出。

具体到代码层面上:

@FunctionalInterface
public interface Runnable {
   /**
    * 被线程执行,没有返回值也无法抛出异常
    */
    public abstract void run();
}
//---------------------------------------//
@FunctionalInterface
public interface Callable<V> {
    /**
     * 计算结果,或在无法这样做时抛出异常。
     * @return 计算得出的结果
     * @throws 如果无法计算结果,则抛出异常
     */
    V call() throws Exception;
}

我们可以看到实现 Ruable 接口的类需要重写 run() 方法,该方法不返回任何结果;而实现 Callable 接口的类则中重写的方法是 call(),该方法会返回一个泛型 V,通过这个返回值我们就可以获取到任务执行的结果。

当然两者也是进行相互转化的,通过工具类 Executors 类可以实现 Runnable 对象和 Callable 对象直接的转换。(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))。

2. 对任务执行的抽象(Executor)

Executor 接口非常简单内部只有一个 void execute(Runnable command) 也就是说所有要接受任务(Runnable)的线程池都必须实现该接口,那 Callable 接口怎么办呢?

我们先看下边这张类关系图:

image.png

从类的继承关系图上可以看到,Executor 下边还有一个子接口 ExecutorService,所有的线程池实现类都实现了该接口,我们看这个接口的结构图:

image.png

可以看到 ExecutorService 接口在实现了 Executor接口 的前提下还定义了许多新的方法,其中 submit(Callable) 就是用来提交 Callable 类型任务的。

3. 异步计算结果的抽象(Future)

此处为什么要强调异步呢?主要原因在于当你提交一个任务到线程池中之后,任务的执行是异步的,不会同步返回给主线程。Future 内部也比较简单,定义了常用对结果操作的方法:

image.png

Executor 的执行流程

通过前面 Executor框架 的说明,我们可以看一下下面这张流程示意图:

image.png

ThreadPoolExecutor 类

虽然说 Java 提供了多种线程池的实现方案,但最核心的就是 ThreadPoolExecutor,其他大部分类都是在该类基础上进一步封装的。根据《阿里巴巴编码规范》所有线程池都必须由 ThreadPoolExecutor 类创建,由此可见该类的重要性。

具体创建线程池时,我们需要借助于 ThreadPoolExecutor 的构造方法:

public ThreadPoolExecutor(int corePoolSize,
                            int maximumPoolSize,
                            long keepAliveTime,
                            TimeUnit unit,
                            BlockingQueue<Runnable> workQueue,
                            ThreadFactory threadFactory,
                            RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

构造方法内部的逻辑非常简单,就是初始化各个变量,但每个参数的含义我们必须重点掌握。❗️

具体使用方式其实就是我们前面讲的遵循"Executor 的执行流程",具体来说,我们可以参考下面这段代码:

public class TestThreadPool {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //使用阿里巴巴推荐的创建线程池的方式
        //通过ThreadPoolExecutor构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            KEEP_ALIVE_TIME,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(QUEUE_CAPACITY),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
        Future future = null;
        for (int i = 0; i < 10; i++) {
            Task myTask = new Task("thread" + i);
            executor.execute(myTask);
        }
        //任务提交之后终止线程池,但未运行结束的任务继续执行
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("任务执行完成!!");
    }
}

class Task implements Runnable {
    private String command;

    public Task(String s) {
        this.command = s;
    }
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
        processCommand();
        System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
    }

    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return this.command;
    }
}

执行结果如下:

pool-1-thread-4 Start. Time = Thu Jul 29 16:55:17 CST 2021
pool-1-thread-3 Start. Time = Thu Jul 29 16:55:17 CST 2021
pool-1-thread-1 Start. Time = Thu Jul 29 16:55:17 CST 2021
pool-1-thread-5 Start. Time = Thu Jul 29 16:55:17 CST 2021
pool-1-thread-2 Start. Time = Thu Jul 29 16:55:17 CST 2021
pool-1-thread-3 End. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-4 End. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-5 End. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-2 End. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-4 Start. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-2 Start. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-5 Start. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-3 Start. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-1 End. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-1 Start. Time = Thu Jul 29 16:55:22 CST 2021
pool-1-thread-5 End. Time = Thu Jul 29 16:55:27 CST 2021
pool-1-thread-1 End. Time = Thu Jul 29 16:55:27 CST 2021
pool-1-thread-3 End. Time = Thu Jul 29 16:55:27 CST 2021
pool-1-thread-4 End. Time = Thu Jul 29 16:55:27 CST 2021
pool-1-thread-2 End. Time = Thu Jul 29 16:55:27 CST 2021

从执行的结果上看,虽然我们提交了 10 个任务,但实际在执行的时候,只创建了 5 个线程,这是怎么回事呢?

image.png

为了搞清楚这个问题,我们看一下 execute() 方法内部的原理:

// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
   
    private static int workerCountOf(int c) {
        return c & CAPACITY;
    }
    //任务队列
    private final BlockingQueue<Runnable> workQueue;

    public void execute(Runnable command) {
        // 如果任务为null,则抛出异常。
        if (command == null)
            throw new NullPointerException();
        // ctl 中保存的线程池当前的一些状态信息
        int c = ctl.get();

        //  下面会涉及到 3 步 操作
        // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
        // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
        // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。有点类似于双重检测的思想
            if (!isRunning(recheck) && remove(command))
                reject(command);
                // 如果当前线程池为空就新创建一个线程并执行。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
        else if (!addWorker(command, false))
            reject(command);
    }

也就是说线程池在提交任务时的执行逻辑如下图所示:

image.png

  1. 提交任务之后首先判断核心线程是否已经满了,如果没有的话,创建新线程并执行任务
  2. 如果核心线程满了的话,判断队列是否已经满了,如果没有满的话,先把任务放到队列中
  3. 如果队列也满了,判断线程池是否达到了设定的最大值,如果没有达到的话,创建线程从任务队列中取任务并执行任务放到线程池中执行。
  4. 如果已经达到了最大线程数量,则按照设定的 拒绝策略 拒绝任务

了解了 execute() 方法的执行逻辑之后,我们就理解前边方法的执行结果:因为我们在创建的时候核心线程数量是 5,队列容量是 100。因而在创建 10 个任务往线程池中放的时候,首先会创建五个线程来执行 5 个任务,然后其他 5 个任务会被放到任务队列中。当有线程执行完毕之后,才会从任务队列中取出任务进行执行。

其他线程池方案

当然创建线程池的方法,不止是通过 ThreadPoolExecutor 的构造函数来创建,我们也可以通过 Executors 工具类来创建其他类型的线程池,但大公司的编码规范都不建议这样做,具体原因有两个:

  1. Executors 工具类创建的线程池,其内部本质上还是通过调用 ThreadPoolExecutor 构造方法来创建的
  2. 其他线程池方案都存在一些不足,或者说具有一定的不可控性

FixedThreadPool

FixedThreadPool 被称为可重用固定线程数的线程池,创建方法也非常容易直接调用 Executors.FixedThreadPoll() 方法即可创建,但如果了解其原理,还是要先看一下它的源码:

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
}

从源码中可以看到 FixedThreadPoolcorePoolSize maximumPoolSize 都被设置为 nThreads,也就是 FiexedThreadPool 的线程数是一个固定值,不会再扩充;

不足之处:

需要注意的是它选择的任务队列是 LinkedBlockingQueue 这是一个无界的队列 ❗️,这就可能造成一个问题,如果堆积的任务非常多的情况下,由于任务会一直被放到队列锁,最终可能会导致 OOM。

SingleThreadExecutor

SingleThreadExecutor 是一个只有一个线程池,同样的我们先看它的源码:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

从源码中看,它是把核心线程数量和最大线程数量设置成了 1,这样线程池的数量就会固定只有 1。

不足之处:

它和一样选择 LinkedBlockingQueue 作为任务队列,最终可能会导致 OOM。

CachedThreadPool

CachedThreadPool 是一个会根据需要创建新线程的线程池。接下来还是看下它的源码:

/**
     * 创建一个线程池,根据需要创建新线程,但会在先前构建的线程可用时重用它。
     */
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

从源码中可以看到,它的最大线程数量被设置成了 Integer.Max_Value,并且任务容器选择了 SynchronousQueue 这个队列比较特殊,它内部没有容量,只是起到一个协调作用,如果放到其中的任务没有被消费,它便会阻塞生产者。因此 CachedThreadPoll 线程在遇到新任务时会不断创建新线程来执行。

不足之处:

这也就意味着如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,CachedThreadPool 会不断创建新的线程。极端情况下,这样会导致耗尽 cpu 和内存资源。

总结

Java 线程池在设计的时候,是按照 生产者-消费者 的模式来设计的,整个过程遵循 Executor框架,用 Runnable/Callable 来作为任务的抽象;用 Executor 接口作为任务执行的抽象,Future 作为异步执行结果的抽象。在工程实践中,我们都推荐使用 ThreadPoolExecutor 的构造方法来创建线程 k 池,而不推荐 Executors 封装的方法去创建其他类型的线程池。

ThreadPoolExecutor 在创建线程时,一共有六个:

引用

  1. 《java 线程池与五种常用线程池策略使用与解析》
  2. 《Java 并发进阶常见面试题总结》
  3. 《java 线程池学习总结》
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3014 引用 • 8158 回帖 • 546 关注
  • 并发
    73 引用 • 73 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...