如何让 ThreadPoolExecutor 在排队之前将线程增加到最大值?
线程池的策略是当核心线程处理不下的任务会优先放入队列中,等到队列存满了在开启临时线程去处理任务,这么做有部分场景是不合适的:比如队列很大,等到队列满的时候再去创建线程去消费其实已经于事无补了,再比如线程池是 newFixedThreadPool
他的阻塞队列其实是一个 LinkedBlockingQueue
这个队列其实是没有上限的,是一个无限大小的队列,也就是说这样的线程池永远也不会创建临时线程来消费任务;
我们是否可以实现一个“弹性”的线程池,即优先开启更多线程而不是依赖队列呢,其实可以考虑一下方式:
- 由于线程池在工作队列满了无法入队的情况下会扩容线程池,那么我们是否可以重写队列的 offer 方法,造成这个队列已满的假象呢?
- 由于我们 Hack 了队列,在达到了最大线程后势必会触发拒绝策略,那么能否实现一个自定义的拒绝策略处理程序,这个时候再把任务真正插入队列呢?
stackoverflow 的文章里的内容:
我相信我终于找到了一个有点优雅(也许有点 hacky)的解决方案来解决这个限制 ThreadPoolExecutor
。它涉及到当已经有一些任务排队时 LinkedBlockingQueue
让它返回。如果当前线程跟不上排队任务,TPE 将添加其他线程。如果池已经达到最大线程数,则将调用将其放入队列中的函数。falseRejectedExecutionHandler``put(...)
编写一个 offer(...)
可以返回 false
且 put()
永不阻塞的队列确实很奇怪,所以这就是黑客部分。但这与 TPE 对队列的使用配合良好,因此我认为这样做没有任何问题。
这是代码:
// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
private static final long serialVersionUID = -6903933921423432194L;
@Override
public boolean offer(Runnable e) {
// Offer it to the queue if there is 0 items already queued, else
// return false so the TPE will add another thread. If we return false
// and max threads have been reached then the RejectedExecutionHandler
// will be called which will do the put into the queue.
if (size() == 0) {
return super.offer(e);
} else {
return false;
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// This does the actual put into the queue. Once the max threads
// have been reached, the tasks will then queue up.
executor.getQueue().put(r);
// we do this after the put() to stop race conditions
if (executor.isShutdown()) {
throw new RejectedExecutionException(
"Task " + r + " rejected from " + e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});
通过这种机制,当我向队列提交任务时,将会 ThreadPoolExecutor
:
- 最初将线程数扩展到核心大小(此处为 1)。
- 将其提供给队列。如果队列为空,它将排队等待现有线程处理。
- 如果队列已有 1 个或多个元素,
offer(...)
将返回 false。 - 如果返回 false,则增加池中的线程数量,直到达到最大数量(此处为 50)。
- 如果处于最大值则它调用
RejectedExecutionHandler
- 然后
RejectedExecutionHandler
将任务放入队列中,由第一个可用线程按 FIFO 顺序处理。
尽管在上面的示例代码中,队列是无界的,但您也可以将其定义为有界队列。例如,如果您将容量添加到 1000,那么 LinkedBlockingQueue
它将:
- 将线程扩展到最大
- 然后排队直到满 1000 个任务
- 然后阻塞调用者,直到队列有可用空间。
另外,如果您需要 offer(...)
在 RejectedExecutionHandler
then 中使用,则可以使用该 offer(E, long, TimeUnit)
方法来代替 with Long.MAX_VALUE
作为超时。
警告:
如果您希望在执行程序关闭后将任务添加到执行程序中,那么您可能需要在执行程序服务关闭时更聪明地 RejectedExecutionException
抛出我们的自定义。RejectedExecutionHandler
感谢 @RaduToader 指出了这一点。
编辑:
对此答案的另一个调整可能是询问 TPE 是否有空闲线程,并且仅在有空闲线程时才将项目排队。您必须为此创建一个真正的类并 ourQueue.setThreadPoolExecutor(tpe);
在其上添加方法。
那么你的 offer(...)
方法可能看起来像这样:
- 检查看看
tpe.getPoolSize() == tpe.getMaximumPoolSize()
在这种情况下是否只需致电super.offer(...)
。 - else if
tpe.getPoolSize() > tpe.getActiveCount()
then 调用super.offer(...)
,因为似乎有空闲线程。 - 否则返回
false
派生另一个线程。
也许这个:
int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
return super.offer(e);
} else {
return false;
}
请注意,TPE 上的 get 方法非常昂贵,因为它们访问 volatile
字段或(在 的情况下 getActiveCount()
)锁定 TPE 并遍历线程列表。此外,这里存在竞争条件,可能会导致任务不正确地排队或在有空闲线程时分叉另一个线程。
线程池的声明需要手动进行
Java 中的 Executors 类定义了一些快捷的工具方法,来帮助我们快速创建线程池。《阿里巴巴 Java 开发手册》中提到,禁止使用这些方法来创建线程池,而应该手动 newThreadPoolExecutor 来创建线程池。这一条规则的背后,是大量血淋淋的生产事故,最典型的就是 newFixedThreadPool 和 newCachedThreadPool,可能因为资源耗尽导致 OOM 问题。
首先,我们来看一下 newFixedThreadPool 为什么可能会出现 OOM 的问题。
我们写一段测试代码,来初始化一个单线程的 FixedThreadPool,循环 1 亿次向线程池提交任务,每个任务都会创建一个比较大的字符串然后休眠一小时:
@GetMapping("oom1")
public void oom1() throws InterruptedException {
ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
printStats(threadPool);
for (int i = 0; i < 100000000; i++) {
threadPool.execute(() -> {
String payload = IntStream.rangeClosed(1, 1000000)
.mapToObj(__ -> "a")
.collect(Collectors.joining("")) + UUID.randomUUID().toString();
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
}
log.info(payload);
});
}
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.HOURS);
}
执行不久就 OOM 了,原因是因为这个 newFixedThreadPool 线程池的工作队列直接 new 了一个 LinkedBlockingQueue,而默认构造方法的 LinkedBlockingQueue 是一个 Integer.MAX_VALUE 长度的队列,可以认为是无界的;
虽然使用 newFixedThreadPool 可以把工作线程控制在固定的数量上,但任务队列是无界的。如果任务较多并且执行较慢的话,队列可能会快速积压,撑爆内存导致 OOM。
我们再把刚才的例子稍微改一下,改为使用 newCachedThreadPool 方法来获得线程池。程序运行不久后,同样看到了如下 OOM 异常:
1 [11:30:30.487][http-nio-45678-exec-1][ERROR][.a.c.c.C.[.[.[/].[dispatcherSe2 java.lang.OutOfMemoryError: unable to create new native thread
从日志中可以看到,这次 OOM 的原因是无法创建线程,翻看 newCachedThreadPool 的源码可以看到,这种线程池的最大线程数是 Integer.MAX_VALUE,可以认为是没有上限的,而其工作队列 SynchronousQueue 是一个没有存储空间的阻塞队列。这意味着,只要有请求到来,就必须找到一条工作线程来处理,如果当前没有空闲的线程就再创建一条新的。
由于我们的任务需要 1 小时才能执行完成,大量的任务进来后会创建大量的线程。我们知道线程是需要分配一定的内存空间作为线程栈的,比如 1MB,因此无限制创建线程必然会导致 OOM;
其实,大部分 Java 开发同学知道这两种线程池的特性,只是抱有侥幸心理,觉得只是使用线程池做一些轻量级的任务,不可能造成队列积压或开启大量线程。
但,现实往往是残酷的。我之前就遇到过这么一个事故:用户注册后,我们调用一个外部服务去发送短信,发送短信接口正常时可以在 100 毫秒内响应,TPS 100 的注册量,CachedThreadPool 能稳定在占用 10 个左右线程的情况下满足需求。在某个时间点,外部短信服务不可用了,我们调用这个服务的超时又特别长,比如 1 分钟,1 分钟可能就进来了 6000 用户,产生 6000 个发送短信的任务,需要 6000 个线程,没多久就因为无法创建线程导致了 OOM,整个应用程序崩溃。
因此,我同样不建议使用 Executors 提供的两种快捷的线程池,原因如下:
- 我们需要根据自己的场景、并发情况来评估线程池的几个核心参数,包括核心线程数、最大线程数、线程回收策略、工作队列的类型,以及拒绝策略,确保线程池的工作行为符合需求,一般都需要设置有界的工作队列和可控的线程数。
- 任何时候,都应该为自定义线程池指定有意义的名称,以方便排查问题。当出现线程数量暴增、线程死锁、线程占用大量 CPU、线程执行出现异常等问题时,我们往往会抓取线程栈。此时,有意义的线程名称,就可以方便我们定位问题
除了建议手动声明线程池以外,我还建议用一些监控手段来观察线程池的状态。线程池这个组件往往会表现得任劳任怨、默默无闻,除非是出现了拒绝策略,否则压力再大都不会抛出一个异常。如果我们能提前观察到线程池队列的积压,或者线程数量的快速膨胀,往往可以提早发现并解决问题
默认线程池的工作行为:
- 不会初始化 corePoolSize 个线程,有任务来了才创建工作线程;
- 当核心线程满了之后不会立即扩容线程池,而是把任务堆积到工作队列中:
- 当工作队列满了后扩容线程池,一直到线程个数达到 maximumPoolSize 为止;。如果队列已满且达到了最大线程后还有任务进来,按照拒绝策略处理;
- 当线程数大于核心线程数时,线程等待 keepAliveTime 后还是没有任务需要处理的话,收缩线程到核心线程数。
了解这个策略,有助于我们根据实际的容量规划需求,为线程池设置合适的初始化参数。当然,我们也可以通过一些手段来改变这些默认工作行为,比如:
- 声明线程池后立即调用 prestartAllCoreThreads 方法,来启动所有核心线程;
- 传入 true 给 allowCoreThreadTimeOut 方法,来让线程池在空闲的时候同样回收核心线程。
务必确认清楚线程池本身是不是复用的
要根据任务的“轻重缓急”来指定线程池的核心参数,包括线程数、回收策略和任务队列:
- 对于执行比较慢、数量不大的 IO 任务,或许要考虑更多的线程数,而不需要太大的队列。
- 而对于吞吐量较大的计算型任务,线程数量不宜过多,可以是 CPU 核数或核数*2(理由是,线程一定调度到某个 CPU 进行执行,如果任务本身是 CPU 绑定的任务,那么过多的线程只会增加线程切换的开销,并不能提升吞吐量),但可能需要较长的队列来做缓冲。
盲目复用线程池混用线程的问题在于,别人定义的线程池属性不一定适合你的任务,而且混用会相互干扰。这就好比,我们往往会用虚拟化技术来实现资源的隔离,而不是让所有应用程序都直接使用物理机。
就线程池混用问题,我想再和你补充一个坑:Java 8 的 parallel stream 功能,可以让我们很方便地并行处理集合中的元素,其背后是共享同一个 ForkJoinPool,默认并行度是 CPU 核数-1。对于 CPU 绑定的任务来说,使用这样的配置比较合适,但如果集合操作涉及同步 IO 操作的话(比如数据库操作、外部服务调用等),建议自定义一个 ForkJoinPool(或普通线程池)
重点回顾
线程池管理着线程,线程又属于宝贵的资源,有许多应用程序的性能问题都来自线程池的配置和使用不当。在今天的学习中,我通过三个和线程池相关的生产事故,和你分享了使用线程池的几个最佳实践。
第一,Executors 类提供的一些快捷声明线程池的方法虽然简单,但隐藏了线程池的参数细节。因此,使用线程池时,我们一定要根据场景和需求配置合理的线程数、任务队列、拒绝策略、线程回收策略,并对线程进行明确的命名方便排查问题。
第二,既然使用了线程池就需要确保线程池是在复用的,每次 new 一个线程池出来可能比不用线程池还糟糕。如果你没有直接声明线程池而是使用其他同学提供的类库来获得一个线程池,请务必查看源码,以确认线程池的实例化方式和配置是符合预期的。
第三,复用线程池不代表应用程序始终使用同一个线程池,我们应该根据任务的性质来选用不同的线程池。特别注意 IO 绑定的任务和 CPU 绑定的任务对于线程池属性的偏好,如果希望减少任务间的相互干扰,考虑按需使用隔离的线程池。
最后我想强调的是,线程池作为应用程序内部的核心组件往往缺乏监控(如果你使用类似 RabbitMQ 这样的 MQ 中间件,运维同学一般会帮我们做好中间件监控),往往到程序崩溃后才发现线程池的问题,很被动。在设计篇中我们会重新谈及这个问题及其解决方案。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于