线程池顶级接口 Executor
Executor
接口为线程池的顶级接口,其 executor()
方法接收一个 Runnable
实现类对象,定义了在使用线程池时,如何调用线程中的业务逻辑。
class DirectExecutor implements Executor { public void execute(Runnable r) { r.run(); } } class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
线程池接口 ExecutorService
- 其
submit()
方法接收一个Callable
或Runnable
对象,用于指定线程的行为.返回一个Future
对象,用来取消任务或取得Callable
对象call()
方法的返回值。 - 其
shutdown()
方法和shutdownNow()
方法都可以关闭线程池,调用此方法后,线程池不能再调用submit()
shutdown()
方法会等待当前线程池内所有任务全部完成再关闭线程池。shutdownNow()
方法会尝试终止池内正在运行的线程且放弃正在等待的任务,马上关闭线程池。awaitTermination()
方法可以使当前线程阻塞一段时间等待线程池被关闭完.返回一个boolean
指示该线程池是否被关闭完。
void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); }
一个线程池的状态有以下三种:
- Running: 活动状态,线程池中的线程正在运行且可以通过调用 submit()方法接收新任务。
- ShuttingDown: 线程池正在关闭过程中,线程池中有线程在执行任务,但不会接收新任务。
- Terminated: 线程池已经关闭,此案城池中没有线程在运行,且不接受新任务。
与线程池相关的类
Callable
- 用于定义任务及其返回值
Callable
类用于创建一个任务,类似于 Runnable
接口,其 call()
方法定义任务具体执行的行为.与 Runnalbe
类的不同之处在于 Callable
的 call()
方法可以有返回值且支持泛型。
要注意
Callable
和Runnable
定义的都是任务而不是线程,要将其传入一个线程或线程池后才可以执行。
Executors
-线程池的工厂类和工具类
Executors
为线程池的工厂类和工具类,我们使用其 newXXXPool()
方法创建各种封装好的线程池。
Future
-用于获取任务返回值
每将一个任务(Callable
或 Runnable
对象)被剑入线程池后,会返回一个 Future
对象.其主要方法和属性如下:
cancel()
方法可以取消该任务。get()
方法可以阻塞当前线程直到该任务执行完毕并获取其返回值。isDone
属性指示任务是否执行完毕,isCancelled
属性指示任务是否被取消。
public class Future { public static void main(String[] args) throws ExecutionException, InterruptedException { // 未来任务, 既是Runnable 也是 Future FutureTask<Integer> task = new FutureTask<>(() -> { TimeUnit.MILLISECONDS.sleep(500); return 100; }); new Thread(task).start(); System.out.println(task.get()); // 阻塞等待任务执行完成, 获取到返回值100 System.out.println("-------------------------------"); // 使用ExecutorService的submit替代FutureTask ExecutorService service = Executors.newFixedThreadPool(5); Future<Integer> result = service.submit(() -> { TimeUnit.MILLISECONDS.sleep(500); return 1; }); System.out.println(result.isDone()); // false 执行未完毕 System.out.println(result.get()); // 1 System.out.println(result.isDone()); // true 执行已完毕 System.out.println(result.get()); // 一直等待 System.out.println(service.shutdownNow()); // 立即停止 } }
Java 线程池的具体实现
ThreadPoolExecutor
实现的线程池
ThreadPoolExecutor
为最常见的线程池类,其构造函数如下:
public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler );
各参数代表的意义如下:
- corePoolSize: 核心线程数量。
- maximumPoolSize: 最大线程数量。
- keepAliveTime: 线程的最大存活时间。一个线程超过 keepAliveTime 时间不工作则会被销毁。(除非当前池中线程个数小于 corePoolSize)
- unit: 枚举类型,表示 keepAliveTime 的单位。
- workQueue: 存放任务的队列。
- handler: 拒绝策略(添加任务失败后如何处理该任务)
线程池的运行策略如下:
- 线程池刚创建时,里面没有任何线程。
- 当调用
execute()
方法添加一个任务时,会根据当前线程池中运行线程个数做出以下不同行为:
- 若当前线程池中运行线程数小于
corePoolSize
,则在线程池中添加一个新线程执行该任务,即使当前线程池中有空闲线程。 - 若当前线程池中运行线程数大于等于
corePoolSize
,则尝试将这个任务存入任务队列workQueue
。- 若任务队列满了且当前线程池中运行线程数小于等于
maximumPoolSize
,则在线程池中添加一个新线程执行该任务。 - 若任务队列满了且当前线程池中运行线程数大于
maximumPoolSize
,则任务将被拒绝并执行handler
中的拒绝策略。
- 若任务队列满了且当前线程池中运行线程数小于等于
- 当一个线程完成任务时,它会从队列中取下一个任务来执行。
- 当一个线程超过
keepAliveTime
时间未执行任务时,线程池根据当前线程池中运行线程个数判断是否销毁该线程.若当前线程池中运行线程数大于corePoolSize
,就会销毁该线程,直到线程池收缩到corePoolSize
大小。
FixedThreadPool
:固定容量的线程池
FixedThreadPool
线程池内的最大线程个数是固定的,是一种最常见的线程池实现.通过 Executors
类的 ExecutorService newFixedThreadPool(int nThreads)
方法创建,其中 nThreads
参数指定最大线程数。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
下面程序通过一个容量为 4 的 FixedThreadPool
并行寻找质数:
public static void main(String[] args) { ExecutorService pool = Executors.newFixedThreadPool(3); // 向线程池中添加四个任务,分别对四个区间进行查找 Future<List<Integer>> future1 = pool.submit(new ComputeTask(1, 8_0000)); Future<List<Integer>> future2 = pool.submit(new ComputeTask(8_0001, 13_0000)); Future<List<Integer>> future3 = pool.submit(new ComputeTask(13_0001, 17_0000)); Future<List<Integer>> future4 = pool.submit(new ComputeTask(17_0001, 20_0000)); // 主程序阻塞等待四个任务执行完成并获取结果 List<Integer> primes = new LinkedList<>(); primes.addAll(future1.get()); primes.addAll(future2.get()); primes.addAll(future3.get()); primes.addAll(future4.get()); // 关闭线程池 pool.shutdown(); } // 定义计算任务 static class ComputeTask implements Callable<List<Integer>> { private int start, end; ComputeTask(int start, int end) {this.start = start; this.end = end; } @Override public List<Integer> call() { System.out.println(Thread.currentThread().getName() + "start"); List<Integer> returnValue = getPrime(start, end); System.out.println(Thread.currentThread().getName() + "end"); return returnValue; } } static boolean isPrime(int num) { for (int i = 2; i < num / 2; i++) { if (num % i == 0) { return false; } } return true; } /** * 返回指定范围的质数列表 */ static List<Integer> getPrime(int start, int end) { List<Integer> list = new ArrayList<>(); for (int i = 0; i < end; i++) { if (isPrime(i)) { list.add(i); } } return list; }
我们向容量为 3 的线程池中加入 4 个任务,则同一时刻只有 3 个任务并行执行.程序输出如下,我们发现线程 pool-1-thread-3
执行了两个任务。
pool-1-thread-1start pool-1-thread-2start pool-1-thread-3start pool-1-thread-3end pool-1-thread-1end pool-1-thread-2end pool-1-thread-3start pool-1-thread-3end
CachedThreadPool
:容量自动调整的线程池
CachedThreadPool
线程池中存活的线程数可以根据实际情况自动调整
- 向线程池添加新任务时,优先使用线程池中存活的可用线程;若线程池当前没有可用线程,则向线程池中添加一个新线程。
- 若线程池中的线程超过 60 秒未使用,则回收该线程。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
SingleThreadExecutor
:线程数为 1 的线程池
SingleThreadExecutor
中的线程个数为 1,可以用来保证任务同步执行。
public classSingleThreadPool { public static void main(String[] args) { ExecutorService service = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++) { final int j = i; service.execute(() -> { System.out.println(j + " " + Thread.currentThread().getName()); }); } } }
程序输出如下:
0 pool-1-thread-1 1 pool-1-thread-1 2 pool-1-thread-1 3 pool-1-thread-1 4 pool-1-thread-1
ScheduledThreadPoolExecutor
实现的线程池
ScheduledThreadPool
:定时执行任务的线程池
ScheduledThreadPool
线程池可以定时执行任务。通过 Executors
类的 ExecutorService newSingleThreadExecutor(int corePoolSize)
方法创建,其 corePoolSize
参数指定线程池的核心线程数。
其主要方法有 schedule()
,scheduleAtFixedRate()
,scheduleWithFixedDelay()
,可以设定任务的执行计划.可以根据 当前任务的到期情况 自动调整线程池中的线程数.示例程序如下:
public class ScheduledPool { public static void main(String[] args) { ScheduledExecutorService service = Executors.newScheduledThreadPool(4); // 使用固定的频率执行某个任务 // 四个参数 // command: 执行的任务 // initialDelay: 第一次执行延时多久执行 // period: 每隔多久执行一次这个任务 // unit: 时间单位 service.scheduleAtFixedRate(() -> { try { TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }, 0, 500, TimeUnit.MILLISECONDS); // 每隔500ms打印一下线程名称 // 线程执行1000ms,而每sleep 500 就要新启动一个线程 // 上个线程未执行完毕,会启用新的线程执行 // 如果线程池已满,只有延时 } }
ForkJoinPool
实现的线程池
ForkJoinPool
:执行递归任务的线程池
ForkJoinPool
线程池的 submit()
方法接收 ForkJoinTask
类的任务,该类支持 fork()
,join()
成员方法:
fork()
方法会将该子任务加入线程池异步执行.join()
方法会阻塞当前线程直到子任务执行完成并获取其返回值
ForkJoinTask
有两个子类: RecursiveAction
和 RecursiveTask
,其中 RecursiveAction
任务没有返回值,而 RecursiveTask
任务有返回值.它们的任务执行逻辑均写在其 compute()
方法中.
ForkJoinPool
是一个较底层的线程池,因而 Executor
中没有与其对应的构造方法,需要我们显示调用其构造函数获得该类型的线程池。
public class T12_ForkJoinPool { static int[] nums = new int[100_0000]; static final int MAX_NUM = 5_0000; // 每个线程最多可以运行5万个数字相加 static Random random = new Random(); // 初始化这100_000个数字, 每个数字范围在100之内 static { for (int i = 0; i < nums.length; i++) { nums[i] = random.nextInt(100); } // 所有数字和, 事先计算: //System.out.println(Arrays.stream(nums).sum()); // 使用单线程stream api 进行求和 } /** * RecursiveAction: 递归操作 没有返回值 * RecursiveTask: 递归操作,有返回值 */ static class AddTask extends RecursiveAction { int start, end; AddTask(int start, int end) { this.start = start; this.end = end; } @Override protected void compute() { // 进行计算 // 如果计算的数的和的范围 小于 MAX_NUM, 进行计算,否则进行 fork if (end - start <= MAX_NUM) { long sum = 0; for (int i = start; i < end; i++) { sum += nums[i]; } System.out.println("sum = " + sum); } else { int middle = (end - start) / 2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); subTask1.fork(); subTask2.fork(); } } } static class AddTask2 extends RecursiveTask<Long> { int start, end; AddTask2(int start, int end) { this.start = start; this.end = end; } @Override protected Long compute() { // 进行计算 // 如果计算的数的和的范围 小于 MAX_NUM, 进行计算,否则进行 fork if (end - start <= MAX_NUM) { long sum = 0; for (int i = start; i < end; i++) { sum += nums[i]; } return sum; } else { int middle = start + (end - start) / 2; // 注意这里,如果有问题,会抛出java.lang.NoClassDefFoundError: Could not initialize class java.util.concurrent.locks.AbstractQueuedSynchronizer$Node 异常 AddTask2 subTask1 = new AddTask2(start, middle); AddTask2 subTask2 = new AddTask2(middle, end); subTask1.fork(); subTask2.fork(); return subTask1.join() + subTask2.join(); } } } // 运行 public static void main(String[] args) throws IOException { ForkJoinPool fjp = new ForkJoinPool(); AddTask2 task = new AddTask2(0, nums.length); fjp.execute(task); System.out.println(task.join()); //System.in.read(); } }
WorkStealingPool
:工作窃取线程池
WorkStealingPool
线程池通过 Executors
类的 ExecutorService newWorkStealingPool()
方法创建,其核心线程数为机器的核心数。
public static ExecutorService newWorkStealingPool() { return new ForkJoinPool(Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
WorkStealingPool
线程池采用 工作窃取
模式,相比于一般的线程池实现,工作窃取
模式的优势体现在对递归任务的处理方式上。
- 在一般的线程池中,若一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。
- 而在
工作窃取
模式中,若某个子问题由于等待另外一个子问题的完成而无法继续运行,则处理该子问题的线程会主动寻找其他尚未运行的子问题(窃取过来)来执行.这种方式减少了线程的等待时间,提高了性能。
public class WorkStealingPool { public static void main(String[] args) throws IOException { // CPU 核数 System.out.println(Runtime.getRuntime().availableProcessors()); // workStealingPool 会自动启动cpu核数个线程去执行任务 ExecutorService service = Executors.newWorkStealingPool(); service.execute(new R(1000)); // 我的cpu核数为4 启动5个线程,其中第一个是1s执行完毕,其余都是2s执行完毕, // 有一个任务会进行等待,当第一个执行完毕后,会再次偷取第5个任务执行 for (int i = 0; i < Runtime.getRuntime().availableProcessors(); i++) { service.execute(new R(2000)); } // 因为work stealing 是deamon线程,即后台线程,精灵线程,守护线程 // 所以当main方法结束时, 此方法虽然还在后台运行,但是无输出 // 可以通过对主线程阻塞解决 System.in.read(); } static class R implements Runnable { int time; R(int time) { this.time = time; } @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " " + time); } } }
程序输出如下:
4 ForkJoinPool-1-worker-2 1000 ForkJoinPool-1-worker-1 2000 ForkJoinPool-1-worker-3 2000 ForkJoinPool-1-worker-0 2000 ForkJoinPool-1-worker-2 2000
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于