多线程

1.线程状态

new:尚未启动的线程  
runnable:在Java虚拟机中执行的线程  
blocked:被阻塞等待监视器锁定的线程  
waiting:正在等待 执行特定动作的线程  
timed_waiting:正在等待另一个线程执行动作达到指定等待时间的线程  
terminated(终止、结束):已退出的线程

image

2.线程阻塞

sleep

yield(),放弃cpu,不阻塞

suspend() 和 resume():

wait() 和 notify()

线程a.join:等线程a执行完了,本线程再继续
1.wait(),notify()和 suspend(),resume()之间的区别
	1.sleep()、suspend()和resume()、yield()阻塞时都不会释放占用的锁(如果占用了的话),wait(),notify()方法阻塞时要释放占用的锁

	2.wait和notify时object,锁是任意对象都具有的,调用任意对象的wait导致线程阻塞,因此他俩也需要在**synchronized方法或块中调用**

	3.notify只会唤醒wait阻塞的线程中随机选取一个
2.wait 和 sleep 区别
	wait属于object,sleep属于thread类

	wait释放锁,sleep不释放锁

	wait只能在同步方法或代码块中使用,sleep任意地方使用

	wait不需要捕获异常,sleep需要捕获异常

3.线程池

1.ThreadPoolExecutor 参数
	corePoolSize:核心线程数,创建后会销毁

		小于等于corePoolSize的时候,会创建新线程

		>corePoolSize的时候,放到队列里,队列满了,创建新线程,直到大于maxNumPoolSize,被拒绝

	maxNumPoolSize:最大线程数,包括核心线程和非核心线程,非核心线程创建后会自动销毁

	keepAliveTime:线程存活时间,非核心线程的空闲时间超过keepAliveTime则会终止

	unit:存活时间单位

	workQueue:任务队列

		SynchronousQueue:没有任何容量

		ArrayBlockingQueue 基于数组实现的阻塞队列  
                    LinkedBlockingQueue 其实也是有界队列,如果不设置最大值,就成了无界队列,内部是基于链表实现的

	threadFactory:线程工厂

	handler:拒绝策略

		AbortPolicy:直接报错(默认)

		DiscardOldestPolicy:队列头部任务被删除,然后重新尝试执行任务(如果再次失败,则重复此过程)

		DiscardPolicy:直接丢弃

		CallerRunsPolicy:调用者线程运行该任务
2.线程池创建方法
	Executors.newFixedThreadPool:创建一个固定大小的线程池,可控制并发的线程数,超出的线程会在队列中等待;  
	Executors.newCachedThreadPool:创建一个可缓存的线程池,若线程数超过处理所需,缓存一段时间后会回收,若线程数不够,则新建线程;  
	Executors.newSingleThreadExecutor:创建单个线程数的线程池,它可以保证先进先出的执行顺序;  
	Executors.newScheduledThreadPool:创建一个可以执行延迟任务的线程池;  
	Executors.newSingleThreadScheduledExecutor:创建一个单线程的可以执行延迟任务的线程池;  
	Executors.newWorkStealingPool:创建一个抢占式执行的线程池(任务执行顺序不确定)【JDK 1.8 添加】。  
	ThreadPoolExecutor:最原始的创建线程池的方式,它包含了 7 个参数可供设置,后面会详细讲。

4.execute()方法和 submit()

execute:
	线程池顶级接口Executor定义

	只能接收实现Runnable接口类型的任务,返回值是void

	run方法有异常会打印
submit
	ExecutorService接口定义

	接收Runnable,Callable类型的任务,返回值是Future

	run方法有异常不会打印

5.源码相关

线程池状态和线程数用一个int类型存储,按位操作,高3位标识状态,29位标识数量
1.提交线程
	一种是submit,一种是execute。submit方法提交到线程池后会返回一个Future对象,可以使用Future跟踪线程执行是否执行完成及获取结果,其实是将任务封装成RunnableFuture对象,真正的执行也是调用的execute方法,而execute方法不带返回值
// 提交Runnable任务,返回Future对象
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
// 提交Runnable任务,返回Future对象,带返回值result
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
// 提交Callable任务,返回Future对象,具体的返回值是实现Callable的call方法的返回值。
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
// 执行任务的具体实现方法
public void execute(Runnable command) {
    // 任务不能为null
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
	// 线程池中的线程数小于核心核心线程数,则添加核心线程,由核心线程去执行任务,然后核心线程会循环从阻塞队列等待获取任务并执行。
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
	// 走到这里有两种情况,1:线程池的线程数已经>=核心线程数了,2:线程池的线程数没有达到核心线程数但添加核心线程失败了。如果线程还是Running状态,则先把任务添加到阻塞队列里
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            // 线程池非Running状态并且移除阻塞队列移除该任务成功,则执行拒绝策略。
            reject(command);
        else if (workerCountOf(recheck) == 0)
            // 工作线程数是0的情况下,增加一个新工作线程
            addWorker(null, false);
    }
    // 走到这里说明线程数已经>=核心线程数了,而且往阻塞队列添加任务也失败了,可能是队列满了,这时候就尝试添加非核心线程,成功则结果,失败则执行拒绝策略。
    else if (!addWorker(command, false))
        reject(command);
}
 
final void reject(Runnable command) {
    // 拒绝策略,调用构造线程池时传递的RejectedExecutionHandler对象rejectedExecution方法
    handler.rejectedExecution(command, this);
}
2.添加工作线程 addWorker
/*
 * 添加工作线程
 * @param firstTask 具体任务,可以为null
 * @param core 是否核心线程表示,线程本质是不区分核心还是分核心的,只是用这个来标识要添加哪一种类型的线程,做判断用的。比如true的情况下,当前线程池的线程数要 < corePoolSize才能继续添加线程,为false的情况下,当前线程池的线程数要 < maximumPoolSize才能继续添加线程
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
 
        // 判断是否需要添加工作线程,线程池状态>=SHUTDOWN时,再判断如果状态>SHUTDOWN或者传参firstTask不等于null或者阻塞队列没有任务这三种情况则不添加新的工作线程。
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
    	// 这里采用自旋+cas操作,目的就是将工作线程数加1
        for (;;) {
            int wc = workerCountOf(c);
            // 目前线程数>=线程池可以创建的理论线程数最大值则不再创建工作线程,或者要创建核心线程但线程池线程数已经达到corePoolSize,或者要创建非核心线程但线程池线程数已经达到maximumPoolSize,这两种情况也不再创建工作线程,直接返回添加工作线程失败标识。
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // cas操作线程数加1,成功则跳出内层循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // 因为addWorker没有加锁操作,这里再判断一次线程池状态,拿到ctl最新值
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
	// 以上只是将ctl的线程数加1了,以下是真正的创建一个工作线程
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 构造一个Worker对象,构造方法里就实例化了一个具体的Thread线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 创建工作线程时,需要加锁,防止其他线程并发操作。
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
            	// 判断线程池状态,并将Worker放入工作线程集合里
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // 这里就是标记线程池里创建线程的最大值,这个值最大也不会超过maximumPoolSize。
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动工作线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果工作线程没有启动,说明添加工作线程失败,需要把之前ctl线程数加1的操作回滚,就是ctl线程数减1,如果已经添加到工作线程集合里也需要移除
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
 
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
	// 涉及工作线程的相关操作都需要加锁
    mainLock.lock();
    try {
        // 从工作线程集合里移除worker
        if (w != null)
            workers.remove(w);
        // cas操作ctl线程数减1
        decrementWorkerCount();
        // 判断是否需要终止线程池
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

//Worker类是ThreadPoolExecutor内部类,就是具体的工作线程类,继承了AbstractQueuedSynchronizer并实现了Runnable接口,主要利用了AQS的同步机制来控制对工作线程的并发访问
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
	// Worker的Thread属性,其实干活的就是这个线程
    final Thread thread;
    // 任务
    Runnable firstTask;
    // 线程已经执行完成的任务总数
    volatile long completedTasks;

	// 构造方法
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 以当前对象创建Thread线程,线程执行时调用的就是这个类的run方法
        this.thread = getThreadFactory().newThread(this);
    }
 
    // run方法执行任务,调用的是外部ThreadPoolExecutor的runWorker方法
    public void run() {
        runWorker(this);
    }
    ...
}
	1.工作线程count +1

	2.创建一个Work t,加到当前工作线程集合works里面,启动工作线程t.start
3.任务执行
public void run() {
    // 其实线程真正的执行逻辑是外部的ThreadPoolExecutor类的runWorker方法
    runWorker(this);
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 如果获取的任务为null,则线程会销毁,其实线程的复用核心就是在这里,线程在while循环中不停的去获取任务并执行。
        while (task != null || (task = getTask()) != null) {
            w.lock();
           // 判断线程的状态
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 可自定义任务执行前的逻辑,默认空实现
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务的业务逻辑
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 可自定义任务执行后的逻辑,默认空实现
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 完成的任务数加1
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 没有拿到任务,线程销毁操作,completedAbruptly标识,false:正常退出,true:异常退出
        processWorkerExit(w, completedAbruptly);
    }
}
4.任务获取
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
 
    // 循环调用,其中会判断线程池状态
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
    	// 线程池即将关闭状态,如果阻塞队列中也没有任务了,返回null,runWorker方法没有拿到task则退出while循环,销毁线程。
        // 这里根据shutdown和shutdownNow设置不同的线程池状态走不同的逻辑
        // 如果线程池状态是STOP则直接线程数减1,然后返回null,runWorker方法会退出while循环,线程销毁
        // 如果线程池状态是SHUTDOWN则再看看阻塞队列是否为空,为空则线程数减1,后续线程销毁,不会空则继续获取任务
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
    	// 获取当前线程数
        int wc = workerCountOf(c);
    	// 是否允许超时标识,allowCoreThreadTimeOut核心线程是否允许超时
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    	// 非核心线程过多或者允许超时的情况下,如果队列为空则工作线程减1,后续销毁线程,这里就返回null
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
 
        try {
            // 允许核心线程超时或者线程数大于核心线程数时,采用poll取数据,非阻塞,超过keepAliveTime没有获取到数据就继续自旋获取任务,
            // 不允许核心线程超时或者线程数小于等于核心线程数时,采用take取数据,阻塞等待直到获取到任务或者被中断
			//take()方法是阻塞的,如果队列为空,它会一直等待直到队列中有元素可取。而poll()方法是非阻塞的,如果队列为空,它会立即返回null
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
从getTask方法返回,只有两种结果,一种是拿到了任务,执行任务后接着进行下一次循环继续拿任务,还有一种是返回null,表示未获取到任务,这就是空闲线程,可以销毁了,这时已经在getTask方法中奖ctl的线程数减1了,在runWorker方法的processWorkerExit正常将线程从工作线程集合移除即可
5.线程销毁
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果completedAbruptly为true,说明可能异常导致线程退出,这是非正常退出,为保证线程数量需要重新再创建一个线程,所以这里先将线程数减1,正常退出时已经在getTask方法线程数减过1了。
    if (completedAbruptly)
        decrementWorkerCount();
 
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 总的任务完成后把当前即将销毁的线程完成的任务数加上
        completedTaskCount += w.completedTasks;
        // 线程集合里移除这个工作线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
 
    tryTerminate();
 
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        // 线程池在STOP状态前,如果是正常退出,任务队列还有任务的情况下,最少还得一个线程,如果线程池线程数>=1则不再创建,否则新建一个工作线程。如果是异常退出则直接新建一个工作线程。
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 这里会再添加一个线程
        addWorker(null, false);
    }
}
再看一下tryTerminate方法,这个方法是在线程要销毁或者线程池要关闭时会调用,主要是判断是否需要中断空闲的线程,而且逻辑中调用了terminated()方法,可以自定义扩展实现
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 线程池在运行状态或者阻塞队列不为空时直接返回
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 如果工作线程数>0,则中断一个线程
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 走到这里,说明工作线程数为0了,cas操作将线程状态置为TIDYING,这是个过渡状态,线程数设置为0
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 默认方法体为空,可作为扩展实现,线程池即将STOP状态时会调这个方法,相当于从外界感知线程池真正关闭的通知
                    terminated();
                } finally {
                    // 最终会将线程池状态设置为TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
    }
}

6.线程池关闭

shutdown:不再接受新任务,将阻塞队列的任务执行完成后,线程池关闭

shutdownNow:将线程池状态设置为STOP后不再接受新任务,然后将所有线程中断(这里的中断已经拿到任务并执行不会响应中断,是在调用getTask获取下一个任务时看线程池状态为STOP则不会再取阻塞队列任务,直接返回null,然后工作线程销毁,还有一种情况是正在阻塞等待拿任务,阻塞在poll或take上,都会响应中断,然后再一次循环任务返回null),并将未执行的任务返回,线程池关闭

7.线程池扩展机制

1.任务前后的处理
	扩展ThreadPoolExecutor类,重写其beforeExecute()和afterExecute()方法,来实现任务执行前后的处理
2.自定义拒绝策略
	RejectedExecutionHandler接口,自定义拒绝策略
  • 线程
    121 引用 • 111 回帖 • 3 关注

相关帖子

回帖

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...