Java 的异步编程是一项非常常用的多线程技术.
之前通过源码详细分析了 ThreadPoolExecutor《你真的懂 ThreadPoolExecutor 线程池技术吗? 看了源码你会有全新的认识》 http://zackku.com/java-thread-threadpoolexecutor/ . 通过创建一个 ThreadPoolExecutor, 往里面丢任务就可以实现多线程异步执行了.
但之前的任务主要倾向于线程池, 并没有讲到异步编程方面的内容. 本文将通过介绍 Executor+Future 框架(FutureTask 是实现的核心), 来深入了解下 Java 的异步编程.
万事从示例开始, 我们先通过示例 Demo 有一个直观的印象, 再深入去了解概念与原理.
使用示例
Demo:
使用上比较简单,
运行结果:
任务 1 异步执行: 0
任务 2 异步执行: 0
任务 2 异步执行: 1
...
任务 2 异步执行: 45
同步代码
任务 2 异步执行: 24
...
任务 1 异步执行: 199
任务 1: 执行完成
...
任务 2 异步执行: 199
任务 2: 执行完成
假若你多次执行这个程序, 会发现结果大大的不一样, 因为两个任务和同步代码是异步由多条线程执行的, 打印的结果当然是随机的.
回顾这个 Demo 做了什么,
构建了一个线程池
往线程池里面丢两个需要执行的任务
最后获取这两个任务的结果
其中第二点是异步执行两个任务, 这两个任务和主线程分别是用了三个线程并发执行的, 第三点是在主线程中同步等待两个任务的结果.
很容易看出来, 异步编程的好处就在于可以让不相干的任务异步执行, 不阻塞主线程. 若是主线程需要异步执行的结果, 此时再去等待结果会更加高效, 提高程序的执行效率.
下面来看看整个流程的实现原理.
源码分析
一般在实际项目中, 都会有配置有自己的线程池, 建议大家在用异步编程时, 配置一个专用的线程池, 做好线程隔离, 避免异步线程影响到其他模块的工作. Demo 中为了方便, 直接调用 Exectors 的方法生成一个临时的线程池, 日常不建议使用.
我们从这个 ExecutorService.submit()方法入手, 看看整体实现.
ExecutorService.submit()
定义一个接口. 这个接口接收一个 Callable 参数(执行的任务), 返回一个 Future(计算结果).
Callable, 相当于一个需要执行的任务. 它不接收任何参数, 可以返回结果, 可以抛出异常. 相类似的还有 Runnable, 它也是不接收, 不同点在于它不返回结果, 也不抛异常, 异常需要在任务内部处理. 总结来说 Callable 更像一个方法的调用, Runnable 则是一个不需要理会结果的调用. 在 JDK 8 以后, 它们都可以通过 Lamda 表达式写法去替代内部类的写法(详见 Demo).
Future, 一个异步计算的结果. 调用 get()方法可以得到对应的计算结果, 如果调用时没有异步计算完, 会阻塞等待计算的结果. 同时它还提供方法可以尝试取消任务的执行.
看回
ExecutorService.submit()
的实现, 代码在实现类
AbstractExecutorService
中.
除了它接口的实现, 还提供了两种变形. 原来接口只接收 Callable 参数, 实现类中还新增了接收 Runnable 参数的.
如果看过之前写的《你真的懂 ThreadPoolExecutor 线程池技术吗? 看了源码你会有全新的认识》, 应该了解 ThreadPoolExecutor 执行任务是可以调用 execute()方法的. 而这里面 submit()方法则是为 Callable/Runnable 加多一层 FutureTask, 从而
使执行结果有一个存放的地方, 同时也添加一个可以取消的功能. 原本的 execute()只能执行任务, 不会返回结果的, 具体实现原理可以看看之前的文章分析.
FutureTask 是 RunnableFuture 的实现. 而 RunnableFuture 是继承 Future 和 Runnable 接口的, 定义 run()接口.
因为 FutureTask 有 run()接口, 所以可以直接用一个 Callable/Runnable 创建一个 FutureTask 单独执行. 但这样并没有异步的效果, 因为没有启用新的线程去跑, 而是在原来的线程阻塞执行的.
到这里我们清楚知道了, submit()方法重点是利用 Callable/Runnable 创建一个 FutureTask, 然后多线程执行 run()方法, 达到异步处理并且得到结果的效果. 而 FutureTask 的重点则是 run()方法如何持有保存计算的结果.
FutureTask.run()
首先判断 futureTask 对象的 state 状态, 如果不是 NEW 的话, 证明已经开始运行过了, 则退出执行. 同时 futureTask 对象通过 CAS, 把当前线程赋值给变量 runner(是 Thread 类型, 说明对象使用哪个线程执行的), 如果 CAS 失败则退出.
外层 try{}代码块中, 对 callable 判空和 state 状态必须是 NEW. 内层 try{}代码真正调用 callable, 开始执行任务. 若执行成功, 则把 ran 变量设为 true, 保存结果在 result 变量中, 证明已跑成功过了; 若抛异常了, 则设为 false,result 为空, 并且调用 setException()保存异常. 最后如果 ran 为 true 的话, 则调用 set()保存 result 结果.
看下 setException()和 set()的实现.
两者的基本流程一样, CAS 置换状态, 保存结果在 outcome 变量道中, 但 setException()保存的结果类型固定是 Throwable. 另外一个不同在于最终 state 状态, 一个是 EXCEPTION, 一个是 NORMAL.
这两个方法最后都调用了 finishCompletion(). 这个方法主要是配合线程池唤醒下一个任务.
FutureTask.get()
从上面 run()方法得知, 最后执行的结果放在了 outcome 变量中. 那最终怎么从其中取出结果来, 我们来看看 get()方法.
从源码可知, get()方法分两步. 第一步, 先判断状态, 如果计算为完成, 则需要阻塞地等待完成. 第二步, 如果完成了, 则调用 report()方法获取结果并返回.
先看看 awaitDone()阻塞等待完成. 该方法可以选用超时功能.
在自旋的 for()循环中,
先判断是否线程被中断, 中断的话抛异常退出.
然后开始判断运行的 state 值, 如果 state 大于 COMPLETING, 证明计算已经是终态了, 此时返回终态变量.
若 state 等于 COMPLETING, 证明已经开始计算, 并且还在计算中. 此时为了避免过多的 CPU 时间放在这个 for 循环的自旋上, 程序执行 Thread.yield(), 把线程从运行态降为就绪态, 让出 CPU 时间.
若以上状态都不是, 则证明 state 为 NEW, 还没开始执行. 那么程序在当前循环现在会新增一个 WaitNode, 在下一个循环里面调用 LockSupport.park()把当前线程阻塞. 当 run()方法结束的时候, 会再次唤醒此线程, 避免自旋消耗 CPU 时间.
如果选用了超时功能, 在阻塞和自旋过程中超时了, 则会返回当前超时的状态.
第二步的 report()方法比较简单.
如果状态是 NORMAL, 正常结束的话, 则把 outcome 变量返回;
如果是取消或者中断状态的, 则抛出取消异常;
如果是 EXCEPTION, 则把 outcome 当作异常抛出 (之前 setException() 保存的类型就是 Throwable). 从而整个 get()会有一个异常抛出.
总结
至此我们已经比较完整地了解 Executor+Future 的框架原理了, 而 FutureTask 则是该框架的主要实现. 下面总结下要点
Executor.sumbit()方法异步执行一个任务, 并且返回一个 Future 结果.
submit()的原理是利用 Callable 创建一个 FutureTask 对象, 然后执行对象的 run()方法, 把结果保存在 outcome 中.
调用 get()获取 outcome 时, 如果任务未完成, 会阻塞线程, 等待执行完毕.
异常和正常结果都放在 outcome 中, 调用 get()获取结果或抛出异常.
更多技术文章, 精彩干货, 请关注
博客: zackku.com
微信公众号: Zack 说码
来源: https://www.cnblogs.com/zackku/p/10069230.html