1. 前言
在并发编程中, 异步回调的效率不言而喻, 在业务开发中, 如果由阻塞的任务需要执行, 必然要使用异步线程. 并且, 如果我们想在异步执行之后, 根据他的结果执行一些动作.
JDK 8 之前的 Future 只能解决上面需求的一半问题, 即异步执行, 返回一个 Future, 需要程序员调用 get 方法等待, 或者使用 isDone 轮询.
效率不高.
JDK 8 新出的 CompletableFuture API 可以解决这个问题. 但他的 API, 说实话, 不太好用.
我们只想要一个简单的 API, 能实现我们的回调功能.
我需要 3 个功能:
能通过 get 之类的方法返回结果.
能设置监听器进行回调.
可以在业务线程中设置成功或者失败.
楼主写一个简单的例子, 借鉴了 Netty 的异步 API, 希望能起到抛砖引玉的作用.
2. 设计
根据我们的需求: 第一, 我们需要一个类, 拥有 get 方法和 addListener 方法. 第二, 我们需要一个类, 能够回调我们设置的监听器. 第三, 我们需要一个类, 能够在业务线程中设置成功或者失败.
3. 初步实现
设计一个监听器接口:
- /**
- * 监听器
- * @author stateis0
- */
- public interface MyListener {
- /**
- * 子类需要重写此方法, 在异步任务完成之后会回调此方法.
- * @param promise 异步结果占位符.
- */
- void operationComplete(MyPromise promise);
- }
设计一个异步占位符, 类似 Future:
- /**
- * 异步执行结果占位符
- *
- * @author stateis0
- */
- public class MyPromise {
- /** 监听器集合 */
- List<MyListener> listeners = new ArrayList<MyListener>();
- /** 是否成功 */
- boolean success;
- /** 执行结果 **/
- Object result;
- /** 设置事变计数器 **/
- int failCount;
- /**
- * 设置成功, 并通知所有监听器.
- * @param result 结果
- * @return 是否成功
- */
- public boolean setSuccess(Object result) {
- if (success) {
- return false;
- }
- success = true;
- this.result = result;
- signalListeners();
- return true;
- }
- /**
- * 通知所有监听器, 回调监听器方法.
- */
- private void signalListeners() {
- for (MyListener l : listeners) {
- l.operationComplete(this);
- }
- }
- /**
- * 设置失败
- * @param e 异常对象
- * @return 设置是否成功
- */
- public boolean setFail(Exception e) {
- if (failCount> 0) {
- return false;
- }
- ++failCount;
- result = e;
- signalListeners();
- return true;
- }
- /**
- * 是否成功执行
- */
- public boolean isSuccess() {
- return success;
- }
- /**
- * 添加监听器
- * @param myListener 监听器
- */
- public void addListener(MyListener myListener) {
- listeners.add(myListener);
- }
- /**
- * 删除监听器
- * @param myListener 监听器
- */
- public void removeListener(MyListener myListener) {
- listeners.remove(myListener);
- }
- /**
- * 获取执行结果
- */
- public Object get() {
- return result;
- }
- }
我们希望使用线程池执行此类任务, 所以需要一个自定义的 Runnable, 而在这个 Runnable 中, 我们需要做一些简单的手脚:
- /**
- * 一个任务类, 通过重写 doWork 方法执行任务
- * @param <V> 返回值类型
- * @author stateis0
- */
- public abstract class MyRunnable<V> implements Runnable {
- final MyPromise myPromise;
- protected MyRunnable(MyPromise myPromise) {
- this.myPromise = myPromise;
- }
- @Override
- public void run() {
- try {
- V v = doWork();
- myPromise.setSuccess(v);
- } catch (Exception e) {
- myPromise.setFail(e);
- }
- }
- /**
- * 子类需要重写此方法. 并返回值, 这个值由 Promise 的 get 方法返回.
- */
- public abstract V doWork();
- }
4. 写个 Demo 测试一下
- /**
- * @author stateis0
- */
- public class MyDemo {
- public static void main(String[] args) {
- // 占位对象
- final MyPromise myPromise = new MyPromise();
- final Dto dto = new Dto();
- // 线程池
- Executor executor = Executors.newFixedThreadPool(1);
- // 异步执行任务,
- executor.execute(new MyRunnable<String>(myPromise) {
- @Override
- public String doWork() {
- return dto.doSomething();
- }
- });
- // 添加一个监听器
- myPromise.addListener(new MyListener() {
- // 当任务完成后, 就执行此方法.
- @Override
- public void operationComplete(MyPromise promise) {
- // 获取结果
- String result;
- // 如果任务成功执行了
- if (promise.isSuccess()) {
- // 获取结果并打印
- result = (String) promise.get();
- System.out.println("operationComplete ---->" + result);
- }
- // 如果失败了, 打印异常堆栈
- else {
- ((Exception) promise.get()).printStackTrace();
- }
- }
- });
- }
- }
- class Dto {
- public String doSomething() {
- System.out.println("doSomething");
- // throw new RuntimeException("cxs");
- return "result is success";
- }
- }
执行结果:
- doSomething
- operationComplete ----> result is success
符合我们的预期. 我们希望在业务对象 Dto 的 doSomething 成功返回之后, 回调监听器的 operationComplete 方法. 如果失败, 打印异常堆栈.
当然, 整体代码比较简单, 仅仅只是抛砖引玉.
实际上, 如果直接向 Callable 或者 Runnable 传入一个业务对象, 当 call 方法或者 run 方法执行完毕, 就可以根据执行结果执行我们的业务对象的方法了. 这样就是一个最简单直接的异步回调.
只是这样过于耦合.
异步任务和业务的任务耦合在了一起, 并且不能添加多个监听器, 也无法使用 promise 的 setSuccess 功能和 setFail 功能, 这两个功能可以在业务线程中设置成功或者失败, 灵活性更高.
关于异步, 我们可以多看看 Netty 的 API 设计, 易懂好用.
来源: https://juejin.im/post/5ae7559af265da0b7e0c0cfc