1.线程状态
new:尚未启动的线程
runnable:在Java虚拟机中执行的线程
blocked:被阻塞等待监视器锁定的线程
waiting:正在等待 执行特定动作的线程
timed_waiting:正在等待另一个线程执行动作达到指定等待时间的线程
terminated(终止、结束):已退出的线程
2.线程阻塞
sleep
yield(),放弃cpu,不阻塞
suspend() 和 resume():
wait() 和 notify()
线程a.join:等线程a执行完了,本线程再继续
1.wait(),notify()和 suspend(),resume()之间的区别
1.sleep()、suspend()和resume()、yield()阻塞时都不会释放占用的锁(如果占用了的话),wait(),notify()方法阻塞时要释放占用的锁
2.wait和notify时object,锁是任意对象都具有的,调用任意对象的wait导致线程阻塞,因此他俩也需要在**synchronized方法或块中调用**
3.notify只会唤醒wait阻塞的线程中随机选取一个
2.wait 和 sleep 区别
wait属于object,sleep属于thread类
wait释放锁,sleep不释放锁
wait只能在同步方法或代码块中使用,sleep任意地方使用
wait不需要捕获异常,sleep需要捕获异常
3.线程池
1.ThreadPoolExecutor 参数
corePoolSize:核心线程数,创建后会销毁
小于等于corePoolSize的时候,会创建新线程
>corePoolSize的时候,放到队列里,队列满了,创建新线程,直到大于maxNumPoolSize,被拒绝
maxNumPoolSize:最大线程数,包括核心线程和非核心线程,非核心线程创建后会自动销毁
keepAliveTime:线程存活时间,非核心线程的空闲时间超过keepAliveTime则会终止
unit:存活时间单位
workQueue:任务队列
SynchronousQueue:没有任何容量
ArrayBlockingQueue 基于数组实现的阻塞队列
LinkedBlockingQueue 其实也是有界队列,如果不设置最大值,就成了无界队列,内部是基于链表实现的
threadFactory:线程工厂
handler:拒绝策略
AbortPolicy:直接报错(默认)
DiscardOldestPolicy:队列头部任务被删除,然后重新尝试执行任务(如果再次失败,则重复此过程)
DiscardPolicy:直接丢弃
CallerRunsPolicy:调用者线程运行该任务
2.线程池创建方法
Executors.newFixedThreadPool:创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待;
Executors.newCachedThreadPool:创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程;
Executors.newSingleThreadExecutor:创建单个线程数的线程池,它可以保证先进先出的执行顺序;
Executors.newScheduledThreadPool:创建一个可以执行延迟任务的线程池;
Executors.newSingleThreadScheduledExecutor:创建一个单线程的可以执行延迟任务的线程池;
Executors.newWorkStealingPool:创建一个抢占式执行的线程池(任务执行顺序不确定)【JDK 1.8 添加】。
ThreadPoolExecutor:最原始的创建线程池的方式,它包含了 7 个参数可供设置,后面会详细讲。
4.execute()方法和 submit()
execute:
线程池顶级接口Executor定义
只能接收实现Runnable接口类型的任务,返回值是void
run方法有异常会打印
submit
ExecutorService接口定义
接收Runnable,Callable类型的任务,返回值是Future
run方法有异常不会打印
5.源码相关
线程池状态和线程数用一个int类型存储,按位操作,高3位标识状态,29位标识数量
1.提交线程
一种是submit,一种是execute。submit方法提交到线程池后会返回一个Future对象,可以使用Future跟踪线程执行是否执行完成及获取结果,其实是将任务封装成RunnableFuture对象,真正的执行也是调用的execute方法,而execute方法不带返回值
// 提交Runnable任务,返回Future对象
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
// 提交Runnable任务,返回Future对象,带返回值result
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
// 提交Callable任务,返回Future对象,具体的返回值是实现Callable的call方法的返回值。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// 执行任务的具体实现方法
public void execute(Runnable command) {
// 任务不能为null
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 线程池中的线程数小于核心核心线程数,则添加核心线程,由核心线程去执行任务,然后核心线程会循环从阻塞队列等待获取任务并执行。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 走到这里有两种情况,1:线程池的线程数已经>=核心线程数了,2:线程池的线程数没有达到核心线程数但添加核心线程失败了。如果线程还是Running状态,则先把任务添加到阻塞队列里
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
// 线程池非Running状态并且移除阻塞队列移除该任务成功,则执行拒绝策略。
reject(command);
else if (workerCountOf(recheck) == 0)
// 工作线程数是0的情况下,增加一个新工作线程
addWorker(null, false);
}
// 走到这里说明线程数已经>=核心线程数了,而且往阻塞队列添加任务也失败了,可能是队列满了,这时候就尝试添加非核心线程,成功则结果,失败则执行拒绝策略。
else if (!addWorker(command, false))
reject(command);
}
final void reject(Runnable command) {
// 拒绝策略,调用构造线程池时传递的RejectedExecutionHandler对象rejectedExecution方法
handler.rejectedExecution(command, this);
}
2.添加工作线程 addWorker
/*
* 添加工作线程
* @param firstTask 具体任务,可以为null
* @param core 是否核心线程表示,线程本质是不区分核心还是分核心的,只是用这个来标识要添加哪一种类型的线程,做判断用的。比如true的情况下,当前线程池的线程数要 < corePoolSize才能继续添加线程,为false的情况下,当前线程池的线程数要 < maximumPoolSize才能继续添加线程
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判断是否需要添加工作线程,线程池状态>=SHUTDOWN时,再判断如果状态>SHUTDOWN或者传参firstTask不等于null或者阻塞队列没有任务这三种情况则不添加新的工作线程。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 这里采用自旋+cas操作,目的就是将工作线程数加1
for (;;) {
int wc = workerCountOf(c);
// 目前线程数>=线程池可以创建的理论线程数最大值则不再创建工作线程,或者要创建核心线程但线程池线程数已经达到corePoolSize,或者要创建非核心线程但线程池线程数已经达到maximumPoolSize,这两种情况也不再创建工作线程,直接返回添加工作线程失败标识。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// cas操作线程数加1,成功则跳出内层循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 因为addWorker没有加锁操作,这里再判断一次线程池状态,拿到ctl最新值
if (runStateOf(c) != rs)
continue retry;
}
}
// 以上只是将ctl的线程数加1了,以下是真正的创建一个工作线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 构造一个Worker对象,构造方法里就实例化了一个具体的Thread线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 创建工作线程时,需要加锁,防止其他线程并发操作。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 判断线程池状态,并将Worker放入工作线程集合里
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// 这里就是标记线程池里创建线程的最大值,这个值最大也不会超过maximumPoolSize。
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动工作线程
t.start();
workerStarted = true;
}
}
} finally {
// 如果工作线程没有启动,说明添加工作线程失败,需要把之前ctl线程数加1的操作回滚,就是ctl线程数减1,如果已经添加到工作线程集合里也需要移除
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
// 涉及工作线程的相关操作都需要加锁
mainLock.lock();
try {
// 从工作线程集合里移除worker
if (w != null)
workers.remove(w);
// cas操作ctl线程数减1
decrementWorkerCount();
// 判断是否需要终止线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}
//Worker类是ThreadPoolExecutor内部类,就是具体的工作线程类,继承了AbstractQueuedSynchronizer并实现了Runnable接口,主要利用了AQS的同步机制来控制对工作线程的并发访问
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
// Worker的Thread属性,其实干活的就是这个线程
final Thread thread;
// 任务
Runnable firstTask;
// 线程已经执行完成的任务总数
volatile long completedTasks;
// 构造方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 以当前对象创建Thread线程,线程执行时调用的就是这个类的run方法
this.thread = getThreadFactory().newThread(this);
}
// run方法执行任务,调用的是外部ThreadPoolExecutor的runWorker方法
public void run() {
runWorker(this);
}
...
}
1.工作线程count +1
2.创建一个Work t,加到当前工作线程集合works里面,启动工作线程t.start
3.任务执行
public void run() {
// 其实线程真正的执行逻辑是外部的ThreadPoolExecutor类的runWorker方法
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果获取的任务为null,则线程会销毁,其实线程的复用核心就是在这里,线程在while循环中不停的去获取任务并执行。
while (task != null || (task = getTask()) != null) {
w.lock();
// 判断线程的状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 可自定义任务执行前的逻辑,默认空实现
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务的业务逻辑
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 可自定义任务执行后的逻辑,默认空实现
afterExecute(task, thrown);
}
} finally {
task = null;
// 完成的任务数加1
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 没有拿到任务,线程销毁操作,completedAbruptly标识,false:正常退出,true:异常退出
processWorkerExit(w, completedAbruptly);
}
}
4.任务获取
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
// 循环调用,其中会判断线程池状态
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池即将关闭状态,如果阻塞队列中也没有任务了,返回null,runWorker方法没有拿到task则退出while循环,销毁线程。
// 这里根据shutdown和shutdownNow设置不同的线程池状态走不同的逻辑
// 如果线程池状态是STOP则直接线程数减1,然后返回null,runWorker方法会退出while循环,线程销毁
// 如果线程池状态是SHUTDOWN则再看看阻塞队列是否为空,为空则线程数减1,后续线程销毁,不会空则继续获取任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
// 获取当前线程数
int wc = workerCountOf(c);
// 是否允许超时标识,allowCoreThreadTimeOut核心线程是否允许超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 非核心线程过多或者允许超时的情况下,如果队列为空则工作线程减1,后续销毁线程,这里就返回null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 允许核心线程超时或者线程数大于核心线程数时,采用poll取数据,非阻塞,超过keepAliveTime没有获取到数据就继续自旋获取任务,
// 不允许核心线程超时或者线程数小于等于核心线程数时,采用take取数据,阻塞等待直到获取到任务或者被中断
//take()方法是阻塞的,如果队列为空,它会一直等待直到队列中有元素可取。而poll()方法是非阻塞的,如果队列为空,它会立即返回null
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
从getTask方法返回,只有两种结果,一种是拿到了任务,执行任务后接着进行下一次循环继续拿任务,还有一种是返回null,表示未获取到任务,这就是空闲线程,可以销毁了,这时已经在getTask方法中奖ctl的线程数减1了,在runWorker方法的processWorkerExit正常将线程从工作线程集合移除即可
5.线程销毁
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly为true,说明可能异常导致线程退出,这是非正常退出,为保证线程数量需要重新再创建一个线程,所以这里先将线程数减1,正常退出时已经在getTask方法线程数减过1了。
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 总的任务完成后把当前即将销毁的线程完成的任务数加上
completedTaskCount += w.completedTasks;
// 线程集合里移除这个工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
// 线程池在STOP状态前,如果是正常退出,任务队列还有任务的情况下,最少还得一个线程,如果线程池线程数>=1则不再创建,否则新建一个工作线程。如果是异常退出则直接新建一个工作线程。
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 这里会再添加一个线程
addWorker(null, false);
}
}
再看一下tryTerminate方法,这个方法是在线程要销毁或者线程池要关闭时会调用,主要是判断是否需要中断空闲的线程,而且逻辑中调用了terminated()方法,可以自定义扩展实现
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 线程池在运行状态或者阻塞队列不为空时直接返回
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果工作线程数>0,则中断一个线程
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 走到这里,说明工作线程数为0了,cas操作将线程状态置为TIDYING,这是个过渡状态,线程数设置为0
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 默认方法体为空,可作为扩展实现,线程池即将STOP状态时会调这个方法,相当于从外界感知线程池真正关闭的通知
terminated();
} finally {
// 最终会将线程池状态设置为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
6.线程池关闭
shutdown:不再接受新任务,将阻塞队列的任务执行完成后,线程池关闭
shutdownNow:将线程池状态设置为STOP后不再接受新任务,然后将所有线程中断(这里的中断已经拿到任务并执行不会响应中断,是在调用getTask获取下一个任务时看线程池状态为STOP则不会再取阻塞队列任务,直接返回null,然后工作线程销毁,还有一种情况是正在阻塞等待拿任务,阻塞在poll或take上,都会响应中断,然后再一次循环任务返回null),并将未执行的任务返回,线程池关闭
7.线程池扩展机制
1.任务前后的处理
扩展ThreadPoolExecutor类,重写其beforeExecute()和afterExecute()方法,来实现任务执行前后的处理
2.自定义拒绝策略
RejectedExecutionHandler接口,自定义拒绝策略
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于