关于我是线程池
并发总是离不开多线程,多线程的应用能够更好地帮助我们协调利用 CPU、Memory、Net、I/O 等系统资源。频繁的创建、销毁线程会浪费大量的系统资源,增加并发编程的风险。利用线程池可以实现类似主次线程隔离、定时执行、定时执行、周期执行等任务。作用包括:
- 利用线程池管理并复用线程、控制最大并发数等。
- 实现某些与时间相关的功能,如定时执行、周期执行等。
- 隔离线程环境。比如交易服务和搜索服务在同一台服务器上,分别开启两个线程池,交易线程的资源消耗明显要更大;因此,通过配置读的线程池,将两者隔开,避免个服务线程相互影响。
关于线程池的基础概念和一些简单场景,可以看看这篇文章:线程池开门营业招聘开发人员的一天
迷思
如下是我定义的一个线程工具类,我定义了核心线程数量大小为 4;最大核心线程数量大小为 8, LinkedBlockingQueue
容量大小未初始化,也未定义一个 handle,当我在利用这个线程池生产线程的过程中发现,当创建速度大于它的处理速度时,核心线程数量依旧是 4 个。
嗯?说好的,最大核心线程数不是 BUG,8 个吗?难道当前不应该是 8 个?
public class ThreadPoolUtils {
private static final Logger logger = LoggerFactory.getLogger(ThreadPoolUtils.class);
/**
* 线程池维护线程的最少数量
*/
private static final int SIZE_CORE_POOL = 4;
/**
* 线程池维护线程的最大数量
*/
private static final int SIZE_MAX_POOL = 8;
/**
* 禁止手动初始化
*/
private ThreadPoolUtils() {}
public static void printPoolInfo() {
logger.info("当前线程Pool的数量 = [{}]",Singleton.SINGLETON.getThreadPool().getPoolSize());
logger.info("当前task的数量 = [{}]",Singleton.SINGLETON.getThreadPool().getTaskCount());
logger.info("当前执行task的数量 = [{}]",Singleton.SINGLETON.getThreadPool().getActiveCount());
logger.info("当前完成task的数量 = [{}]",Singleton.SINGLETON.getThreadPool().getCompletedTaskCount());
}
/**
* 通过枚举创建单例对象
*/
private enum Singleton {
/**
* 线程池单例
*/
SINGLETON;
private ThreadPoolExecutor threadPool;
private ScheduledExecutorService service;
Singleton() {
// 为线程命名
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("线程池工具类-pool-%d").build();
// 创建线程池1
threadPool = new ThreadPoolExecutor(
SIZE_CORE_POOL,
SIZE_MAX_POOL,
10L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
namedThreadFactory);
// 创建线程池2
service = Executors.newScheduledThreadPool(4);
}
/**
* 返回单例对象
*/
public ThreadPoolExecutor getThreadPool() {
return threadPool;
}
public ScheduledExecutorService getScheduledThreadPool() {
return service;
}
}
/**
* 向池中添加任务
* 单次执行
* @param task
*/
public static void addExecuteTask(Runnable task) {
if (task != null) {
ThreadPoolExecutor threadPoolExecutor = Singleton.SINGLETON.getThreadPool();
threadPoolExecutor.execute(task);
}
}
public static void addScheduleTask(Runnable task) {
Singleton.SINGLETON.getScheduledThreadPool().scheduleWithFixedDelay(task, 5, 3, TimeUnit.SECONDS);
}
}
探究
看来还是 basic 不够扎实啊,学的是个 JB!我们看一下 ThreadPoolExecutor
的源码,查看下的他的 4 个构造方法如下图,我们来看看比较难懂的几个参数:
- 第 5 个参数:
workQueue
表示缓存队列。当请求的线程大于maximumPoolSize
时,线程进入BlockingQueue
阻塞队列。是一个生产消费模型队列。 - 第 7 个参数:
handle
表示执行拒绝策略的对象。当超过第 5 个参数workQueue
的任务缓存区上限的时候,就可以通过该策略处理请求,是一种简单的限流保护。
那么,我们上面的实例化是怎么写的?
// 创建线程池1
threadPool = new ThreadPoolExecutor(
SIZE_CORE_POOL,
SIZE_MAX_POOL,
10L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
namedThreadFactory);
我们把 LinkedBlockingQueue<Runnable>()
作为缓存队列,我们不关心的它内部实现,通过源码可以知道它是一种无限队列,构造器容量默认值大小是 Integer.MAX_VALUE
,往往在生产场景中很难达到这个值,所以像我上面这样写是极其不科学的,应该根据实际场景设置一个可承载容量大小,并配合 handle
做出拒绝策略,才是一个完整的流程。
我们稍微熟悉了它的构造方法之后,怎么知道它是如何工作的呢?另外我之前的迷思,为什么核心线程数始终等于 4 呢?
原因
首先我们可以通过源码查看 execute
方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 1. 如果当前线程数 小于 corePoolSize,则尝试添加新线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 尝试向workQueue添加队列(offer方法在workQueue没有容量时,添加失败),线程已经存在不会创建新的线程,如果不存在则创建新的线程。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 添加新线程,此处会比较maximumPoolSize,如果大于maximumPoolSize,则会使用饱和策略
else if (!addWorker(command, false))
reject(command);
}
而 addWorker
方法主要是动态的调整线程池的线程数量。从 execute 方法和 addWorker 方法可以看出,当前线程数优先与 corePoolSize 比较,大于 corePoolSize ,则与 workQueue 容量比较;如果当前线程数大于 workQueue 容量,则与 maximumPoolSize 比较;如果当前线程数大于 maximumPoolSize,则执行饱和策略;最后,根据饱和策略做出相应的处理。
所以我粗略的总结下当 corePoolSize(核心线程数)满了,接下来的线程先进入 workQueue(任务队列),当队列也满了之后,创建新线程,直到达到 maximumPoolSize(最大线程数),之后再尝试创建线程时,会进入拒绝 rejectedExecution。
所以为什么线程池的核心线程数一直是 4 个,因为多余的都处在任务队列阻塞中,由于未设置一个容量大小,所以这个容量非常的大,其实是超出我们的处理能力的,我们程序始终就也没能够达到最大线程数。或者可以这么理解,这个 maximumPoolSize 算是一种比较坏(极限)的情况,很少情况并不会真的按照这个数量处理任务,只有当任务队列都不够时,才会继续创建线程,直到达到最大线程数,超过了之后就必须要 handle 来处理拒绝策略了。
测试
好了,既然大致了解了线程池的工作原理之后,可以进行一个测试来验证以下是否符合:
public class TestThreadPool {
private static ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("factory-pool-%d").build();
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 8, 10L,
TimeUnit.MINUTES, new LinkedBlockingQueue<>(50), tf);
public static void main(String[] args) throws InterruptedException {
System.out.println("================start================");
for (int i = 0; i<100; i++) {
threadPoolExecutor.execute(new Task(String.valueOf(i)));
}
System.out.println("================end================");
}
}
class Task implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(TestThreadPool.class);
private String name;
public Task(String name) {
this.name = "[ " + name + " ]";
}
@Override
public void run() {
logger.info(name + "只要干不死,就往死里干,奥利干!!");
// System.out.println(name + "奥利给!!");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
一个线程类 Task
,在 run
方法中打印一句「正能量」,然后 sleep
2 秒来模拟处理这个任务。我们创建一个核心线程数为 4,最大线程数 8,容量大小为 50 的 LinkedBlockingQueue
队列,然后在循环中持续创建线程。当成功创建完这 100 个线程之后,应该会有 ====================end====================
打印出来。
我们可以期待一下结果是什么?
按照上面的工作流程来说,有几种情况:
- 线程处理的速度远远大于线程创建的速度,可能 4 个核心数都完全够用,甚至用不到
workQueue
,最后打印了 end。emmm,当然从我们写的测试代码来说几乎是不可能的,for 循环表示:烙呢?中国 🇨 🇳 速度嗷! - 线程创建的速度大于回收速度,但是
workQueue
和maximumPoolSize
完全可以支撑,100 个线程创建成功并完成任务。 - 当
corePoolSize
和workQueue
以及maximumPoolSize
都过载,丢弃任务并抛出RejectedExecutionException
异常了。
其实可以很明显知道, sleep
2 秒加上 logger.info()
方法,线程的创建的速度一定是大大于执行的。按照 4、8、50 的配置,当地 58 个创建被创建成功之后,要是目前没有任何一个线程被释放的话,第 59 个线程会因为上限问题而被拒绝,这时候就会抛出异常了。当然这是个 for
循环,产生的速度足够快,基本上 100 次循环完成,第一个线程都没完成,所以可以大胆猜测, logger
一共会打印 58 行日志,并伴随着 RejectedExecutionException
的出现。
我们看一下运行的结果:
果然第 58 个线程被创建之后,后续第 59 个线程想被创建就抛出了异常,如图刚好是 58 行(0 ~ 57),也没有 ====================end====================
的出现。
奥利给
当我们把这个线程类的 run
方法分别改成如下:
@Override
public void run() {
// logger.info(name + "只要干不死,就往死里干,奥利干!!");
System.out.println(name + "奥利给!!");
}
@Override
public void run() {
// logger.info(name + "只要干不死,就往死里干,奥利干!!");
// System.out.println(name + "奥利给!!");
logger.info(name + "奥利给!!");
}
分别看下结果:
看来 org.slf4j.Logger.info()
的耗时不是一般的长,比 System.out.println()
还长。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于