Java 之 CompletableFuture 使用

本贴最后更新于 478 天前,其中的信息可能已经时移世异

一、 CompletableFuture 介绍

日常开发中经常会遇到一个接口中有多个方法的调用( RPC 服务或者存储介质),我们必然会想提升接口性能,在 JDK1.8 之前大都使用线程池结合 Future 进行并行调用。Future 用于异步计算,如果要减少阻塞一般会使用回调的方式,大量的回调代码就会有 回调地狱 问题,降低了 可读性可维护性 ​,且会造成系统资源浪费。

JDK1.8 引入了 CompletableFuture 工具并提供了丰富的 API 支撑日常开发,使用函数式书写风格让我们在异步化组合编排上更加便利,且充分利用了系统资源。并且 CompletableFuture 也支持传入业务自定义的线程池。

CompletableFuture 中包含两个字段: result ​stack ​。result 用于存储当前 CF 的结果,stack 表示当前 CF 完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的 CF 的计算,依赖动作可以有多个(表示有多个依赖它的 CF ),以栈的形式存储,stack 表示栈顶元素,这种方式类似“观察者模式”。

二、CompletableFuture 常见 API

  • supplyAsync​ 单个异步计算请求
  • thenAccept​ 接收上一个 CompletableFuture 结果进行后续处理
  • thenAppy​ 接收上一个 CompletableFuture 结果进行后续处理,返回 CompletableFuture 泛型结果可以改变
  • thenCompose​ 接收上一个 CompletableFuture 结果,返回一个新的 CompletableFuture
  • complete​ 可用于将方法返回值转为 CompletableFuture 类型,并添加值信息
  • completeExceptionally​ 可用于将方法返回值转为 CompletableFuture 类型,并添加异常信息
  • completedFuture​ 创建一个已处理完成的 CompletableFuture
  • thenCombine​ 用于有两个请求依赖进行编排请求
  • allOf​ 和 anyOf​ 可用于多个请求依赖的控制
  • exceptionally​ 接收异步计算发生的异常信息
  • get​ 和 join​ 都可以阻塞获取 CompletableFuture 的返回值,但 join 是抛出的是 uncheck 异常,get 方法则需要手动处理

三、各 API 使用介绍及演示 Demo

2.1 supplyAsync、completeFuture 和 complete

ExecutorService executor = Executors.newFixedThreadPool(5);
//1、使用runAsync或supplyAsync发起异步调用
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
  return "result1";
}, executor);
//2、CompletableFuture.completedFuture()直接创建一个已完成状态的CompletableFuture
CompletableFuture<String> cf2 = CompletableFuture.completedFuture("result2");
//3、先初始化一个未完成的CompletableFuture,然后通过complete()、completeExceptionally(),完成该CompletableFuture
CompletableFuture<String> cf = new CompletableFuture<>();
cf.complete("success");

2.2 thenCombine

可用于多个 CompletableFuture 的编排,如有三个操作 step1、step2、step3 存在依赖关系,step3 依赖 step1 和 step2 的结果,则可以使用该 API 进行编排,这是由于 CompletableFuture 实现了接口 CompletionStage ,能够按步骤进行下一次 Stage 的触发,我们可以结合 thenAppy、thenCompose 等函数式编程方法使用。

ExecutorService executor = Executors.newFixedThreadPool(5);
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("执行step 1");
    return "step1 result";
}, executor);
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
    System.out.println("执行step 2");
    return "step2 result";
});
cf1.thenCombine(cf2, (result1, result2) -> {
    System.out.println(result1 + " , " + result2);
    System.out.println("执行step 3");
    return "step3 result";
}).thenAccept(result3 -> System.out.println(result3));

2.3 thenApply、thenAccept、thenCompose

thenApply 和 thenAccept 都可以获取前一个任务的返回值,但 thenAccept 自身不会有返回值。thenCompose 可以返回一个新的 CF 。

CompletableFuture<String> cf3 = cf1.thenApply(result1 -> {
  //result1为CF1的结果
  //......
  return "result3";
});
CompletableFuture<Void> cf4 = cf2.thenAccept(result2 -> {
  //result2为CF2的结果
  //......
});
CompletableFuture<Void> cf4 = cf2.thenAccept(result2 -> {
  //result2为CF2的结果
  //......
});
CompletableFuture<Integer> cf5 = orderUserMobileFuture.thenCompose(result4 -> CompletableFuture.supplyAsync(() -> 123));

2.4 allOf​ 和 anyOf

allOf 是要求所有的 CompletableFuture 都完成后才会触发后续动作, anyOf 是任意一个 CompletableFuture 完成即可。

CompletableFuture<Void> cf4 = CompletableFuture.allOf(cf1, cf2, cf3);
CompletableFuture<String> result = cf4.thenApply(v -> {
  //这里的join并不会阻塞,cf1 cf2 cf3 必须都是完成后才会进入 thenApply
  result1 = cf1.join();
  result2 = cf2.join();
  result3 = cf3.join();
  //根据result1、result2、result53组装最终result;
  return "result";
});

  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3186 引用 • 8212 回帖 • 1 关注
1 操作
zhangfeibiao 在 2023-07-16 20:51:44 更新了该帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...