从 CompletableFuture 到异步编程设计, 笔者就分为 2 部分来分享 CompletableFuture 异步编程设计, 前半部分总结下 CompletableFuture 使用实践, 后半部分分享下 CompletableFuture 实现原理和异步编程设计机制.
(ps: 本文内容较多, 请耐心阅读. 如果读者了解 CompletableFuture 使用的话, 可以直接看后半部分内容; 如果熟悉 CompletableFuture 及异步编程设计的话, 可以直接翻到文档末尾点个 "推荐" 就好了, 因为你已经掌握了 Java 异步设计精髓了 :) , 若有不正确地方, 感谢评论区指正交流~ )
Java8 新增了 CompletableFuture 类, 该类实现了 CompletionStage 和 Future 接口, 简化了 Java 异步编程能力, 该类方法较多, 其实套路只有一个, 那就是任务执行完成之后执行 "回调".
CompletableFuture 使用实践
Java8 新增的 CompletableFuture 提供对异步计算的支持, 可以通过回调的方式处理计算结果. CompletableFuture 类实现了 CompletionStage 和 Future 接口, 所以还可以像之前使用 Future 那样使用 CompletableFuture , 尽管已不再推荐这样用了.
CompletableFuture 的创建
- // 创建一个带 result 的 CompletableFuture
- CompletableFuture<String> future = CompletableFuture.completedFuture("result");
- future.get();
- // 默认创建的 CompletableFuture 是没有 result 的, 这时调用 future.get() 会一直阻塞下去知道有 result 或者出现异常
- future = new CompletableFuture<>();
- try {
- future.get(1, TimeUnit.SECONDS);
- } catch (Exception e) {
- // no care
- }
- // 给 future 填充一个 result
- future.complete("result");
- assert "result".equals(future.get());
- // 给 future 填充一个异常
- future = new CompletableFuture<>();
- future.completeExceptionally(new RuntimeException("exception"));
- try {
- future.get();
- } catch (Exception e) {
- assert "exception".equals(e.getCause().getMessage());
- }
上面的示例是自己设置 future 的 result, 一般情况下我们都是让其他线程或者线程池来执行 future 这些异步任务. 除了直接创建 CompletableFuture 对象外 (不推荐这样使用), 还可以使用如下 4 个方法创建 CompletableFuture 对象:
- // runAsync 是 Runnable 任务, 不带返回值的, 如果入参有 executor, 则使用 executor 来执行异步任务
- public static CompletableFuture<Void> runAsync(Runnable runnable)
- public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
- // supplyAsync 是待返回结果的异步任务
- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
- // 使用示例
- CompletableFuture.runAsync(() -> {
- System.out.println("hello world");
- }, executor);
- CompletableFuture.supplyAsync(() -> {
- System.out.println("hello world");
- return "result";
- });
如果入参不带 executor, 则默认使用 ForkJoinPool.commonPool() 作为执行异步任务的线程池; 否则使用 executor 执行任务.
CompletableFuture 的完成动作
- public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
- public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
- public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
- public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
- // 使用示例
- CompletableFuture.supplyAsync(() -> {
- System.out.println("hello world");
- return "result";
- }).whenCompleteAsync((result, e) -> {
- System.out.println(result + " " + e);
- }).exceptionally((e) -> {
- System.out.println("exception" + e);
- return "exception";
- });
action 是 Action 类型, 从上面可以看出它既可以处理正常返回值也可以处理异常, whenComplete 会在任务执行完成后直接在当前线程内执行 action 动作, 后缀带 Async 的方法是交给其他线程执行 action(如果是线程池, 执行 action 的可能和之前执行异步任务的是同一个线程), 入参带 executor 的交给 executor 线程池来执行 action 动作, 当发生异常时, 会在当前线程内执行 exceptionally 方法.
除了用上面的 whenComplete 来执行完成动作之外, 还可以使用 handle 方法, 该方法可以返回一个新的 CompletableFuture 的返回类型.
- public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
- public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
- public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
- // handle 方法示例:
- CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
- System.out.println("hello world");
- return "result";
- });
- CompletableFuture<Integer> f2 = f1.handle((r, e) -> {
- System.out.println("handle");
- return 1;
- });
除了使用 handle 方法来执行 CompletableFuture 返回类型转换之外, 还可以使用 thenApply 方法, 二者不同的是前者会处理正常返回值和异常, 因此可以屏蔽异常, 避免继续抛出; 而后者只能处理正常返回值, 一旦有异常就会抛出.
- public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
- public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
- public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
- // thenApply 方法示例:
- CompletableFuture.supplyAsync(() -> {
- System.out.println("hello world");
- return "result";
- }).thenApply((r) -> {
- System.out.println(r);
- return "aaa";
- }).thenApply((r) -> {
- System.out.println(r);
- return 1;
- });
注意, 上面的 handle,thenApply 都是返回新的 CompletableFuture 类型, 如果只是为了在 CompletableFuture 完成之后执行某些消费动作, 而不返回新的 CompletableFuture 类型, 则可以使用 thenAccept 方法.
- public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
- public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
- public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
- // thenAccept 方法示例:
- CompletableFuture.supplyAsync(() -> {
- System.out.println("hello world");
- return "result";
- }).thenAccept(r -> {
- System.out.println(r);
- }).thenAccept(r -> {
- // 这里的 r 为 Void(null) 了
- System.out.println(r);
- });
上面的 handle,thenApply 和 thenAppept 都是对上一个 CompletableFuture 执行完的结果进行某些操作. 那么可不可以同时对 2 个 CompletableFuture 执行结果执行某些操作呢? 其实也是可以的, 使用 thenAppeptBoth 方法即可. 注意, thenAppeptBoth 和 handle/thenApply/thenAppep 的流程是一样的, 只不过 thenAppeptBoth 中包含了另一个 CompletableFuture 对象 (注意, 这里另一个 CompletableFuture 对象的执行可并不是上一个 CompletableFuture 执行结束才开始执行的).
- public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
- public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
- public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
- public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
- // thenAcceptBoth 方法示例:
- CompletableFuture.supplyAsync(() -> {
- System.out.println("hello world");
- return "result";
- }).thenAcceptBoth(CompletableFuture.completedFuture("result2"), (r1, r2) -> {
- System.out.println(r1 + "-" + r2);
- });
注意, thenAcceptBoth 方法是没有返回值的 (CompletableFuture<Void>), 如果想用 thenAcceptBoth 这样的功能并且还带有返回值的 CompletableFuture, 那么 thenCombine 方法就该上场了.
- public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
- public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
- public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
- // thenCombine 方法示例
- CompletableFuture.supplyAsync(() -> {
- System.out.println("hello world");
- return "result";
- }).thenCombine(CompletableFuture.completedFuture("result2"), (r1, r2) -> {
- System.out.println(r1 + "-" + r2);
- return r1 + "-" + r2;
- });
thenAcceptBoth 和 runAfterBoth 是当两个 CompletableFuture 都计算完成, 而下面的方法是当任意一个 CompletableFuture 计算完成的时候就会执行.
- public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
- public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
- public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
- public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
- public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
- public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)
如果当想在多个 CompletableFuture 都计算完成或者多个 CompletableFuture 中的一个计算完成后执行某个动作, 可使用方法 allOf 和 anyOf.
- public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
- public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
如果当任务完成时并不想用 CompletableFuture 的结果, 可以使用 thenRun 方法来执行一个 Runnable.
- public CompletableFuture<Void> thenRun(Runnable action)
- public CompletableFuture<Void> thenRunAsync(Runnable action)
- public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
以上方法都是在方法中返回一个值 (或者不返回值), 其实还可以返回一个 CompletableFuture, 是不是很像类的组合一样.
- public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
- public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
- public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)
- // thenCompose 方法示例:
- CompletableFuture.supplyAsync(() -> {
- System.out.println("hello world");
- return "result";
- }).thenCompose(r -> {
- System.out.println(r);
- return CompletableFuture.supplyAsync(() -> {
- System.out.println(r + "result2");
- return r + "result2";
- });
- });
- // 上面的代码和下面的代码效果是一样的
- CompletableFuture.supplyAsync(() -> {
- System.out.println("hello world");
- return "result";
- }).thenApply(r -> {
- System.out.println(r);
- return r;
- }).thenApplyAsync(r -> {
- System.out.println(r + "result2");
- return r + "result2";
- });
CompletableFuture 实现机制
先抛开 CompletableFuture 不谈, 如果程序中使用了线程池, 如何才能在某个任务执行完成之后执行某些动作呢? 其实 Java 线程池本身已经提供了任务执行前后的 hook 方法 (beforeExecute 和 afterExecute), 如下:
- public class ThreadPoolExecutor extends AbstractExecutorService {
- // ...
- protected void beforeExecute(Thread t, Runnable r) { }
- protected void afterExecute(Runnable r, Throwable t) { }
- // ...
- }
我们只需要自定义线程池继承 ThreadPoolExecutor , 然后重写 beforeExecute 和 afterExecute 方法即可, 在 afterExecute 里可以执行一些动作. 关于重写 ThreadPoolExecutor 的一个示例可点击查看.
那么 CompletableFuture 的实现机制是怎样的呢? 其实, 和上面的所说的 "afterExecute 机制" 是类似的 (本质是一样的, 回调机制), 也是在任务执行完成后执行某些动作, 如下代码:
- CompletableFuture.supplyAsync(() -> {
- // callable 任务
- System.out.println("hello world");
- return "result";
- }).thenApply(r -> {
- // 任务完成之后的动作 (回调方法), 类似于 ThreadPoolExecutor.afterExecute 方法
- System.out.println(r);
- return r;
- });
上面的示例代码其实主要完成了 3 个步骤, 这 3 个步骤其实也是 CompletableFuture 的实现流程:
执行任务
添加任务完成之后的动作 (回调方法)
执行回调
下面笔者就以上面的示例代码, 按照这 3 个步骤依次进行分析, 此时建议读者打开 idea, 写个 demo 进行 debug, 这里篇幅有限, 笔者就只讲解主要流程代码, 其他代码自行阅读即可 :)
1, 执行任务
执行任务的主要逻辑就是 AsyncSupply.run 方法:
- public void run() {
- CompletableFuture<T> d; Supplier<T> f;
- // dep 是当前 CompletableFuture,fn 是任务执行逻辑
- if ((d = dep) != null && (f = fn) != null) {
- dep = null; fn = null;
- if (d.result == null) {
- try {
- // 1 任务执行 & result cas 设置
- d.completeValue(f.get());
- } catch (Throwable ex) {
- // 1.1 result cas 异常设置
- d.completeThrowable(ex);
- }
- }
- // 2 任务完成, 可能涉及到回调的执行
- d.postComplete();
- }
- }
2, 添加回调
添加回调方法的流程是从 thenApply 开始的:
- public <U> CompletableFuture<U> thenApply(
- Function<? super T,? extends U> fn) {
- return uniApplyStage(null, fn);
- }
- private <V> CompletableFuture<V> uniApplyStage(
- Executor e, Function<? super T,? extends V> f) {
- if (f == null) throw new NullPointerException();
- CompletableFuture<V> d = new CompletableFuture<V>();
- if (e != null || !d.uniApply(this, f, null)) {
- // 当上一个 CompletableFuture 未完成时, 将该 CompletableFuture 添加
- // 到上一个 CompletableFuture 的 statck 中
- UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
- push(c);
- c.tryFire(SYNC);
- }
- return d;
- }
CompletableFuture.statck 是 UniCompletion 类型的, 该类型如下:
- UniCompletion<T,V> {
- volatile Completion next; // Treiber stack link
- Executor executor; // executor to use (null if none)
- CompletableFuture<V> dep; // the dependent to complete
- CompletableFuture<T> src; // source for action
- }
3, 执行回调
执行回调是从 CompletableFuture.postComplete 开始的:
- final void postComplete() {
- /*
- * On each step, variable f holds current dependents to pop
- * and run. It is extended along only one path at a time,
- * pushing others to avoid unbounded recursion.
- */
- CompletableFuture<?> f = this; Completion h;
- while ((h = f.stack) != null ||
- (f != this && (h = (f = this).stack) != null)) {
- CompletableFuture<?> d; Completion t;
- // cas 设置 h.next 到当前 CompletableFuture.statck
- if (f.casStack(h, t = h.next)) {
- if (t != null) {
- if (f != this) {
- pushStack(h);
- continue;
- }
- h.next = null; // detach
- }
- f = (d = h.tryFire(NESTED)) == null ? this : d;
- }
- }
- }
- // UniAccept
- final CompletableFuture<Void> tryFire(int mode) {
- CompletableFuture<Void> d; CompletableFuture<T> a;
- if ((d = dep) == null ||
- !d.uniAccept(a = src, fn, mode> 0 ? null : this)) // 执行回调
- return null;
- dep = null; src = null; fn = null;
- // 返回当前 CompletableFuture 或者 递归调用 postComplete
- return d.postFire(a, mode);
- }
看完上面 3 个步骤, 是不是还不太清楚多个 CompletableFuture 之间的执行流程呢, 说实话笔者第一次看的时候也是这样的 :(, 下面我们换个例子并给出图示来看:
- CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
- System.out.println("hello world f1");
- sleep(1); // TimeUnit.SECONDS.sleep(1)
- return "result f1";
- });
- CompletableFuture<String> f2 = f1.thenApply(r -> {
- System.out.println(r);
- sleep(1);
- return "f2";
- });
- CompletableFuture<String> f3 = f2.thenApply(r -> {
- System.out.println(r);
- sleep(1);
- return "f2";
- });
- CompletableFuture<String> f4 = f1.thenApply(r -> {
- System.out.println(r);
- sleep(1);
- return "f2";
- });
- CompletableFuture<String> f5 = f4.thenApply(r -> {
- System.out.println(r);
- sleep(1);
- return "f2";
- });
- CompletableFuture<String> f6 = f5.thenApply(r -> {
- System.out.println(r);
- sleep(1);
- return "f2";
- });
上面代码对应的 CompletableFuture 及其 Completion 关系如下图:
结合上图和 postComplete 流程, 可以看出执行回调的顺序是: f1 -> f4 -> f5 -> f6 -> f2 -> f3.(如果这里没看懂, 可以回过头再看下 postComplete 方法的源码~)
异步编程设计
分析完了 CompletableFuture, 相信大家都已经对 CompletableFuture 的设计与实现有了进一步的理解. 那么对于异步编程有哪些实际应用场景, 其本质到底是什么呢?
异步处理的本质其实就是回调 (系统层借助于指针来实现, 准确来说是函数指针), 用户提供一个回调方法, 回调函数不是由该函数的实现方直接调用, 而是在特定的事件或条件发生时由另外的一方调用的, 用于对该事件或条件进行响应. 从 "宏观" 来看, CompletableFuture 的实现其实很简单, 就是回调, 即在任务执行完成之后进行回调, 回调中可能涉及到其他操作, 比如下一个回调或者执行下一个任务.
异步编程在应用场景较多, 很多语言, 比如 Node.JS, 采用回调的方式实现异步编程. Java 的一些框架, 比如 Netty, 自己扩展了 Java 的 Future 接口, 提供了 addListener 等多个扩展方法:
- ServerBootstrap boot = new ServerBootstrap();
- boot.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .localAddress(8080)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new EchoHandler());
- }
- });
dubbo 中 consumer 对于 RPC response 的处理是基于回调机制的, Google guava 也提供了通用的扩展 Future:ListenableFuture,SettableFuture 以及辅助类 Futures 等, 方便异步编程.
- final String name = ...;
- inFlight.add(name);
- ListenableFuture<Result> future = service.query(name);
- future.addListener(new Runnable() {
- public void run() {
- processedCount.incrementAndGet();
- inFlight.remove(name);
- lastProcessed.set(name);
- logger.info("Done with {0}", name);
- }
- }, executor);
参考资料:
1,Java CompletableFuture 详解
- ,https://www.cnblogs.com/aniao/p/aniao_cf.html
- ,
来源: https://www.cnblogs.com/xiangnanl/p/9939447.html