并发是一个难题, 但是可以通过使用强力简单的抽象来显著的简化, 为了简化问题, guava 扩展了 Future 接口, 即 ListenableFuture (可以监听的 Future).
我强烈建议你在你的所有代码里使用 ListenableFuture 去替代 Future, 原因如下:
很多的 Futures 类的方法需要它.(Futures 工具类使用)
它比后来改造为 ListenableFutrue 更简单.(早点使用比重构更简单)
工具方法的提供者不需要提供 Future 和 ListenableFuture 方法的变体.(不需要兼容两套)
接口
一个传统的 Futrue 代表一个异步计算的结果: 一个可能完成也可能没有完成输出结果的计算.
一个 Future 可以用在进度计算, 或者说是 一个提供给我们结果的服务的承诺.
一个 ListenableFuture 允许注册当你在计算完成的时候的回调, 或者计算已经完成了.
这个简单的增强让高效支持多种操作成为可能. 而 Future 接口并不能支持.
ListenbleFuture 中添加的基本操作是
addListener(Runnable , Executor ),
它指出了当未来计算完成时, 指定的 Runnable 会在指定的 Executor 中运行.
增加回调
很多用户喜欢使用 Futures.addCallback(ListenableFuture,FutureCallback,Executor) 方法.
FutureCallback 实现了下面两个方法:
onSuccess(v) 当未来成功执行的动作, 基于计算结果
onFailure(Throwable) 当未来失败执行的动作, 基于失败
创建
相较于 jdk 提供的 ExecutorService.submit(Callable) 方法来初始化一个异步计算. 它返回一个常规的 Future,
guava 提供了 ListeningExecutorService 接口, 它返回 ListenableFuture.
把 ExecutorService 转换为 ListenableExecutorService
使用: MoreExecutors.listeningDecorator(ExecutorService)
基础用法如下:
- /**
- * 说明: 使用例子代码
- * @author carter
- * 创建时间: 2020 年 03 月 19 日 9:54 上午
- **/
- @Slf4j
- public class ListenableFutureUtils {
- public static void main(String[] args) {
- ListeningExecutorService service = MoreExecutors.listeningDecorator(
- Executors.newFixedThreadPool(10));
- final ListenableFuture<AResult> listenableFuture = service.submit(() -> {
- try {
- TimeUnit.SECONDS.sleep(5);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return new AResult(30, "male", 1);
- });
- Futures.addCallback(listenableFuture,
- new FutureCallback<AResult>() {
- @Override
- public void onSuccess(AResult aResult) {
- log.info("计算成功,{}",aResult);
- }
- @Override
- public void onFailure(Throwable throwable) {
- log.error("计算错误",throwable);
- }
- },service);
- }
- @Data
- @AllArgsConstructor
- public static class AResult{
- private Integer age;
- private String sex;
- private Integer id;
- }
- }
相对的, 如果你想从基于 FutureTask 的 API 转换过来,
Guava 提供了
ListenableFutureTask.create(Callable)
和
ListenableFutureTask.create(Runnable)
不同于 jdk,ListenableFutureTask 并不是直接扩展的.
如果你喜欢抽象的设置 future 的值, 而不是实现一个方法然后计算值, 可以考虑使用 AbstractFuture 或使用 SettableFuture ;
如果你必须转换 Future 为 ListenableFuture, 你别无选择, 必须使用 JdkFutureAdapters.listenInPoolThread(Future) 来转换 Future 为 ListenableFuture
任何时候只要可能, 推荐你修改源码让它返回一个 ListenableFuture
应用
使用 ListenablFuture 最重要的原因是可以使用链式异步操作.
代码如下:
- package com.xxx.demo;
- import com.google.common.util.concurrent.AsyncFunction;
- import com.google.common.util.concurrent.Futures;
- import com.google.common.util.concurrent.ListenableFuture;
- import lombok.AllArgsConstructor;
- import lombok.Data;
- /**
- * 说明: 异步操作链
- * @author carter
- * 创建时间: 2020 年 03 月 19 日 10:11 上午
- **/
- public class ApplicationUtils {
- public static void main(String[] args) {
- Query query = new Query(30);
- ListenableFuture<RowKey> rowKeyFuture = lookUp(query);
- AsyncFunction<RowKey, QueryResult> queryFun = rowKey -> readData(rowKey);
- final ListenableFuture<QueryResult> queryResultListenableFuture =
- Futures.transformAsync(rowKeyFuture, queryFun);
- }
- private static ListenableFuture<QueryResult> readData(RowKey rowKey) {
- return null;
- }
- private static ListenableFuture<RowKey> lookUp(Query query) {
- return null;
- }
- @Data
- @AllArgsConstructor
- public static class RowKey {
- private String id;
- }
- @Data
- @AllArgsConstructor
- public static class Query {
- private Integer age;
- }
- @Data
- @AllArgsConstructor
- public static class QueryResult {
- private String id;
- private String age;
- }
- }
很多其他高效支持的操作 ListenableFuture 提供, 而 Future 不提供.
不同的操作可以被不同的线程池执行, 一个简单的 ListenableFuture 可以有多个操作去等待.
只要一个操作开始, 其他多个操作应该开始, fan-out, 千帆竞发.
ListenableFuture 可以实现这样的操作: 它触发了所有请求的回调.
通过少量的工作, 我们可以 fan-in.
触发一个 ListenableFuture 来获得计算结果, 当其他的 Future 结束的时候.
Futures.allAsList 是一个例子.
方法介绍:
方法 | 描述 |
---|---|
transformAsync(ListenableFuture , AsyncFunction , Executor) | 返回一个新的 ListenableFuture,它的结果是执行异步函数的返回,函数入参是 ListenableFuture 的返回结果; |
transform(ListenableFuture , Function , Executor) | 返回一个新的 ListenableFuture,它的结果是执行函数的返回,函数入参是 ListenableFuture 的返回结果; |
allAsList(Iterable | 返回一个 ListenableFuture,它的结果是一个 list, 包含每一个列表中的 ListenableFuture 的执行结果,任何一个 ListenableFuture 执行失败或者取消,最后的返回结果取消 |
successfullAsList(Iterable | 返回一个 ListenableFuture,它的结果是一个 list, 包含每一个列表中的 ListenableFuture 的执行结果,成功的是结果,失败或者取消的值使用 null 替代 |
AsyncFunction<A,B> 提供了一个方法 , ListenableFutureapply(A inpunt), 它可以用来异步的转换值.
代码如下:
- package com.xxx.demo;
- import com.google.common.collect.Lists;
- import com.google.common.util.concurrent.FutureCallback;
- import com.google.common.util.concurrent.Futures;
- import com.google.common.util.concurrent.ListenableFuture;
- import lombok.AllArgsConstructor;
- import lombok.Data;
- import lombok.extern.slf4j.Slf4j;
- import java.util.List;
- /**
- * 说明: 成功执行结果汇集
- * @author carter
- * 创建时间: 2020 年 03 月 19 日 10:34 上午
- **/
- @Slf4j
- public class Test3 {
- public static void main(String[] args) {
- List<ListenableFuture<QueryResult>> querys = Lists.newLinkedList();
- final ListenableFuture<List<QueryResult>> successfulAsList =
- Futures.successfulAsList(querys);
- Futures.addCallback(successfulAsList, new FutureCallback<List<QueryResult>>() {
- @Override
- public void onSuccess(List<QueryResult> queryResults) {
- log.info("执行结果列表:{}",queryResults);
- }
- @Override
- public void onFailure(Throwable throwable) {
- log.error("执行失败",throwable);
- }
- });
- }
- @Data
- @AllArgsConstructor
- public static class QueryResult{
- private Integer age;
- }
- }
嵌套的 Future
你的代码调用一个通用接口并返回一个 Future, 很可能最终返回一个嵌套的 Future.
- package com.xxx.demo;
- import com.google.common.util.concurrent.ListenableFuture;
- import com.google.common.util.concurrent.ListeningExecutorService;
- import com.google.common.util.concurrent.MoreExecutors;
- import lombok.AllArgsConstructor;
- import lombok.Data;
- import java.util.concurrent.Callable;
- import java.util.concurrent.Executors;
- /**
- * 说明: 嵌套的 ListenableFuture
- * @author carter
- * 创建时间: 2020 年 03 月 19 日 10:43 上午
- **/
- public class Test4 {
- public static void main(String[] args) {
- final ListeningExecutorService executorService = MoreExecutors
- .listeningDecorator(Executors.newFixedThreadPool(2));
- final ListeningExecutorService otherExecutorService = MoreExecutors
- .listeningDecorator(Executors.newFixedThreadPool(2));
- Callable<Foo> otherCallback = ()->new Foo("aaa");
- final ListenableFuture<ListenableFuture<Foo>> submit =
- executorService.submit(() -> otherExecutorService.submit(otherCallback));
- }
- @Data
- @AllArgsConstructor
- public static class Foo{
- private String name;
- }
- }
例子最后返回的是: ListenableFuture<ListenableFuture> ,
这个代码不对, 因为当外层的 Future 取消的时候, 无法传播到内层的 Future,
这也是一个 使用 get() 检查别的 Future 或者 Listnener 的常规的错误,
但是, 除非特别关注 否则 otherCallback 抛出的异常会被压制.
为了避免这种情况, 所有的 guava 的 Future 处理方法 (有些从 jdk 来), 有 *Async 版本来安全的解开这个嵌套.
比如: transform,transformAsyn, submit, submitAsync 方法.
深入研究
来源: https://www.cnblogs.com/snidget/p/12523925.html