Java 项目编程中, 为了充分利用计算机 CPU 资源, 一般开启多个线程来执行异步任务. 但不管是继承 Thread 类还是实现 Runnable 接口, 都无法获取任务执行的结果. JDK 5 中引入了 Callable 和 Future, 通过它们执行异步任务可以获取执行结果.
FutureTask 分析
JDK 5 中获取任务执行的结果主要是通过 FutureTask 类实现的. FutureTask 实现了 RunnableFuture 的接口, 它既可以作为 Runnable 被线程执行, 又可以作为 Future 得到 Callable 的返回值. 一般结合线程池 ExecutorService 类使用, 大致流程如下: 1. 调用线程将 callable 任务 submit 到线程池, 返回 Future 对象 2. 任务被封装成 FutureTask 对象
<ignore_js_op>
执行线程调用 FutureTask 的 run 方法, run 主要逻辑是执行 Callable 任务的 call 方法获取结果, 并将结果赋值给全局变量 outcome 调用线程调用 Future 对象的 get 方法来获取任务执行的结果
上述步骤 4 中, 如果任务没有执行完成, 调用 get 方法时, 调用线程将被阻塞直到任务完成.
假设这样的一个场景, 向 Executor 批量提交了 A,B 两个任务, A 任务耗时 20ms,B 任务耗时 10ms, 每个任务完成后就能根据结果继续做后面的事. 在该场景中, B 任务先完成 A 任务后完成, 由于调用线程不知道哪个任务会先完成, 只能按照任务的提交顺序调用 get 方法阻塞获取结果, 最后要耗时 20ms 才会继续做 A,B 任务后面的事.
显然这种情况下用 Future 机制是不合适的, B 任务先完成了却增加了额外的等待时间. 这个问题的原因是 Future 没有提供好的方法去判断第一个完成的任务. 当然你可以通过 Future 提供的 isDone 方法轮询的去判断第一个完成的任务, 但会消耗无谓的 CPU 资源.
从上面的分析可以看到, 单使用 Future 是不方便的, 其主要原因有: 一方面没有提供方法去判断第一个完成的任务; 另一方面是 Future 没有提供 Callback 机制, 只能通过阻塞的 get 方法去获取结果. 针对第一个问题, JAVA 引入了 CompletionService 接口.
CompletionService 分析
CompletionService 整合了 Executor 和 BlockingQueue 的功能. CompletionService 维护一个保存 Future 对象的 BlockingQueue. 只有当这个 Future 对象状态是结束的时候, 才会加入到这个 Queue 中. 这样就确保执行时间较短的任务率先被存入队列中. 与 Future 流程的不同主要是: 1. callable 任务提交后, exexute 方法执行的是封装成 QueueingFuture 的任务对象. QueueingFuture 是 FutureTask 的子类, 重写了 done 方法, 在 task 执行完成之后将当前 future 添加到阻塞队列 completionQueue
<ignore_js_op>
获取结果是通过 CompletionService 的 take 方法和 poll 方法, 这两个方法都委托给了 BlockingQueue, 它会在结果不可用时阻塞
<ignore_js_op>
由实现可知, Future 的回调行为是在 ExecutorCompletionService 中实现的, 完成的任务会封装到一个队列中, 供客户端询问时使用. Future 本身是没有提供回调方法的. 另外, 使用 CompletionService 虽然能保证任务结果按照完成先后顺序排序, 但仍存在调用阻塞的问题. Guava 的 ListenableFuture 和 JDK8 的 CompletableFuture 对 Future 进行了扩展, 当计算结果完成后可以立即执行后续逻辑.
Future 的扩展
Guava 的 ListenableFuture 扩展了 Future 接口, 是一个可以监听结果的 Future. 就是它可以监听异步执行的过程, 执行完了, 自动触发后续操作. 常用的添加监听器的方法是 Futures.addCallback 方法, 大致流程如下: 1. 每当对一个 ListenableFuture 增加回调时, 都会向线程池提交监听任务 callbackListener. 该任务的 run 方法会通过 getDone 方法阻塞获取 future 的执行结果. 代码片段见下:
<ignore_js_op>
获取结果后立即执行 callback 的 onFailure 方法或者 onSuccess 方法, 不会阻塞调用
当然 ListenableFuture 的回调机制不是通用的, 如果主任务异步执行子任务并且需等待返回结果, 此时 ListenableFuture 的作用和 Future 几乎差不多, 都是通过 get 方法阻塞获取结果. 如果主任务不需要等待子任务的返回结果, 但子任务一旦计算完成就做其他计算, 此时使用 ListenableFuture 非常合适.
CompletableFuture 是 JDK1.8 新增的的类, 提供了非常强大的 Future 的扩展功能. 可以对多个异步处理进行编排, 实现更复杂的异步处理. 它能够将回调放到与任务不同的线程中执行, 也能将回调作为继续执行的同步函数, 在与任务相同的线程中执行. 大家可以参考 CompletableFuture 的 API 了解它提供的功能.
总结
在实际项目中, 需要根据具体的业务场景选择合适的 Future 工具类来实现异步编程. 如果项目中使用 Java 8, 推荐使用 CompletableFuture 类, 它提供了更多的异步控制. 如果使用之前版本, 可以使用 Guava 等框架提供的 Future 工具类.
from: https://www.wengbi.com/thread_69744_1.html
来源: http://www.bubuko.com/infodetail-2895468.html