Java 并发工具类
Fork/Join "分而治之"
“分而治之”:就是将一个复杂的计算,按照设定的阈值分解成多个小计算,然后将各个小计算的计算结果进行汇总。
Fork/Join 框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时,即达到阈值以下)再将一个个小任务计算的结果进行 join 汇总。
工作窃取特性
Fork/Join 具有 工作窃取(workStealing) 特性, 即每个线程会维护一个双端队列, 默认是从尾部取任务, 当一个线程的工作队列为空时, 它会去其他线程的工作线程尾部窃取一个工作任务. 这种机制可以让线程的限制时间减少, 提升程序效率, 并且减少获取工作任务的阻塞时间.
Fork/Join 使用范式
需要使用 Fork/Join 模式的类,需要继承自 RecursiveTask(无返回值)、RecursiveAction(有返回值),然后在 compute 方法内实现“分而治之”的逻辑就可以。
RecursiveTask(无返回值)、RecursiveAction(有返回值)这俩个类都是继承自 ForkJoinTask 类。
Fork/Join 使用步骤:
- 编写 RecursiveTask(无返回值)、RecursiveAction(有返回值)的 ForkJoinTask 任务类。
- 在任务类的 compute 方法实现分儿治之的模式
- fork():在任务执行过程中将大任务划分为多个小的子任务,调用子任务的 fork()方法可以将任务放到线程池中异步调度。其实这里执行子任务调用 fork 方法并不是最佳的选择,最佳的选择是 invokeAll(leftTask,rightTask)方法。
- join():调用子任务的 join()方法等待任务返回的结果。这个方法类似于 Thread.join(),区别在于前者不受线程中断机制的影响。
- 在主线程中声明 ForkJoinPool 线程池
- 将任务类传入线程池对象,然后进行 task 调用,这里有三种调用方式
- forkJoinPool.execute(forkJoinTask) 异步执行 tasks,无返回值
- forkJoinPool.invoke(forkJoinTask) 在当前线程同步执行该任务。该方法也不受中断机制影响。
- forkJoinPool.submit(forkJoinTask) 异步执行,且带 Task 返回值,可通过 task.get 实现同步到主线程
Fork/Join 实战
有返回值,同步调用 forkJoinTask 方法,统计数组和
public class SumArray {
private static class SumTask extends RecursiveTask<Integer>{
private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10;//阈值
private int[] src; //表示我们要实际统计的数组
private int fromIndex;//开始统计的下标
private int toIndex;//统计到哪里结束的下标
public SumTask(int[] src, int fromIndex, int toIndex) {
this.src = src;
this.fromIndex = fromIndex;//开始下标
this.toIndex = toIndex;//结束下标
}
@Override
protected Integer compute() {
if(toIndex-fromIndex < THRESHOLD) {//如果结束下标-开始下标<阈值的话,就不用再进行拆分了
int count = 0;
for(int i=fromIndex;i<=toIndex;i++) {
count = count + src[i];
}
return count;
}else {//大于阈值,继续拆分
int mid = (fromIndex+toIndex)/2;
SumTask left = new SumTask(src,fromIndex,mid);//左边的任务
SumTask right = new SumTask(src,mid+1,toIndex);//右边的任务
invokeAll(left,right);//调用子任务
return left.join()+right.join();//递归调用
}
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
int[] src = MakeArray.makeArray();//获得一个很大的满是数字的数组
SumTask innerFind = new SumTask(src,0,src.length-1);
long start = System.currentTimeMillis();
pool.invoke(innerFind);//同步调用,启动forkJoinTask
System.out.println("Task is Running.....");
System.out.println("The count is "+innerFind.join()
+" spend time:"+(System.currentTimeMillis()-start)+"ms");
}
}
CountDownLatch 闭锁
- 一组线程等待其他的相关线程完成工作以后再执行,加强版 join
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
具体使用步骤
- 初始化 CountDownLatch(int count) 传入 int 值,表明有几个扣除点
- 在需要等待其它线程完成工作后才可继续执行任务的线程处,调用 countDownLatch.await()方法,该方法的作用是等待 count 为 0 时,该线程才可继续执行任务。可在多个线程内部调用 countDownLatch.await()方法,则这些线程都要等待 count 扣除点值为 0 时才可继续执行任务
- 在相关初始化线程或者业务线程内,调用 countDownLatch.countDown()来使得初始化 countDownLatch 时的 count 扣除点-1。
下面来看一下这个例子,这个例子有四个扣除点,有俩个线程设置了 await()
public class UseCountDownLatch {
static CountDownLatch countDownLatch = new CountDownLatch(4);//初始化CountDownLatch,并设置了四个扣除点
private static class initClass extends Thread{
private CountDownLatch countDownLatch;
public initClass(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
System.out.println("初始化相关工作。。。。。");
try {
Thread.sleep(2000);
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static class businessClass extends Thread{
private CountDownLatch countDownLatch;
public businessClass(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
System.out.println("完成相关业务,需要其它线程的初始化完成");
countDownLatch.await();//等待点
System.out.println("其它线程工作完成,业务线程继续工作。。。");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
try {
System.out.println("干了一些事情。。。。");
Thread.sleep(1000);
countDownLatch.countDown();
System.out.println("又干了一些事情。。。。");
Thread.sleep(1000);
countDownLatch.countDown();//每完成一步工作,扣减一次
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(new businessClass(countDownLatch)).start();
for (int i=0; i<2; i++){
new Thread(new initClass(countDownLatch)).start();
}
countDownLatch.await();//等待点
System.out.println("主线程接着开始工作。。。。");
}
}
输出如下:
干了一些事情。。。。
初始化相关工作。。。。。
初始化相关工作。。。。。
完成相关业务,需要其它线程的初始化完成
又干了一些事情。。。。
主线程接着开始工作。。。。
其它线程工作完成,业务线程继续工作。。。
可以看出,只有当 count 值减为 0 之后,俩个等待的线程才继续开始工作
CyclicBarrier 循环栅栏
- 让一组线程达到某个栅栏,被阻塞,一直到组内最后一个线程达到栅栏时,屏障开放,所有被阻塞的线程会继续运行。
CyclicBarrier 有俩个构造方法
- CyclicBarrier(int parties) 传入通过栅栏所需的一组线程的数量
- CyclicBarrier(int parties, Runnable barrierAction)传入通过栅栏所需的一组线程的数量,和到达一定的数量之后,新开一个线程完成相应工作
具体使用步骤:
- 初始化 CyclicBarrier (int parties, Runnable barrierAction))
- 在需要这组线程共同等待的栅栏处调用 cyclicBarrier.await()方法
- 编写通过栅栏后所需要完成什么工作的线程类
下面来看一下这个例子,这个例子将 parties 设置为 5,意为只有当五个线程都到达 cyclicBarrier.await()处时,五个线程才可以继续往下执行。当栅栏放行后,栅栏会重置!
public class UseCyclicBarrier {
private static CyclicBarrier barrier
= new CyclicBarrier(5,new CollectThread());
private static ConcurrentHashMap<String,Long> resultMap
= new ConcurrentHashMap<>();//存放子线程工作结果的容器
public static void main(String[] args) {
for(int i=0;i<=4;i++){
Thread thread = new Thread(new SubThread());
thread.start();
}
for(int i=0;i<=4;i++){
Thread thread = new Thread(new SubThread());
thread.start();
}
}
//负责屏障开放以后的工作
private static class CollectThread implements Runnable{
@Override
public void run() {
System.out.println("栅栏放行!,重置栅栏!");
StringBuilder result = new StringBuilder();
for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
result.append("["+workResult.getValue()+"]");
}
System.out.println(" the result = "+ result);
System.out.println("do other business........");
}
}
//工作线程
private static class SubThread implements Runnable{
@Override
public void run() {
long id = Thread.currentThread().getId();//线程本身的处理结果
resultMap.put(Thread.currentThread().getId()+"",id);
Random r = new Random();//随机决定工作线程的是否睡眠
try {
if(r.nextBoolean()) {
Thread.sleep(2000+id);
System.out.println("Thread_"+id+" ....do something ");
}
System.out.println(id+"....is await");
barrier.await();
Thread.sleep(1000+id);
System.out.println("Thread_"+id+" ....do its business ");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
使用场景:
CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水。
辨析 CountDownLatch 和 CyclicBarrier
- Countdownlatch 放行由第三者控制,CyclicBarrier 放行由一组线程本身控制
- Countdownlatch 放行条件 >= 线程数,CyclicBarrier 放行条件 = 线程数
- Countdownlatch 放行后的动作实施者是第三者线程组,且具有不可重复性。CyclicBarrier 放行后的动作实施者还是这个线程组本身,且可以反复执行。
Semaphore
控制同时访问某个特定资源的线程数量,主要用于流量控制。
构造方法
public Semaphore(int permits) {//可同时许可多少个线程来用 sync = new NonfairSync(permits); }
- void acquire() 拿许可
- void release() 还许可
- boolean tryAcquire() 尝试的去拿许可
- int availablePermits() 查询当前还有多少许可
- boolean hasQueuedThreads() 是否有线程在等待许可
- int getQueueLength() 查询当前在等待许可的线程数量
我们接下来用 Semaphore,简单实现一个数据库连接池。
首先先写一个数据库连接类来实现 Connection 接口
public class SqlConnectionImpl implements Connection
接下来,我们来限制一下数据库连接池的连接数量
public class DBPoolSemaphore {
private final static int POOL_SIZE = 10;//数据库连接池最大连接数量
private final Semaphore useful, useless;//useful表示可用的数据库连接,useless表示已用的数据库连接
public DBPoolSemaphore() {
this.useful = new Semaphore(POOL_SIZE);
this.useless = new Semaphore(0);
}
//存放数据库连接的容器
private static LinkedList<Connection> pool = new LinkedList<Connection>();
//初始化池
static {
for (int i = 0; i < POOL_SIZE; i++) {
pool.addLast(SqlConnectImpl.fetchConnection());
}
}
/*归还连接*/
public void returnConnect(Connection connection) throws InterruptedException {
if (connection != null) {
System.out.println("当前有" + useful.getQueueLength() + "个线程等待数据库连接!!"
+ "可用连接数:" + useful.availablePermits());
useless.acquire();//已用数据库连接-1
synchronized (pool) {
pool.addLast(connection);
}
useful.release();//可用数据库连接+1
}
}
/*从池子拿连接*/
public Connection takeConnect() throws InterruptedException {
useful.acquire();//可用数据库连接-1
Connection conn;
synchronized (pool) {
conn = pool.removeFirst();
}
useless.release();//已用数据库连接+1
return conn;
}
}
测试类
public class AppTest {
private static DBPoolSemaphore dbPool = new DBPoolSemaphore();
//业务线程
private static class BusiThread extends Thread{
@Override
public void run() {
Random r = new Random();//让每个线程持有连接的时间不一样
long start = System.currentTimeMillis();
try {
Connection connect = dbPool.takeConnect();
System.out.println("Thread_"+Thread.currentThread().getId()
+"_获取数据库连接共耗时【"+(System.currentTimeMillis()-start)+"】ms.");
SleepTools.ms(100+r.nextInt(100));//模拟业务操作,线程持有连接查询数据
System.out.println("查询数据完成,归还连接!");
dbPool.returnConnect(connect);
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 30; i++) {
Thread thread = new BusiThread();
thread.start();
}
}
}
输出如下
Thread_13_获取数据库连接共耗时【1】ms.
Thread_15_获取数据库连接共耗时【0】ms.
Thread_14_获取数据库连接共耗时【0】ms.
Thread_18_获取数据库连接共耗时【1】ms.
Thread_19_获取数据库连接共耗时【0】ms.
Thread_17_获取数据库连接共耗时【0】ms.
Thread_12_获取数据库连接共耗时【3】ms.
Thread_16_获取数据库连接共耗时【1】ms.
Thread_22_获取数据库连接共耗时【0】ms.
Thread_21_获取数据库连接共耗时【0】ms.
查询数据完成,归还连接!
当前有20个线程等待数据库连接!!可用连接数:0//当连接池为空时,需要获取连接的线程就会进入一个等待队列中等待,直到有连接释放掉
Thread_24_获取数据库连接共耗时【105】ms.
查询数据完成,归还连接!
当前有19个线程等待数据库连接!!可用连接数:0
Thread_25_获取数据库连接共耗时【132】ms.
查询数据完成,归还连接!
查询数据完成,归还连接!
当前有18个线程等待数据库连接!!可用连接数:0
Thread_23_获取数据库连接共耗时【164】ms.
查询数据完成,归还连接!
当前有17个线程等待数据库连接!!可用连接数:0
Thread_26_获取数据库连接共耗时【164】ms.
Exchanger
Exchanger 适用于俩个线程间的数据交换。
俩个线程通过调用 exchange()方法交换数据,如果第一个线程先到达同步点(exchange()),它则会一直等待第二个线程,直到第二个线程到达同步点,然后俩个线程交换数据后,继续执行任务。
- V exchange(V v):等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象
- V exchange(V v, long timeout, TimeUnit unit):等待另一个线程到达此交换点(除非当前线程被中断或超出了指定的等待时间),然后将给定的对象传送给该线程,并接收该线程的对象。
private static class ExchangerA extends Thread{
private Exchanger<String> message = new Exchanger<>();
public ExchangerA(Exchanger<String> exchanger){
this.message = exchanger;
}
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println("等待接收ExchangerB的消息");
String exchange = message.exchange("Hello ,I am " + Thread.currentThread().getName());
System.out.println("ExchangerB 发送的消息为:/n" + exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static class ExchangerB extends Thread{
private Exchanger<String> message = new Exchanger<>();
public ExchangerB(Exchanger<String> exchanger){
this.message = exchanger;
}
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println("等待接收ExchangerA的消息");
String exchange = message.exchange("你好啊 ,I am " + Thread.currentThread().getName());
System.out.println("ExchangerA 发送的消息为:/n" + exchange);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();//俩个线程传入同一个Exchanger
new ExchangerA(exchanger).start();
new ExchangerB(exchanger).start();
}
再谈 Callable、Future 和 FutureTask
callable 接口使用步骤:
- 创建 callable 接口实现类,并实现 call()方法,该 call 方法将作为线程执行体,并且有返回值。
- 创建 callable 实现类的实例,使用 FutureTask 类来包装 callable 对象,该 FutureTask 对象封装了 callable 对象的 call 方法的返回值。
- 使用 FutureTask 对象作为 Thread 对象的 target 创建并启动新线程。
- 调用 FutureTask 对象的 get()方法来获取子线程执行结束后的返回值。
由图,我们可以看到,FutureTask 实现了继承自 Runnable 接口和 Future 接口的 RunnableFuture 接口。FutureTask 类的构造函数需要传入一个 Callable 类型,
下面我们来先讲解一下 Future 接口的这些方法。
- Boolean isDone() 任务是否执行结束,执行结束 return true; 不管是正常还是异常结束或者自己取消只要执行结束都 return true;
- Boolean isCancelled() 任务完成前被取消 return true; 其它情况 return false;
- Boolean cancel(Boolean )
- 任务还没有开始,return true;
- 任务已经开始的话,调用 cancel(true)则中断正在运行的任务,中断成功 return true; 失败 return false;
- 任务已经开始的话,调用 cancel(false)不会去中断已经执行的任务,但可返回 true
- 任务已经结束,return false;
- get() 获取执行结果,在这个过程中线程会一直阻塞,直到任务执行完毕,如果在此过程中,线程被中断则直接抛出异常。
- get(long timeout,TimeUnit unit) get()的超时模式,超时则抛出 TimeoutException
接下来我们测试一下这几个方法
/*实现Callable接口,允许有返回值*/
private static class UseCallable implements Callable<Integer> {
private int sum;
@Override
public Integer call() throws Exception {
System.out.println("Callable子线程开始计算");
Thread.sleep(3000);
for(int i=0;i<5000;i++) {
sum = sum+i;
}
System.out.println("Callable子线程计算完成,结果="+sum);
return sum;
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
UseCallable useCallable = new UseCallable();
FutureTask<Integer> futureTask = new FutureTask<Integer>(useCallable);
new Thread(futureTask).start();
Thread.sleep(1000);
System.out.println("中断计算");
// boolean cancel = futureTask.cancel(true);
// boolean cancel = futureTask.cancel(false);//虽然调用cancel但不会中断,响应的isCancelled也是 return true的,这里大家可以测试一下
// System.out.println("线程中断结果为"+cancel);
System.out.println("线程是否完成前被取消" + futureTask.isCancelled());
System.out.println("等待线程运行结果。。。get阻塞中。。。");
System.out.println("线程运行结果:"+ futureTask.get());
}
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于