多线程之线程池小纪

本贴最后更新于 1932 天前,其中的信息可能已经东海扬尘

cover

关于我是线程池

并发总是离不开多线程,多线程的应用能够更好地帮助我们协调利用 CPU、Memory、Net、I/O 等系统资源。频繁的创建、销毁线程会浪费大量的系统资源,增加并发编程的风险。利用线程池可以实现类似主次线程隔离定时执行定时执行周期执行等任务。作用包括:

  1. 利用线程池管理并复用线程、控制最大并发数等。
  2. 实现某些与时间相关的功能,如定时执行、周期执行等。
  3. 隔离线程环境。比如交易服务搜索服务在同一台服务器上,分别开启两个线程池,交易线程的资源消耗明显要更大;因此,通过配置读的线程池,将两者隔开,避免个服务线程相互影响。

关于线程池的基础概念和一些简单场景,可以看看这篇文章:线程池开门营业招聘开发人员的一天

迷思

如下是我定义的一个线程工具类,我定义了核心线程数量大小为 4;最大核心线程数量大小为 8, LinkedBlockingQueue 容量大小未初始化,也未定义一个 handle,当我在利用这个线程池生产线程的过程中发现,当创建速度大于它的处理速度时,核心线程数量依旧是 4 个

嗯?说好的,最大核心线程数不是 BUG,8 个吗?难道当前不应该是 8 个?

public class ThreadPoolUtils { private static final Logger logger = LoggerFactory.getLogger(ThreadPoolUtils.class); /** * 线程池维护线程的最少数量 */ private static final int SIZE_CORE_POOL = 4; /** * 线程池维护线程的最大数量 */ private static final int SIZE_MAX_POOL = 8; /** * 禁止手动初始化 */ private ThreadPoolUtils() {} public static void printPoolInfo() { logger.info("当前线程Pool的数量 = [{}]",Singleton.SINGLETON.getThreadPool().getPoolSize()); logger.info("当前task的数量 = [{}]",Singleton.SINGLETON.getThreadPool().getTaskCount()); logger.info("当前执行task的数量 = [{}]",Singleton.SINGLETON.getThreadPool().getActiveCount()); logger.info("当前完成task的数量 = [{}]",Singleton.SINGLETON.getThreadPool().getCompletedTaskCount()); } /** * 通过枚举创建单例对象 */ private enum Singleton { /** * 线程池单例 */ SINGLETON; private ThreadPoolExecutor threadPool; private ScheduledExecutorService service; Singleton() { // 为线程命名 ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("线程池工具类-pool-%d").build(); // 创建线程池1 threadPool = new ThreadPoolExecutor( SIZE_CORE_POOL, SIZE_MAX_POOL, 10L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), namedThreadFactory); // 创建线程池2 service = Executors.newScheduledThreadPool(4); } /** * 返回单例对象 */ public ThreadPoolExecutor getThreadPool() { return threadPool; } public ScheduledExecutorService getScheduledThreadPool() { return service; } } /** * 向池中添加任务 * 单次执行 * @param task */ public static void addExecuteTask(Runnable task) { if (task != null) { ThreadPoolExecutor threadPoolExecutor = Singleton.SINGLETON.getThreadPool(); threadPoolExecutor.execute(task); } } public static void addScheduleTask(Runnable task) { Singleton.SINGLETON.getScheduledThreadPool().scheduleWithFixedDelay(task, 5, 3, TimeUnit.SECONDS); } }

探究

看来还是 basic 不够扎实啊,学的是个 JB!我们看一下 ThreadPoolExecutor 的源码,查看下的他的 4 个构造方法如下图,我们来看看比较难懂的几个参数:

ThreadPoolExecutor

  • 第 5 个参数: workQueue 表示缓存队列。当请求的线程大于 maximumPoolSize 时,线程进入 BlockingQueue 阻塞队列。是一个生产消费模型队列。
  • 第 7 个参数: handle 表示执行拒绝策略的对象。当超过第 5 个参数 workQueue 的任务缓存区上限的时候,就可以通过该策略处理请求,是一种简单的限流保护。

那么,我们上面的实例化是怎么写的?

// 创建线程池1 threadPool = new ThreadPoolExecutor( SIZE_CORE_POOL, SIZE_MAX_POOL, 10L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), namedThreadFactory);

我们把 LinkedBlockingQueue<Runnable>() 作为缓存队列,我们不关心的它内部实现,通过源码可以知道它是一种无限队列,构造器容量默认值大小是 Integer.MAX_VALUE ,往往在生产场景中很难达到这个值,所以像我上面这样写是极其不科学的,应该根据实际场景设置一个可承载容量大小,并配合 handle 做出拒绝策略,才是一个完整的流程。

我们稍微熟悉了它的构造方法之后,怎么知道它是如何工作的呢?另外我之前的迷思,为什么核心线程数始终等于 4 呢?

原因

首先我们可以通过源码查看 execute 方法:

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); // 1. 如果当前线程数 小于 corePoolSize,则尝试添加新线程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 2. 尝试向workQueue添加队列(offer方法在workQueue没有容量时,添加失败),线程已经存在不会创建新的线程,如果不存在则创建新的线程。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3. 添加新线程,此处会比较maximumPoolSize,如果大于maximumPoolSize,则会使用饱和策略 else if (!addWorker(command, false)) reject(command); }

addWorker 方法主要是动态的调整线程池的线程数量。从 execute 方法和 addWorker 方法可以看出,当前线程数优先与 corePoolSize 比较,大于 corePoolSize ,则与 workQueue 容量比较;如果当前线程数大于 workQueue 容量,则与 maximumPoolSize 比较;如果当前线程数大于 maximumPoolSize,则执行饱和策略;最后,根据饱和策略做出相应的处理。

所以我粗略的总结下当 corePoolSize(核心线程数)满了,接下来的线程先进入 workQueue(任务队列),当队列也满了之后,创建新线程,直到达到 maximumPoolSize(最大线程数),之后再尝试创建线程时,会进入拒绝 rejectedExecution。

所以为什么线程池的核心线程数一直是 4 个,因为多余的都处在任务队列阻塞中,由于未设置一个容量大小,所以这个容量非常的大,其实是超出我们的处理能力的,我们程序始终就也没能够达到最大线程数。或者可以这么理解,这个 maximumPoolSize 算是一种比较坏(极限)的情况,很少情况并不会真的按照这个数量处理任务,只有当任务队列都不够时,才会继续创建线程,直到达到最大线程数,超过了之后就必须要 handle 来处理拒绝策略了。

测试

好了,既然大致了解了线程池的工作原理之后,可以进行一个测试来验证以下是否符合:

public class TestThreadPool { private static ThreadFactory tf = new ThreadFactoryBuilder() .setNameFormat("factory-pool-%d").build(); private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 8, 10L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(50), tf); public static void main(String[] args) throws InterruptedException { System.out.println("================start================"); for (int i = 0; i<100; i++) { threadPoolExecutor.execute(new Task(String.valueOf(i))); } System.out.println("================end================"); } } class Task implements Runnable { private static final Logger logger = LoggerFactory.getLogger(TestThreadPool.class); private String name; public Task(String name) { this.name = "[ " + name + " ]"; } @Override public void run() { logger.info(name + "只要干不死,就往死里干,奥利干!!"); // System.out.println(name + "奥利给!!"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }

一个线程类 Task ,在 run 方法中打印一句「正能量」,然后 sleep 2 秒来模拟处理这个任务。我们创建一个核心线程数为 4,最大线程数 8,容量大小为 50LinkedBlockingQueue 队列,然后在循环中持续创建线程。当成功创建完这 100 个线程之后,应该会有 ====================end==================== 打印出来。

我们可以期待一下结果是什么?

按照上面的工作流程来说,有几种情况:

  1. 线程处理的速度远远大于线程创建的速度,可能 4 个核心数都完全够用,甚至用不到 workQueue ,最后打印了 end。emmm,当然从我们写的测试代码来说几乎是不可能的,for 循环表示:烙呢?中国 🇨 🇳 速度嗷!
  2. 线程创建的速度大于回收速度,但是 workQueuemaximumPoolSize 完全可以支撑,100 个线程创建成功并完成任务。
  3. corePoolSizeworkQueue 以及 maximumPoolSize 都过载,丢弃任务并抛出 RejectedExecutionException 异常了。

其实可以很明显知道, sleep 2 秒加上 logger.info() 方法,线程的创建的速度一定是大大于执行的。按照 4、8、50 的配置,当地 58 个创建被创建成功之后,要是目前没有任何一个线程被释放的话,第 59 个线程会因为上限问题而被拒绝,这时候就会抛出异常了。当然这是个 for 循环,产生的速度足够快,基本上 100 次循环完成,第一个线程都没完成,所以可以大胆猜测, logger 一共会打印 58 行日志,并伴随着 RejectedExecutionException 的出现。

我们看一下运行的结果:

异常它 lei 了

果然第 58 个线程被创建之后,后续第 59 个线程想被创建就抛出了异常,如图刚好是 58 行(0 ~ 57),也没有 ====================end==================== 的出现。

奥利给

当我们把这个线程类的 run 方法分别改成如下:

@Override public void run() { // logger.info(name + "只要干不死,就往死里干,奥利干!!"); System.out.println(name + "奥利给!!"); }
@Override public void run() { // logger.info(name + "只要干不死,就往死里干,奥利干!!"); // System.out.println(name + "奥利给!!"); logger.info(name + "奥利给!!"); }

分别看下结果:

奥利给

奥利给大失败

看来 org.slf4j.Logger.info() 的耗时不是一般的长,比 System.out.println() 还长。

  • 线程
    123 引用 • 111 回帖 • 3 关注
  • 并发
    75 引用 • 73 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...