我们大家都知道, 在 Java 中创建线程主要有三种方式:
继承 Thread 类;
实现 Runnable 接口;
实现 Callable 接口.
而后两者的区别在于 Callable 接口中的 call() 方法可以异步地返回一个计算结果 Future, 并且一般需要配合 ExecutorService 来执行. 这一套操作在代码实现上似乎也并不难, 可是对于 call()方法具体怎么 (被 ExecutorService) 执行的, 以及 Future 这个结果是怎么获取的, 却又不是很清楚了.
那么本篇文章, 我们就一起来学习下 Callable 接口以及 Future 的使用, 主要面向两个问题:
承载着具体任务的 call() 方法如何被执行的?
任务的执行结果如何得到?
你可能会说, 这两个难道不是一个问题吗? 任务执行了就会有返回结果, 而返回结果也一定是任务执行了才返回的, 难道还能返回一个其他任务的结果么?? 不要着急, 耐心的看下去, 你就会发现, 这两个还真的就是一个问题.
本文将分为两个部分, 第一部分分别介绍 任务, 执行, 以及结果这三个概念在 Java API 中的实体和各自的继承关系, 第二部分通过一个简单的例子回顾他们的用法, 再理解下这两个问题的答案.
Callable,Executor 与 Future
既然是一个任务被执行并返回结果, 那么我们先来看看具体的任务, 也就是 Callable 接口.
任务: Callable
非常简单, 只包含一个有泛型返回值的 call() 方法, 需要在最后返回定义类型的结果. 如果任务没有需要返回的结果, 那么将泛型 V 设为 void 并 return null; 就可以了. 对比的是 Runnable, 另一个明显的区别则是 Callable 可以抛出异常.
- public interface Callable<V> {
- V call() throws Exception;
- }
- public interface Runnable {
- public abstract void run();
- }
执行: ExecutorService
说到线程就少不了线程池, 而说到线程池肯定离不开 Executor 接口. 下面这幅图是 Executor 的框架, 我们常用的是其中的两个具体实现类 ThreadPoolExecutor 以及 ScheduledThreadPoolExecutor, 在 Executors 类中通过静态方法获取. Executors 中包含了线程池以及线程工厂的构造, 与 Executor 接口的关系类似于 Collection 接口和 Collections 类的关系.
那么我们自顶向下, 从源码上了解一下 Executor 框架, 学习学习任务是如何被执行的. 首先是 Executor 接口, 其中只定义了 execute() 方法.
public interface Executor { void execute(Runnable command); }
ExecutorService 接口继承了 Executor 接口, 主要扩展了一系列的 submit() 方法以及对 executor 的终止和判断状态. 以第一个 < T> Future<T> submit(Callable<T> task); 为例, 其中 task 为用户定义的执行的异步任务, Future 表示了任务的执行结果, 泛型 T 代表任务结果的类型.
public interface ExecutorService extends Executor { void shutdown(); // 现有任务完成后停止线程池 List<Runnable> shutdownNow(); // 立即停止线程池 boolean isShutdown(); // 判断是否已停止 boolean isTerminated(); <T> Future<T> submit(Callable<T> task); // 提交 Callale 任务 <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); // 针对 Callable 集合的 invokeAll()等方法 }
抽象类 AbstractExecutorService 是 ThreadPoolExecutor 的基类, 在下面的代码中, 它实现了 ExecutorService 接口中的 submit() 方法. 注释中是对应的 newTaskFor() 方法的代码, 非常简单, 就是将传入的 Callable 或 Runnable 参数封装成一个 FutureTask 对象.
// 1. 第一个重载方法, 参数为 Callable public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); // return new FutureTask<T>(callable); execute(ftask); return ftask; } // 2. 第二个重载方法, 参数为 Runnable public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); // return new FutureTask<T>(task, null); execute(ftask); return ftask; } // 3. 第三个重载方法, 参数为 Runnable + 返回对象 public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); // return new FutureTask<T>(task, result); execute(ftask); return ftask; }
那么也就是说, 无论传入的是 Callable 还是 Runnable,submit() 方法其实就做了三件事
具体来说, submit() 中首先生成了一个 RunnableFuture 引用的 FutureTask 实例, 然后调用 execute() 方法来执行它, 那么我们可以推测 FutureTask 继承自 RunnableFuture, 而 RunnableFuture 又实现了 Runnable, 因为 execute() 的参数应为 Runnable 类型. 上面还涉及到了 FutureTask 的构造函数, 也来看一下.
public FutureTask(Callable<V> callable) { this.callable = callable; this.state = NEW; } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); // 通过适配器将 runnable 在 call()中执行并返回 result this.state = NEW; }
FutureTask 共有两个构造方法. 第一个构造方法比较简单, 对应上面的第一个 submit(), 采用组合的方式封装 Callable 并将状态设为 NEW; 而第二个构造方法对应上面的后两个 submit() 重载, 不同之处是首先使用了 Executors.callable 来将 Runnable 和 result 组合成 Callable, 这里采用了适配器 RunnableAdapter implements Callable, 巧妙地在 call() 中执行 Runnable 并返回结果.
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; // 返回的结果; 显然: 需要在 run()中赋值 RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
在适配器设计模式中, 通常包含 ** 目标接口 Target, 适配器 Adapter 和被适配者 Adaptee ** 三类角色, 其中目标接口代表客户端 (当前业务系统) 所需要的功能, 通常为借口或抽象类; 被适配者为现存的不能满足使用需求的类; 适配器是一个转换器, 也称 wrapper, 用于给被适配者添加目标功能, 使得客户端可以按照目标接口的格式正确访问. 对于 RunnableAdapter 来说, Callable 是其目标接口, 而 Runnable 则是被适配者. RunnableAdapter 通过覆盖 call() 方法使其可按照 Callable 的要求来使用, 同时其构造方法中接收被适配者和目标对象, 满足了 call() 方法有返回值的要求.
那么总结一下 submit() 方法执行的流程, 就是: Callable 被封装在 Runnable 的子类中传入 execute() 得以执行.
结果: Future
要说 Future 就是异步任务的执行结果其实并不准确, 因为它代表了一个任务的执行过程, 有状态, 可以被取消, 而 get() 方法的返回值才是任务的结果.
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
我们在上面中还提到了 RuunableFuture 和 FutureTask. 从官方的注释来看, RuunableFuture 就是一个可以 run 的 future, 实现了 Runnable 和 Future 两个接口, 在 run() 方法中执行完计算时应该将结果保存起来以便通过 get()获取.
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation unless it has been cancelled. */ void run(); }
FutureTask 直接实现了 RunnableFuture 接口, 作为执行过程, 共有下面这几种状态, 其中 COMPLETING 为一个暂时状态, 表示正在设置结果或异常, 对应的, 设置完成后状态变为 NORMAL 或 EXCEPTIONAL;CANCELLED,INTERRUPTED 表示任务被取消或中断. 在上面的构造方法中, 将 state 初始化为 NEW.
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
然后是 FutureTask 的主要内容, 主要是 run() 和 get(). 注意 outcome 的注释, 无论是否发生异常返回的都是这个 outcome, 因为在执行中如果执行成功就将结果设置给了它(set()), 而发生异常时将异常赋给了他(setException()), 而在获取结果时也都返回了 outcome(通过 report()).
public class FutureTask<V> implements RunnableFuture<V> { private Callable<V> callable; // target, 待执行的任务 /** 保存执行结果或异常, 在 get()方法中返回 / 抛出 */ private Object outcome; // 非 volatile, 通过 CAS 保证线程安全 public void run() { ...... Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); // 调用 call()执行用户任务并获取结果 ran = true; // 执行完成, ran 置为 true } catch (Throwable ex) { // 调用 call()出现异常, 而 run()方法继续执行 result = null; ran = false; setException(ex); // setException(Throwable t): compareAndSwapInt(NEW, COMPLETING); outcome = t; } if (ran) set(result); // set(V v): compareAndSwapInt(NEW, COMPLETING); outcome = v; } } public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); // 加入队列等待 COMPLETING 完成, 可响应超时, 中断 return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { // 超时等待 } private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) // 将 outcome 作为执行结果返回 return (V)x; if (s>= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); // 将 outcome 作为捕获的返回 } }
FutureTask 实现了 RunnableFuture 接口, 所以有两方面的作用.
第一, 作为 Runnable 传入 execute() 方法来执行, 同时封装 Callable 对象并在 run() 中调用其 call() 方法;
第二, 作为 Future 管理任务的执行状态, 将 call() 的返回值保存在 outcome 中以通过 get() 获取. 这似乎就能回答开头的两个问题, 并且浑然天成, 就好像是一个问题, 除非发生异常的时候返回的不是任务的结果而是异常对象.
总结一下继承关系:
二, 使用举例
文章的标题有点唬人, 说到底还是讲 Callable 的用法. 现在我们知道了 Future 代表了任务执行的过程和结果, 作为 call() 方法的返回值来获取执行结果; 而 FutureTask 是一个 Runnable 的 Future, 既是任务执行的过程和结果, 又是 call 方法最终执行的载体. 下面通过一个例子看看他们在使用上的区别.
首先创建一个任务, 即定义一个任务类实现 Callable 接口, 在 call() 方法里添加我们的操作, 这里用耗时三秒然后返回 100 模拟计算过程.
class MyTask implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("子线程开始计算..."); for (int i=0;i<3;++i){ Thread.sleep(1000); System.out.println("子线程计算中, 用时"+(i+1)+"秒"); } System.out.println("子线程计算完成, 返回: 100"); return 100; } }
然后呢, 创建一个线程池, 并实例化一个 MyTask 备用.
ExecutorService executor = Executors.newCachedThreadPool(); MyTask task = new MyTask();
现在, 分别使用 Future 和 FutureTask 来获取执行结果, 看看他们有什么区别.
使用 Future
Future 一般作为 submit() 的返回值使用, 并在主线程中以阻塞的方式获取异步任务的执行结果.
System.out.println("主线程启动线程池"); Future<Integer> future = executor.submit(task); System.out.println("主线程得到返回结果:"+future.get()); executor.shutdown();
看看输出结果:
主线程启动线程池
子线程开始计算...
子线程计算中, 用时 1 秒
子线程计算中, 用时 2 秒
子线程计算中, 用时 3 秒
子线程计算完成, 返回: 100
主线程得到返回结果: 100
由于 get() 方法阻塞获取结果, 所以输出顺序为子线程计算完成后主线程输出结果.
使用 FutureTask
由于 FutureTask 集任务与结果于一身, 所以我们可以使用 FutureTask 自身而非返回值来管理任务, 这需要首先利用 Callable 对象来构造 FutureTask, 并调用不同的 submit()重载方法.
System.out.println("主线程启动线程池"); FutureTask<Integer> futureTask = new FutureTask<>(task); executor.submit(futureTask); // 作为 Ruunable 传入 submit()中 System.out.println("主线程得到返回结果:"+futureTask.get()); // 作为 Future 获取结果 executor.shutdown();
这段程序的输出与上面中完全相同, 其实两者在实际执行中的区别也不大, 虽然前者调用了 submit(Callable<T> task)而后者调用了 submit(Runnable task), 但最终都通过 execute(futuretask)来把任务加入线程池中.
总结
上面大费周章其实只是尽可能细致地讲清楚了 Callable 中的任务是如何执行的, 总结起来就是:
线程池中, submit() 方法实际上将 Callable 封装在 FutureTask 中, 将其作为 Runnable 的子类传给 execute()真正执行;
FutureTask 在 run() 中调用 Callable 对象的 call() 方法并接收返回值或捕获异常保存在 Object outcome 中, 同时管理执行过程中的状态 state;
FutureTask 同时作为 Future 的子类, 通过 get() 返回任务的执行结果, 若未执行完成则通过等待队列进行阻塞等待完成;
FutureTask 作为一个 Runnable 的 Future, 其中最重要的两个方法如下.
来源: https://www.cnblogs.com/cxuanBlog/p/13408647.html