多线程

本贴最后更新于 238 天前,其中的信息可能已经时异事殊

1.线程状态

new:尚未启动的线程 runnable:在Java虚拟机中执行的线程 blocked:被阻塞等待监视器锁定的线程 waiting:正在等待 执行特定动作的线程 timed_waiting:正在等待另一个线程执行动作达到指定等待时间的线程 terminated(终止、结束):已退出的线程

image

2.线程阻塞

sleep yield(),放弃cpu,不阻塞 suspend() 和 resume(): wait() 和 notify() 线程a.join:等线程a执行完了,本线程再继续
1.wait(),notify()和 suspend(),resume()之间的区别
1.sleep()、suspend()和resume()、yield()阻塞时都不会释放占用的锁(如果占用了的话),wait(),notify()方法阻塞时要释放占用的锁 2.wait和notify时object,锁是任意对象都具有的,调用任意对象的wait导致线程阻塞,因此他俩也需要在**synchronized方法或块中调用** 3.notify只会唤醒wait阻塞的线程中随机选取一个
2.wait 和 sleep 区别
wait属于object,sleep属于thread类 wait释放锁,sleep不释放锁 wait只能在同步方法或代码块中使用,sleep任意地方使用 wait不需要捕获异常,sleep需要捕获异常

3.线程池

1.ThreadPoolExecutor 参数
corePoolSize:核心线程数,创建后会销毁 小于等于corePoolSize的时候,会创建新线程 >corePoolSize的时候,放到队列里,队列满了,创建新线程,直到大于maxNumPoolSize,被拒绝 maxNumPoolSize:最大线程数,包括核心线程和非核心线程,非核心线程创建后会自动销毁 keepAliveTime:线程存活时间,非核心线程的空闲时间超过keepAliveTime则会终止 unit:存活时间单位 workQueue:任务队列 SynchronousQueue:没有任何容量 ArrayBlockingQueue 基于数组实现的阻塞队列 LinkedBlockingQueue 其实也是有界队列,如果不设置最大值,就成了无界队列,内部是基于链表实现的 threadFactory:线程工厂 handler:拒绝策略 AbortPolicy:直接报错(默认) DiscardOldestPolicy:队列头部任务被删除,然后重新尝试执行任务(如果再次失败,则重复此过程) DiscardPolicy:直接丢弃 CallerRunsPolicy:调用者线程运行该任务
2.线程池创建方法
Executors.newFixedThreadPool:创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待; Executors.newCachedThreadPool:创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程; Executors.newSingleThreadExecutor:创建单个线程数的线程池,它可以保证先进先出的执行顺序; Executors.newScheduledThreadPool:创建一个可以执行延迟任务的线程池; Executors.newSingleThreadScheduledExecutor:创建一个单线程的可以执行延迟任务的线程池; Executors.newWorkStealingPool:创建一个抢占式执行的线程池(任务执行顺序不确定)【JDK 1.8 添加】。 ThreadPoolExecutor:最原始的创建线程池的方式,它包含了 7 个参数可供设置,后面会详细讲。

4.execute()方法和 submit()

execute:
线程池顶级接口Executor定义 只能接收实现Runnable接口类型的任务,返回值是void run方法有异常会打印
submit
ExecutorService接口定义 接收Runnable,Callable类型的任务,返回值是Future run方法有异常不会打印

5.源码相关

线程池状态和线程数用一个int类型存储,按位操作,高3位标识状态,29位标识数量
1.提交线程
一种是submit,一种是execute。submit方法提交到线程池后会返回一个Future对象,可以使用Future跟踪线程执行是否执行完成及获取结果,其实是将任务封装成RunnableFuture对象,真正的执行也是调用的execute方法,而execute方法不带返回值
// 提交Runnable任务,返回Future对象 public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } // 提交Runnable任务,返回Future对象,带返回值result public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } // 提交Callable任务,返回Future对象,具体的返回值是实现Callable的call方法的返回值。 public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } // 执行任务的具体实现方法 public void execute(Runnable command) { // 任务不能为null if (command == null) throw new NullPointerException(); int c = ctl.get(); // 线程池中的线程数小于核心核心线程数,则添加核心线程,由核心线程去执行任务,然后核心线程会循环从阻塞队列等待获取任务并执行。 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 走到这里有两种情况,1:线程池的线程数已经>=核心线程数了,2:线程池的线程数没有达到核心线程数但添加核心线程失败了。如果线程还是Running状态,则先把任务添加到阻塞队列里 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 线程池非Running状态并且移除阻塞队列移除该任务成功,则执行拒绝策略。 reject(command); else if (workerCountOf(recheck) == 0) // 工作线程数是0的情况下,增加一个新工作线程 addWorker(null, false); } // 走到这里说明线程数已经>=核心线程数了,而且往阻塞队列添加任务也失败了,可能是队列满了,这时候就尝试添加非核心线程,成功则结果,失败则执行拒绝策略。 else if (!addWorker(command, false)) reject(command); } final void reject(Runnable command) { // 拒绝策略,调用构造线程池时传递的RejectedExecutionHandler对象rejectedExecution方法 handler.rejectedExecution(command, this); }
2.添加工作线程 addWorker
/* * 添加工作线程 * @param firstTask 具体任务,可以为null * @param core 是否核心线程表示,线程本质是不区分核心还是分核心的,只是用这个来标识要添加哪一种类型的线程,做判断用的。比如true的情况下,当前线程池的线程数要 < corePoolSize才能继续添加线程,为false的情况下,当前线程池的线程数要 < maximumPoolSize才能继续添加线程 */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 判断是否需要添加工作线程,线程池状态>=SHUTDOWN时,再判断如果状态>SHUTDOWN或者传参firstTask不等于null或者阻塞队列没有任务这三种情况则不添加新的工作线程。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 这里采用自旋+cas操作,目的就是将工作线程数加1 for (;;) { int wc = workerCountOf(c); // 目前线程数>=线程池可以创建的理论线程数最大值则不再创建工作线程,或者要创建核心线程但线程池线程数已经达到corePoolSize,或者要创建非核心线程但线程池线程数已经达到maximumPoolSize,这两种情况也不再创建工作线程,直接返回添加工作线程失败标识。 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // cas操作线程数加1,成功则跳出内层循环 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 因为addWorker没有加锁操作,这里再判断一次线程池状态,拿到ctl最新值 if (runStateOf(c) != rs) continue retry; } } // 以上只是将ctl的线程数加1了,以下是真正的创建一个工作线程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 构造一个Worker对象,构造方法里就实例化了一个具体的Thread线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 创建工作线程时,需要加锁,防止其他线程并发操作。 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 判断线程池状态,并将Worker放入工作线程集合里 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); // 这里就是标记线程池里创建线程的最大值,这个值最大也不会超过maximumPoolSize。 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 启动工作线程 t.start(); workerStarted = true; } } } finally { // 如果工作线程没有启动,说明添加工作线程失败,需要把之前ctl线程数加1的操作回滚,就是ctl线程数减1,如果已经添加到工作线程集合里也需要移除 if (! workerStarted) addWorkerFailed(w); } return workerStarted; } private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; // 涉及工作线程的相关操作都需要加锁 mainLock.lock(); try { // 从工作线程集合里移除worker if (w != null) workers.remove(w); // cas操作ctl线程数减1 decrementWorkerCount(); // 判断是否需要终止线程池 tryTerminate(); } finally { mainLock.unlock(); } } //Worker类是ThreadPoolExecutor内部类,就是具体的工作线程类,继承了AbstractQueuedSynchronizer并实现了Runnable接口,主要利用了AQS的同步机制来控制对工作线程的并发访问 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // Worker的Thread属性,其实干活的就是这个线程 final Thread thread; // 任务 Runnable firstTask; // 线程已经执行完成的任务总数 volatile long completedTasks; // 构造方法 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 以当前对象创建Thread线程,线程执行时调用的就是这个类的run方法 this.thread = getThreadFactory().newThread(this); } // run方法执行任务,调用的是外部ThreadPoolExecutor的runWorker方法 public void run() { runWorker(this); } ... }
1.工作线程count +1 2.创建一个Work t,加到当前工作线程集合works里面,启动工作线程t.start
3.任务执行
public void run() { // 其实线程真正的执行逻辑是外部的ThreadPoolExecutor类的runWorker方法 runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 如果获取的任务为null,则线程会销毁,其实线程的复用核心就是在这里,线程在while循环中不停的去获取任务并执行。 while (task != null || (task = getTask()) != null) { w.lock(); // 判断线程的状态 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 可自定义任务执行前的逻辑,默认空实现 beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务的业务逻辑 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 可自定义任务执行后的逻辑,默认空实现 afterExecute(task, thrown); } } finally { task = null; // 完成的任务数加1 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 没有拿到任务,线程销毁操作,completedAbruptly标识,false:正常退出,true:异常退出 processWorkerExit(w, completedAbruptly); } }
4.任务获取
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? // 循环调用,其中会判断线程池状态 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 线程池即将关闭状态,如果阻塞队列中也没有任务了,返回null,runWorker方法没有拿到task则退出while循环,销毁线程。 // 这里根据shutdown和shutdownNow设置不同的线程池状态走不同的逻辑 // 如果线程池状态是STOP则直接线程数减1,然后返回null,runWorker方法会退出while循环,线程销毁 // 如果线程池状态是SHUTDOWN则再看看阻塞队列是否为空,为空则线程数减1,后续线程销毁,不会空则继续获取任务 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 获取当前线程数 int wc = workerCountOf(c); // 是否允许超时标识,allowCoreThreadTimeOut核心线程是否允许超时 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 非核心线程过多或者允许超时的情况下,如果队列为空则工作线程减1,后续销毁线程,这里就返回null if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 允许核心线程超时或者线程数大于核心线程数时,采用poll取数据,非阻塞,超过keepAliveTime没有获取到数据就继续自旋获取任务, // 不允许核心线程超时或者线程数小于等于核心线程数时,采用take取数据,阻塞等待直到获取到任务或者被中断 //take()方法是阻塞的,如果队列为空,它会一直等待直到队列中有元素可取。而poll()方法是非阻塞的,如果队列为空,它会立即返回null Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
从getTask方法返回,只有两种结果,一种是拿到了任务,执行任务后接着进行下一次循环继续拿任务,还有一种是返回null,表示未获取到任务,这就是空闲线程,可以销毁了,这时已经在getTask方法中奖ctl的线程数减1了,在runWorker方法的processWorkerExit正常将线程从工作线程集合移除即可
5.线程销毁
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果completedAbruptly为true,说明可能异常导致线程退出,这是非正常退出,为保证线程数量需要重新再创建一个线程,所以这里先将线程数减1,正常退出时已经在getTask方法线程数减过1了。 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 总的任务完成后把当前即将销毁的线程完成的任务数加上 completedTaskCount += w.completedTasks; // 线程集合里移除这个工作线程 workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { // 线程池在STOP状态前,如果是正常退出,任务队列还有任务的情况下,最少还得一个线程,如果线程池线程数>=1则不再创建,否则新建一个工作线程。如果是异常退出则直接新建一个工作线程。 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } // 这里会再添加一个线程 addWorker(null, false); } } 再看一下tryTerminate方法,这个方法是在线程要销毁或者线程池要关闭时会调用,主要是判断是否需要中断空闲的线程,而且逻辑中调用了terminated()方法,可以自定义扩展实现 final void tryTerminate() { for (;;) { int c = ctl.get(); // 线程池在运行状态或者阻塞队列不为空时直接返回 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 如果工作线程数>0,则中断一个线程 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 走到这里,说明工作线程数为0了,cas操作将线程状态置为TIDYING,这是个过渡状态,线程数设置为0 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 默认方法体为空,可作为扩展实现,线程池即将STOP状态时会调这个方法,相当于从外界感知线程池真正关闭的通知 terminated(); } finally { // 最终会将线程池状态设置为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } } }

6.线程池关闭

shutdown:不再接受新任务,将阻塞队列的任务执行完成后,线程池关闭 shutdownNow:将线程池状态设置为STOP后不再接受新任务,然后将所有线程中断(这里的中断已经拿到任务并执行不会响应中断,是在调用getTask获取下一个任务时看线程池状态为STOP则不会再取阻塞队列任务,直接返回null,然后工作线程销毁,还有一种情况是正在阻塞等待拿任务,阻塞在poll或take上,都会响应中断,然后再一次循环任务返回null),并将未执行的任务返回,线程池关闭

7.线程池扩展机制

1.任务前后的处理
扩展ThreadPoolExecutor类,重写其beforeExecute()和afterExecute()方法,来实现任务执行前后的处理
2.自定义拒绝策略
RejectedExecutionHandler接口,自定义拒绝策略
  • 线程
    123 引用 • 111 回帖 • 3 关注

相关帖子

回帖

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...