如何优雅地实现异步编程一直都是一个难题, 异步编程的通常做法就是采用 callback 的方法, 但是这种方法通常会把代码嵌套在正常流程的代码中, 而且当有多层嵌套的时候代码更加难以维护.
另外还有一点, 异步编程的异常处理也是难以未维护, 特别是在 Java 中, 异步编程通常由新的线程完成, 而子线程的异常是无法在父线程捕获的, 那么对于异步执行结果的获取就需要付出更大的代价, 比如通过: 轮询, 事件驱动等来完成.
从 Future 说起
Java5 之后就引入 Future 用于异步编程, 通过 get() 方法来对异步执行结果的同步等待和结果获取:
- Future<String> doSomething = Executors.newSingleThreadExecutor().submit(() -> {
- try {
- Thread.sleep(1000 * 3);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return "success";
- });
- String result = doSomething.get();
- System.out.println(result);
Future 的 Api 比较简单, 而已对异常处理不友好, 如果有同时有多个异步操作需要同时进行是就不好处理了
假设有这么一个场景, 用户登录拿到登录凭证 (token), 登录之后获取用户信息
- ExecutorService executors = Executors.newFixedThreadPool(10);
- Future<String> login = executors.submit(()->login());
- String token = login.get();
- Future<String> userInfo = executors.submit(() -> userInfo(token));
- String userInfoResult = userInfo.get();
- System.out.println(userInfoResult);
这种实现方法还是不能实现真正的异步编程或者说不是我们所期望的, 我们期望的是登录后获取用户信息和好友列表, 但这两件事情完成后统一对结果进行处理, 而这种方式是先等待用户信息拿到之后等待好友列表, 这就与我们的设想不符.
CompletableFuture
初识 CompletableFuture
在 Java8 中引入了 CompletableFuture 类, 同时实现了 Future 接口和 CompletionStage 接口, 提供了一套用于异步编程的 Api 接口并且提供了异步处理
CompletableFuture 提供了许多异步编程的操作, 可以说是 Java 中的 Promise 了, 下面通过 CompletableFuture 来实现上面提到的例子:
- String userInfo = CompletableFuture.supplyAsync(() -> login())
- .thenApplyAsync(token -> userInfo(token))
- .get();
- System.out.println(userInfo);
- CompletableFuture API
CompletableFuture 方法很多, 功能也很丰富, 这里不一一说明, 主要可以分为这几类来使用:
1. 把 CompletableFuture 当 Future 使用
CompletableFuture 实现了 Future 接口, 也就是 Future 能做的 CompletableFuture 也同样能使用, 加上 complete 和
completeExceptionally
方法可以控制结果的结束:
- CompletableFuture<String> f = new CompletableFuture<>();
- Executors.newSingleThreadExecutor().submit(()->{
- f.complete("hello");
- //f.completeExceptionally(new RuntimeException("error"));
- });
- String result = f.get();
- System.out.println(result);
可以通过 CompletableFuture 来控制多个异步操作同时执行:
- CompletableFuture<String> f = new CompletableFuture<>();
- new Thread(() -> {
- try {
- System.out.println("thread1:" + f.get());
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- }
- }).start();
- new Thread(() -> {
- try {
- System.out.println("thread2:" + f.get());
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- }
- }).start();
- f.complete("hello");
2. 异步操作
创建异步操作的方法主要是:
- public static CompletableFuture<Void> runAsync(Runnable runnable)
- public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
- public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
使用如下:
- CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return "hello";
- });
- String result = f.get();
- System.out.println(result);
3. 连续异步操作
- public CompletableFuture<Void> thenRun(Runnable action)
- public CompletableFuture<Void> thenRunAsync(Runnable action)
- public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)
- public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
- public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
- public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
- public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
- public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
- public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
使用如下:
- CompletableFuture<Void> f = CompletableFuture
- .supplyAsync(() -> "hello")
- .thenApplyAsync(res -> res + "world!")
- .thenAcceptAsync(System.out::println);
- // wait for job done
- f.get();
4. 等待操作完成
- public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
- public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
- public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
使用如下:
- CompletableFuture<String> f = CompletableFuture
- .supplyAsync(() -> "hello")
- .thenApplyAsync(res -> res + "world!")
- .whenComplete((res, err) -> {
- if (err != null) {
- err.printStackTrace();
- } else {
- System.out.println(res);
- }
- });
- // wait for job done
- f.get();
5. 组合
- public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
- public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
- public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor)
- public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
- public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
- public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)
使用如下:
- CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "Hello")
- .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + "World,"))
- .thenCombine(CompletableFuture.supplyAsync(() -> "CompletableFuture!"), (a, b) -> a + b);
- String result = f.get();
- System.out.println(result);//Hello World,CompletableFuture!
6. 结果 & 异常处理
- public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
- public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
- public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
- public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
使用如下:
- // 异常处理
- CompletableFuture<Object> f = CompletableFuture.supplyAsync(() -> "Hello")
- .thenApplyAsync(res -> res + "World")
- .thenApplyAsync(res -> {
- throw new RuntimeException("error");
- })
- .exceptionally(e -> {
- //handle exception here
- e.printStackTrace();
- return null;
- });
- f.get();
- // 执行结果处理
- CompletableFuture<Object> f2 = CompletableFuture.supplyAsync(() -> "Hello")
- .thenApplyAsync(res -> res + "World")
- .thenApplyAsync(res -> {
- throw new RuntimeException("error");
- })
- .handleAsync((res, err) -> {
- if (err != null) {
- //handle exception here
- return null;
- } else {
- return res;
- }
- });
- Object result = f2.get();
- System.out.println(result);
7. 并行执行异步操作并统一处理结果
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
使用如下:
- CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello");
- CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "world");
- CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "!");
- // 使用 allOf 方法
- CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
- all.get();
- System.out.println(f1.get());
- System.out.println(f2.get());
- System.out.println(f3.get());
- // 结合 StreamAPI
- List<String> result = Stream.of(f1, f2, f3)
- .map(CompletableFuture::join)
- .collect(Collectors.toList());
- System.out.println(result);
总结
在 Java7 之前, Java 中对于异步编程的实现都可能比较复杂或者实现得不够优雅, 而 CompletableFuture 的出现则提供了异步编程的强大能力, 虽然 API 有点多但是只要稍加理解和使用还是很好应用的, 通过链式调用使原本通过回调的方式变得更加优雅, 代码的可阅读性和可维护性也得到一定的提高.
参考
Guide To CompletableFuture https://www.baeldung.com/java-completablefuture
Java CompletableFuture 详解 https://colobu.com/2016/02/29/Java-CompletableFuture/
来源: http://www.jianshu.com/p/cacb8162f409