Future
Future 是 Java5 增加的类, 它用来描述一个异步计算的结果. 你可以使用 isDone 方法检查计算是否完成, 或者使用 get 方法阻塞住调用线程, 直到计算完成返回结果. 你也可以使用 cancel 方法停止任务的执行. 下面来一个栗子:
- public class FutureDemo {
- public static void main(String[] args) {
- ExecutorService es = Executors.newFixedThreadPool(10);
- Future<Integer> f = es.submit(() ->{
- Thread.sleep(10000);
- // 结果
- return 100;
- });
- // do something
- Integer result = f.get();
- System.out.println(result);
- // while (f.isDone()) {
- // System.out.println(result);
- // }
- }
- }
在这个例子中, 我们往线程池中提交了一个任务并立即返回了一个 Future 对象, 接着可以做一些其他操作, 最后利用它的 get 方法阻塞等待结果或 isDone 方法轮询等待结果(关于 Future 的原理可以参考之前的文章:[并发编程] Future 模式及 JDK 中的实现 https://juejin.im/post/5b948b11f265da0aa949f0b2 )
虽然这些方法提供了异步执行任务的能力, 但是对于结果的获取却还是很不方便, 只能通过阻塞或者轮询的方式得到任务的结果.
阻塞的方式显然和我们的异步编程的初衷相违背, 轮询的方式又会耗费无谓的 CPU 资源, 而且也不能及时的得到计算结果, 为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?
很多语言, 比如 Node.JS, 采用 Callback 的方式实现异步编程. Java 的一些框架, 比如 Netty, 自己扩展了 Java 的 Future 接口, 提供了 addListener 等多个扩展方法. Google 的 guava 也提供了通用的扩展 Future:ListenableFuture , SettableFuture 以及辅助类 Futures 等, 方便异步编程. 为此, Java 终于在 JDK1.8 这个版本中增加了一个能力更强的 Future 类: CompletableFuture . 它提供了非常强大的 Future 的扩展功能, 可以帮助我们简化异步编程的复杂性, 提供了函数式编程的能力, 可以通过回调的方式处理计算结果. 下面来看看这几种方式.
Netty-Future
引入 Maven 依赖:
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>4.1.29.Final</version>
- </dependency>
- public class NettyFutureDemo {
- public static void main(String[] args) throws InterruptedException {
- EventExecutorGroup group = new DefaultEventExecutorGroup(4);
- System.out.println("开始:" + DateUtils.getNow());
- Future<Integer> f = group.submit(new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- System.out.println("开始耗时计算:" + DateUtils.getNow());
- Thread.sleep(10000);
- System.out.println("结束耗时计算:" + DateUtils.getNow());
- return 100;
- }
- });
- f.addListener(new FutureListener<Object>() {
- @Override
- public void operationComplete(Future<Object> objectFuture) throws Exception {
- System.out.println("计算结果:" + objectFuture.get());
- }
- });
- System.out.println("结束:" + DateUtils.getNow());
- // 不让守护线程退出
- new CountDownLatch(1).await();
- }
- }
输出结果:
开始: 2019-05-16 08:25:40:779
结束: 2019-05-16 08:25:40:788
开始耗时计算: 2019-05-16 08:25:40:788
结束耗时计算: 2019-05-16 08:25:50:789
计算结果: 100
从结果可以看出, 耗时计算结束后自动触发 Listener 的完成方法, 避免了主线程无谓的阻塞等待, 那么它究竟是怎么做到的呢? 下面看源码
DefaultEventExecutorGroup 实现了 EventExecutorGroup 接口, 而 EventExecutorGroup 则是实现了 JDK ScheduledExecutorService 接口的线程组接口, 所以它拥有线程池的所有方法. 然而它却把所有返回 java.util.concurrent.Future 的方法重写为返回 io.netty.util.concurrent.Future , 把所有返回 java.util.concurrent.ScheduledFuture 的方法重写为返回 io.netty.util.concurrent.ScheduledFuture .
- public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
- /**
- * 返回一个 EventExecutor
- */
- EventExecutor next();
- Iterator<EventExecutor> iterator();
- Future<?> submit(Runnable task);
- <T> Future<T> submit(Runnable task, T result);
- <T> Future<T> submit(Callable<T> task);
- ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
- <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
- ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
- ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
- }
EventExecutorGroup 的 submit 方法因为 newTaskFor 的重写导致返回了 netty 的 Future 实现类, 而这个实现类正是 PromiseTask .
- @Override
- public <T> Future<T> submit(Callable<T> task) {
- return (Future<T>) super.submit(task);
- }
- @Override
- protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
- return new PromiseTask<T>(this, callable);
- }
PromiseTask 的实现很简单, 它缓存了要执行的 Callable 任务, 并在 run 方法中完成了任务调用和 Listener 的通知.
- @Override
- public void run() {
- try {
- if (setUncancellableInternal()) {
- V result = task.call();
- setSuccessInternal(result);
- }
- } catch (Throwable e) {
- setFailureInternal(e);
- }
- }
- @Override
- public Promise<V> setSuccess(V result) {
- if (setSuccess0(result)) {
- notifyListeners();
- return this;
- }
- throw new IllegalStateException("complete already:" + this);
- }
- @Override
- public Promise<V> setFailure(Throwable cause) {
- if (setFailure0(cause)) {
- notifyListeners();
- return this;
- }
- throw new IllegalStateException("complete already:" + this, cause);
- }
任务调用成功或者失败都会调用 notifyListeners 来通知 Listener, 所以大家得在回调的函数里调用 isSuccess 方法来检查状态.
这里有一个疑惑, 会不会 Future 在调用 addListener 方法的时候任务已经执行完成了, 这样子会不会通知就会失败了啊?
- @Override
- public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
- synchronized (this) {
- addListener0(listener);
- }
- if (isDone()) {
- notifyListeners();
- }
- return this;
- }
可以发现, 在 Listener 添加成功之后, 会立即检查状态, 如果任务已经完成立刻进行回调, 所以这里不用担心啦. OK, 下面看看 Guava-Future 的实现.
Guava-Future
首先引入 guava 的 Maven 依赖:
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>22.0</version>
- </dependency>
- public class GuavaFutureDemo {
- public static void main(String[] args) throws InterruptedException {
- System.out.println("开始:" + DateUtils.getNow());
- ExecutorService executorService = Executors.newFixedThreadPool(10);
- ListeningExecutorService service = MoreExecutors.listeningDecorator(executorService);
- ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
- @Override
- public Integer call() throws Exception {
- System.out.println("开始耗时计算:" + DateUtils.getNow());
- Thread.sleep(10000);
- System.out.println("结束耗时计算:" + DateUtils.getNow());
- return 100;
- }
- });
- future.addListener(new Runnable() {
- @Override
- public void run() {
- System.out.println("调用成功");
- }
- }, executorService);
- System.out.println("结束:" + DateUtils.getNow());
- new CountDownLatch(1).await();
- }
- }
ListenableFuture 可以通过 addListener 方法增加回调函数, 一般用于不在乎执行结果的地方. 如果需要在执行成功时获取结果或者执行失败时获取异常信息, 需要用到 Futures 工具类的 addCallback 方法:
- Futures.addCallback(future, new FutureCallback<Integer>() {
- @Override
- public void onSuccess(@Nullable Integer result) {
- System.out.println("成功, 计算结果:" + result);
- }
- @Override
- public void onFailure(Throwable t) {
- System.out.println("失败");
- }
- }, executorService);
前面提到除了 ListenableFuture 外, 还有一个 SettableFuture 类也支持回调能力. 它实现自 ListenableFuture , 所以拥有 ListenableFuture 的所有能力.
- public class GuavaFutureDemo {
- public static void main(String[] args) throws InterruptedException {
- System.out.println("开始:" + DateUtils.getNow());
- ExecutorService executorService = Executors.newFixedThreadPool(10);
- ListenableFuture<Integer> future = submit(executorService);
- Futures.addCallback(future, new FutureCallback<Integer>() {
- @Override
- public void onSuccess(@Nullable Integer result) {
- System.out.println("成功, 计算结果:" + result);
- }
- @Override
- public void onFailure(Throwable t) {
- System.out.println("失败:" + t.getMessage());
- }
- }, executorService);
- Thread.sleep(1000);
- System.out.println("结束:" + DateUtils.getNow());
- new CountDownLatch(1).await();
- }
- private static ListenableFuture<Integer> submit(Executor executor) {
- SettableFuture<Integer> future = SettableFuture.create();
- executor.execute(new Runnable() {
- @Override
- public void run() {
- System.out.println("开始耗时计算:" + DateUtils.getNow());
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("结束耗时计算:" + DateUtils.getNow());
- // 返回值
- future.set(100);
- // 设置异常信息
- // future.setException(new RuntimeException("custom error!"));
- }
- });
- return future;
- }
- }
看起来用法上没有太多差别, 但是有一个很容易被忽略的重要问题. 当 SettableFuture 的这种方式最后调用了 cancel 方法后, 线程池中的任务还是会继续执行, 而通过 submit 方法返回的 ListenableFuture 方法则会立即取消执行, 这点尤其要注意. 下面看看源码:
和 Netty 的 Future 一样, Guava 也是通过实现了自定义的 ExecutorService 实现类 ListeningExecutorService 来重写了 submit 方法.
- public interface ListeningExecutorService extends ExecutorService {
- <T> ListenableFuture<T> submit(Callable<T> task);
- ListenableFuture<?> submit(Runnable task);
- <T> ListenableFuture<T> submit(Runnable task, T result);
- }
同样的, newTaskFor 方法也被进行了重写, 返回了自定义的 Future 类: TrustedListenableFutureTask
- @Override
- protected final <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
- return TrustedListenableFutureTask.create(runnable, value);
- }
- @Override
- protected final <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
- return TrustedListenableFutureTask.create(callable);
- }
任务调用会走 TrustedFutureInterruptibleTask 的 run 方法:
- @Override
- public void run() {
- TrustedFutureInterruptibleTask localTask = task;
- if (localTask != null) {
- localTask.run();
- }
- }
- @Override
- public final void run() {
- if (!ATOMIC_HELPER.compareAndSetRunner(this, null, Thread.currentThread())) {
- return; // someone else has run or is running.
- }
- try {
- // 抽象方法, 子类进行重写
- runInterruptibly();
- } finally {
- if (wasInterrupted()) {
- while (!doneInterrupting) {
- Thread.yield();
- }
- }
- }
- }
最终还是调用到 TrustedFutureInterruptibleTask 的 runInterruptibly 方法, 等待任务完成后调用 set 方法.
- @Override
- void runInterruptibly() {
- if (!isDone()) {
- try {
- set(callable.call());
- } catch (Throwable t) {
- setException(t);
- }
- }
- }
- protected boolean set(@Nullable V value) {
- Object valueToSet = value == null ? NULL : value;
- // CAS 设置值
- if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
- complete(this);
- return true;
- }
- return false;
- }
在 complete 方法的最后会获取到 Listener 进行回调.
上面提到的 SettableFuture 和 ListenableFuture 的 cancel 方法效果不同, 原因在于一个重写了 afterDone 方法而一个没有.
下面是 ListenableFuture 的 afterDone 方法:
- @Override
- protected void afterDone() {
- super.afterDone();
- if (wasInterrupted()) {
- TrustedFutureInterruptibleTask localTask = task;
- if (localTask != null) {
- localTask.interruptTask();
- }
- }
- this.task = null;
- }
wasInterrupted 用来判断是否调用了 cancel (cancel 方法会设置一个取消对象 Cancellation 到 value 中)
- protected final boolean wasInterrupted() {
- final Object localValue = value;
- return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted;
- }
interruptTask 方法通过线程的 interrupt 方法真正取消线程任务的执行:
- final void interruptTask() {
- Thread currentRunner = runner;
- if (currentRunner != null) {
- currentRunner.interrupt();
- }
- doneInterrupting = true;
- }
由 Callback Hell 引出 Promise 模式
如果你对 ES6 有所接触, 就不会对 Promise 这个模式感到陌生, 如果你对前端不熟悉, 也不要紧, 我们先来看看回调地狱 (Callback Hell) 是个什么概念.
回调是一种我们推崇的异步调用方式, 但也会遇到问题, 也就是回调的嵌套. 当需要多个异步回调一起书写时, 就会出现下面的代码(以 JS 为例):
- asyncFunc1(opt, (...args1) => {
- asyncFunc2(opt, (...args2) => {
- asyncFunc3(opt, (...args3) => {
- asyncFunc4(opt, (...args4) => {
- // some operation
- });
- });
- });
- });
虽然在 JAVA 业务代码中很少出现回调的多层嵌套, 但总归是个问题, 这样的代码不易读, 嵌套太深修改也麻烦. 于是 ES6 提出了 Promise 模式来解决回调地狱的问题. 可能就会有人想问: java 中存在 Promise 模式吗? 答案是肯定的.
前面提到了 Netty 和 Guava 的扩展都提供了 addListener 这样的接口, 用于处理 Callback 调用, 但其实 jdk1.8 已经提供了一种更为高级的回调方式: CompletableFuture. 首先尝试用 CompletableFuture 来重写上面回调的问题.
- public class CompletableFutureTest {
- public static void main(String[] args) throws InterruptedException {
- System.out.println("开始:" + DateUtils.getNow());
- CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
- System.out.println("开始耗时计算:" + DateUtils.getNow());
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("结束耗时计算:" + DateUtils.getNow());
- return 100;
- });
- completableFuture.whenComplete((result, e) -> {
- System.out.println("回调结果:" + result);
- });
- System.out.println("结束:" + DateUtils.getNow());
- new CountDownLatch(1).await();
- }
- }
使用 CompletableFuture 耗时操作没有占用主线程的时间片, 达到了异步调用的效果. 我们也不需要引入任何第三方的依赖, 这都是依赖于 java.util.concurrent.CompletableFuture 的出现. CompletableFuture 提供了近 50 多个方法, 大大便捷了 java 多线程操作, 和异步调用的写法.
使用 CompletableFuture 解决回调地狱问题:
- public class CompletableFutureDemo {
- public static void main(String[] args) throws InterruptedException {
- long l = System.currentTimeMillis();
- CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
- System.out.println("在回调中执行耗时操作...");
- Thread.sleep(10000);
- return 100;
- });
- completableFuture = completableFuture.thenCompose(i -> {
- return CompletableFuture.supplyAsync(() -> {
- System.out.println("在回调的回调中执行耗时操作...");
- Thread.sleep(10000);
- return i + 100;
- });
- });
- completableFuture.whenComplete((result, e) -> {
- System.out.println("计算结果:" + result);
- });
- System.out.println("主线程运算耗时:" + (System.currentTimeMillis() - l) + "ms");
- new CountDownLatch(1).await();
- }
- }
输出:
在回调中执行耗时操作... 主线程运算耗时: 58 ms 在回调的回调中执行耗时操作... 计算结果: 200
使用 thenCompose 或者 thenComposeAsync 等方法可以实现回调的回调, 且写出来的方法易于维护.
总的看来, 为 Future 模式增加回调功能就不需要阻塞等待结果的返回并且不需要消耗无谓的 CPU 资源去轮询处理状态, JDK8 之前使用 Netty 或者 Guava 提供的工具类, JDK8 之后则可以使用自带的 CompletableFuture 类. Future 有两种模式: 将来式和回调式. 而回调式会出现回调地狱的问题, 由此衍生出了 Promise 模式来解决这个问题. 这才是 Future 模式和 Promise 模式的相关性.
来源: https://www.cnblogs.com/weknow619/p/10873519.html