1,Future 可以让我们提前处理一些复杂的运行. 非常的方便.
这里, 我们自己去实现一个 Future!!!! 是不是很兴奋.
本人编程的思想一般都是来自于生活. 从问题出发, 再去找到对应的解决方法. 而不是先学习大量无关的解决问题的方法, 再去解决问题(当然要辩证去看这个问题)... 不知道有无人看不懂这句话呢?
------设计目的: 设计一个方法, 能在未来某一个时间点, 拿出一个结果. 如果在未来的那个时刻拿不出来, 那就让线程去等待, 一直等到拿个结果 (无论成功还是出错的结果) 才开始执行另外的事情.
- =====================================================
- // 场景: 一天, 我要去订一个蛋糕, 去到蛋糕店, 下了个订单, 我就去上班了, 下班之后, 拿到蛋糕, 回家吃啦.
- // 面向对象思想, 蛋糕店, 使用工厂模式的对象. 客人, 一个对象. 产品, 里面有订单的对象.
- // , 去到蛋糕店, 拿到蛋糕店给的订单, 使用工厂模式来模拟这个工厂,
- public class MainTest {
- public static void main(String[] args) {
- Factory pf = new Factory(); // 创建一个蛋糕店
- Future f = pf.createOrder("蛋糕"); // 蛋糕店生成什么产品, 返回一个生成什么的订单
- System.out.println("我去上班了, 下班回来拿蛋糕...");
- // 根据订单拿到做好的蛋糕, 如果没做完就要等待蛋糕店做完.
- System.out.println("下班去拿蛋糕, 拿到蛋糕了"+f.getProduct());
- }
- }
- // Factory 相当于蛋糕店, 接受订单, 开始生产蛋糕
- public class Factory {
- // 提供客人一个订单
- public Future createOrder(String name) {
- Future f = new Future(); // 创建一个订单
- new Thread(new Runnable() {
- @Override
- public void run() {
- System.out.println("开始生产蛋糕");
- Product product = new Product(name); // 生成产品
- System.out.println("生产蛋糕结束");
- f.setProduct(product);
- }
- }).start();
- return f;
- }
- }
- public class Product {
- private int id; // 订单编号
- private String name; // 生成的产品名称
- public Product(String name) {
- super();
- Random random = new Random();
- this.id = random.nextInt();
- this.name = name;
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- public int getId() {
- return id;
- }
- public void setId(int id) {
- this.id = id;
- }
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- @Override
- public String toString() {
- return "Product [id=" + id + ", name=" + name + "]";
- }
- }
- public class Future {
- private Product product;
- private boolean flag; // true: 生产完了, false: 还没生产完
- public synchronized void setProduct(Product product) {
- this.product=product;
- if(flag) { // 如果生产完, 直接返回就可以, 不用再重新设置了.
- return;
- }
- flag=true;
- notifyAll(); // 叫醒所有处于 wait 状态的线程
- }
- public synchronized Product getProduct() {
- while(!flag) { // 当还没有生产完的时候, 就要等待.
- try {
- wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- return product;
- }
- @Override
- public String toString() {
- return "Future [product=" + product + "]";
- }
- }
三, Future 源码解读
1,Future API 以及应用场景
2,Future 的核心实现原理
3,Future 的源码
1, 怎样使用 Future API 呢?
-----如果一问到这个问题, 可以马上向起这三点.
1.1 new 一个 Callable
1.2 将刚刚创建的 Callable 的实例交给 FutureTask
1.3 将刚刚创建的 FutureTask 实例交给 Thread
2, 如果有看过源码的朋友, 都会问一个问题, 就是 Callable 和 Runnable 有什么区别?
-----Runnable 的 run 方法是被线程调用的, 在 run 方法时异步执行的
Callable 的 Call 方法, 不是异步执行的, 是由 Future 的 run 方法调用的
接口 Callable: 有返回结果并且可能抛出异常的任务;
接口 Runnable: 没有返回结果
接口 Future: 表示异步执行的结果;
类 FutureTask: 实现 Future,Runnable 等接口, 是一个异步执行的任务. 可以直接执行, 或包装成 Callable 执行;
具体详细可以参考: https://blog.csdn.net/lican19911221/article/details/78200344
四, 再次理解 Future 模式
先上一个场景: 假如你突然想做饭, 但是没有厨具, 也没有食材. 网上购买厨具比较方便, 食材去超市买更放心.
实现分析: 在快递员送厨具的期间, 我们肯定不会闲着, 可以去超市买食材. 所以, 在主线程里面另起一个子线程去网购厨具.
但是, 子线程执行的结果是要返回厨具的, 而 run 方法是没有返回值的. 所以, 这才是难点, 需要好好考虑一下.
模拟代码 1:
- package test;
- public class CommonCook {
- public static void main(String[] args) throws InterruptedException {
- long startTime = System.currentTimeMillis();
- // 第一步 网购厨具
- OnlineShopping thread = new OnlineShopping();
- thread.start();
- thread.join(); // 保证厨具送到
- // 第二步 去超市购买食材
- Thread.sleep(2000); // 模拟购买食材时间
- Shicai shicai = new Shicai();
- System.out.println("第二步: 食材到位");
- // 第三步 用厨具烹饪食材
- System.out.println("第三步: 开始展现厨艺");
- cook(thread.chuju, shicai);
- System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
- }
- // 网购厨具线程
- static class OnlineShopping extends Thread {
- private Chuju chuju;
- @Override
- public void run() {
- System.out.println("第一步: 下单");
- System.out.println("第一步: 等待送货");
- try {
- Thread.sleep(5000); // 模拟送货时间
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("第一步: 快递送到");
- chuju = new Chuju();
- }
- }
- // 用厨具烹饪食材
- static void cook(Chuju chuju, Shicai shicai) {}
- // 厨具类
- static class Chuju {}
- // 食材类
- static class Shicai {}
- }
运行结果:
第一步: 下单
第一步: 等待送货
第一步: 快递送到
第二步: 食材到位
第三步: 开始展现厨艺
总共用时 7013ms
可以看到, 多线程已经失去了意义. 在厨具送到期间, 我们不能干任何事. 对应代码, 就是调用 join 方法阻塞主线程.
有人问了, 不阻塞主线程行不行???
不行!!!
从代码来看的话, run 方法不执行完, 属性 chuju 就没有被赋值, 还是 null. 换句话说, 没有厨具, 怎么做饭.
Java 现在的多线程机制, 核心方法 run 是没有返回值的; 如果要保存 run 方法里面的计算结果, 必须等待 run 方法计算完, 无论计算过程多么耗时.
面对这种尴尬的处境, 程序员就会想: 在子线程 run 方法计算的期间, 能不能在主线程里面继续异步执行???
Where there is a will,there is a way!!!
这种想法的核心就是 Future 模式, 下面先应用一下 Java 自己实现的 Future 模式.
模拟代码 2:
- package test;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.FutureTask;
- public class FutureCook {
- public static void main(String[] args) throws InterruptedException, ExecutionException {
- long startTime = System.currentTimeMillis();
- // 第一步 网购厨具
- Callable<Chuju> onlineShopping = new Callable<Chuju>() {
- @Override
- public Chuju call() throws Exception {
- System.out.println("第一步: 下单");
- System.out.println("第一步: 等待送货");
- Thread.sleep(5000); // 模拟送货时间
- System.out.println("第一步: 快递送到");
- return new Chuju();
- }
- };
- FutureTask<Chuju> task = new FutureTask<Chuju>(onlineShopping);
- new Thread(task).start();
- // 第二步 去超市购买食材
- Thread.sleep(2000); // 模拟购买食材时间
- Shicai shicai = new Shicai();
- System.out.println("第二步: 食材到位");
- // 第三步 用厨具烹饪食材
- if (!task.isDone()) { // 联系快递员, 询问是否到货
- System.out.println("第三步: 厨具还没到, 心情好就等着(心情不好就调用 cancel 方法取消订单)");
- }
- Chuju chuju = task.get();
- System.out.println("第三步: 厨具到位, 开始展现厨艺");
- cook(chuju, shicai);
- System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
- }
- // 用厨具烹饪食材
- static void cook(Chuju chuju, Shicai shicai) {}
- // 厨具类
- static class Chuju {}
- // 食材类
- static class Shicai {}
- }
运行结果:
第一步: 下单
第一步: 等待送货
第二步: 食材到位
第三步: 厨具还没到, 心情好就等着(心情不好就调用 cancel 方法取消订单)
第一步: 快递送到
第三步: 厨具到位, 开始展现厨艺
总共用时 5005ms
可以看见, 在快递员送厨具的期间, 我们没有闲着, 可以去买食材; 而且我们知道厨具到没到, 甚至可以在厨具没到的时候, 取消订单不要了.
好神奇, 有没有.
下面具体分析一下第二段代码:
1)把耗时的网购厨具逻辑, 封装到了一个 Callable 的 call 方法里面.
- public interface Callable<V> {
- /**
- * Computes a result, or throws an exception if unable to do so.
- *
- * @return computed result
- * @throws Exception if unable to compute a result
- */
- V call() throws Exception;
- }
Callable 接口可以看作是 Runnable 接口的补充, call 方法带有返回值, 并且可以抛出异常.
2)把 Callable 实例当作参数, 生成一个 FutureTask 的对象, 然后把这个对象当作一个 Runnable, 作为参数另起线程.
- public class FutureTask<V> implements RunnableFuture<V>
- public interface RunnableFuture<V> extends Runnable, Future<V>
- public interface Future<V> {
- boolean cancel(boolean mayInterruptIfRunning);
- boolean isCancelled();
- boolean isDone();
- V get() throws InterruptedException, ExecutionException;
- V get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException;
- }
这个继承体系中的核心接口是 Future.Future 的核心思想是: 一个方法 f, 计算过程可能非常耗时, 等待 f 返回, 显然不明智. 可以在调用 f 的时候, 立马返回一个 Future, 可以通过 Future 这个数据结构去控制方法 f 的计算过程.
这里的控制包括:
get 方法: 获取计算结果(如果还没计算完, 也是必须等待的)
cancel 方法: 还没计算完, 可以取消计算过程
isDone 方法: 判断是否计算完
isCancelled 方法: 判断计算是否被取消
这些接口的设计很完美, FutureTask 的实现注定不会简单, 后面再说.
3)在第三步里面, 调用了 isDone 方法查看状态, 然后直接调用 task.get 方法获取厨具, 不过这时还没送到, 所以还是会等待 3 秒. 对比第一段代码的执行结果, 这里我们节省了 2 秒. 这是因为在快递员送货期间, 我们去超市购买食材, 这两件事在同一时间段内异步执行.
通过以上 3 步, 我们就完成了对 Java 原生 Future 模式最基本的应用. 下面具体分析下 FutureTask 的实现, 先看 JDK8 的, 再比较一下 JDK6 的实现.
既然 FutureTask 也是一个 Runnable, 那就看看它的 run 方法
- public void run() {
- if (state != NEW ||
- !UNSAFE.compareAndSwapObject(this, runnerOffset,
- null, Thread.currentThread()))
- return;
- try {
- Callable<V> c = callable; // 这里的 callable 是从构造方法里面传人的
- if (c != null && state == NEW) {
- V result;
- boolean ran;
- try {
- result = c.call();
- ran = true;
- } catch (Throwable ex) {
- result = null;
- ran = false;
- setException(ex); // 保存 call 方法抛出的异常
- }
- if (ran)
- set(result); // 保存 call 方法的执行结果
- }
- } finally {
- // runner must be non-null until state is settled to
- // prevent concurrent calls to run()
- runner = null;
- // state must be re-read after nulling runner to prevent
- // leaked interrupts
- int s = state;
- if (s>= INTERRUPTING)
- handlePossibleCancellationInterrupt(s);
- }
- }
先看 try 语句块里面的逻辑, 发现 run 方法的主要逻辑就是运行 Callable 的 call 方法, 然后将保存结果或者异常(用的一个属性 result). 这里比较难想到的是, 将 call 方法抛出的异常也保存起来了.
这里表示状态的属性 state 是个什么鬼
- * Possible state transitions:
- * NEW -> COMPLETING -> NORMAL
- * NEW -> COMPLETING -> EXCEPTIONAL
- * NEW -> CANCELLED
- * NEW -> INTERRUPTING -> INTERRUPTED
- */
- private volatile int state;
- private static final int NEW = 0;
- private static final int COMPLETING = 1;
- private static final int NORMAL = 2;
- private static final int EXCEPTIONAL = 3;
- private static final int CANCELLED = 4;
- private static final int INTERRUPTING = 5;
- private static final int INTERRUPTED = 6;
把 FutureTask 看作一个 Future, 那么它的作用就是控制 Callable 的 call 方法的执行过程, 在执行的过程中自然会有状态的转换:
1)一个 FutureTask 新建出来, state 就是 NEW 状态; COMPETING 和 INTERRUPTING 用的进行时, 表示瞬时状态, 存在时间极短(为什么要设立这种状态??? 不解);NORMAL 代表顺利完成; EXCEPTIONAL 代表执行过程出现异常; CANCELED 代表执行过程被取消; INTERRUPTED 被中断
2)执行过程顺利完成: NEW -> COMPLETING -> NORMAL
3)执行过程出现异常: NEW -> COMPLETING -> EXCEPTIONAL
4)执行过程被取消: NEW -> CANCELLED
5)执行过程中, 线程中断: NEW -> INTERRUPTING -> INTERRUPTED
代码中状态判断, CAS 操作等细节, 请读者自己阅读.
再看看 get 方法的实现:
- public V get() throws InterruptedException, ExecutionException {
- int s = state;
- if (s <= COMPLETING)
- s = awaitDone(false, 0L);
- return report(s);
- }
- private int awaitDone(boolean timed, long nanos)
- throws InterruptedException {
- final long deadline = timed ? System.nanoTime() + nanos : 0L;
- WaitNode q = null;
- boolean queued = false;
- for (;;) {
- if (Thread.interrupted()) {
- removeWaiter(q);
- throw new InterruptedException();
- }
- int s = state;
- if (s> COMPLETING) {
- if (q != null)
- q.thread = null;
- return s;
- }
- else if (s == COMPLETING) // cannot time out yet
- Thread.yield();
- else if (q == null)
- q = new WaitNode();
- else if (!queued)
- queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
- q.next = waiters, q);
- else if (timed) {
- nanos = deadline - System.nanoTime();
- if (nanos <= 0L) {
- removeWaiter(q);
- return state;
- }
- LockSupport.parkNanos(this, nanos);
- }
- else
- LockSupport.park(this);
- }
- }
get 方法的逻辑很简单, 如果 call 方法的执行过程已完成, 就把结果给出去; 如果未完成, 就将当前线程挂起等待. awaitDone 方法里面死循环的逻辑, 推演几遍就能弄懂; 它里面挂起线程的主要创新是定义了 WaitNode 类, 来将多个等待线程组织成队列, 这是与 JDK6 的实现最大的不同.
挂起的线程何时被唤醒:
- private void finishCompletion() {
- // assert state> COMPLETING;
- for (WaitNode q; (q = waiters) != null;) {
- if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
- for (;;) {
- Thread t = q.thread;
- if (t != null) {
- q.thread = null;
- LockSupport.unpark(t); // 唤醒线程
- }
- WaitNode next = q.next;
- if (next == null)
- break;
- q.next = null; // unlink to help gc
- q = next;
- }
- break;
- }
- }
- done();
- callable = null; // to reduce footprint
- }
以上就是 JDK8 的大体实现逻辑, 像 cancel,set 等方法, 也请读者自己阅读.
再来看看 JDK6 的实现.
JDK6 的 FutureTask 的基本操作都是通过自己的内部类 Sync 来实现的, 而 Sync 继承自 AbstractQueuedSynchronizer 这个出镜率极高的并发工具类
- /** State value representing that task is running */
- private static final int RUNNING = 1;
- /** State value representing that task ran */
- private static final int RAN = 2;
- /** State value representing that task was cancelled */
- private static final int CANCELLED = 4;
- /** The underlying callable */
- private final Callable<V> callable;
- /** The result to return from get() */
- private V result;
- /** The exception to throw from get() */
- private Throwable exception;
里面的状态只有基本的几个, 而且计算结果和异常是分开保存的.
- V innerGet() throws InterruptedException, ExecutionException {
- acquireSharedInterruptibly(0);
- if (getState() == CANCELLED)
- throw new CancellationException();
- if (exception != null)
- throw new ExecutionException(exception);
- return result;
- }
这个 get 方法里面处理等待线程队列的方式是调用了 acquireSharedInterruptibly 方法, 看过我之前几篇博客文章的读者应该非常熟悉了. 其中的等待线程队列, 线程挂起和唤醒等逻辑, 这里不再赘述, 如果不明白, 请出门左转.
最后来看看, Future 模式衍生出来的更高级的应用.
再上一个场景: 我们自己写一个简单的数据库连接池, 能够复用数据库连接, 并且能在高并发情况下正常工作.
实现代码 1:
- package test;
- import java.util.concurrent.ConcurrentHashMap;
- public class ConnectionPool {
- private ConcurrentHashMap<String, Connection> pool = new ConcurrentHashMap<String, Connection>();
- public Connection getConnection(String key) {
- Connection conn = null;
- if (pool.containsKey(key)) {
- conn = pool.get(key);
- } else {
- conn = createConnection();
- pool.putIfAbsent(key, conn);
- }
- return conn;
- }
- public Connection createConnection() {
- return new Connection();
- }
- class Connection {}
- }
我们用了 ConcurrentHashMap, 这样就不必把 getConnection 方法置为 synchronized(当然也可以用 Lock), 当多个线程同时调用 getConnection 方法时, 性能大幅提升.
貌似很完美了, 但是有可能导致多余连接的创建, 推演一遍:
某一时刻, 同时有 3 个线程进入 getConnection 方法, 调用 pool.containsKey(key)都返回 false, 然后 3 个线程各自都创建了连接. 虽然 ConcurrentHashMap 的 put 方法只会加入其中一个, 但还是生成了 2 个多余的连接. 如果是真正的数据库连接, 那会造成极大的资源浪费.
所以, 我们现在的难点是: 如何在多线程访问 getConnection 方法时, 只执行一次 createConnection.
结合之前 Future 模式的实现分析: 当 3 个线程都要创建连接的时候, 如果只有一个线程执行 createConnection 方法创建一个连接, 其它 2 个线程只需要用这个连接就行了. 再延伸, 把 createConnection 方法放到一个 Callable 的 call 方法里面, 然后生成 FutureTask. 我们只需要让一个线程执行 FutureTask 的 run 方法, 其它的线程只执行 get 方法就好了.
上代码:
- package test;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.FutureTask;
- public class ConnectionPool {
- private ConcurrentHashMap<String, FutureTask<Connection>> pool = new ConcurrentHashMap<String, FutureTask<Connection>>();
- public Connection getConnection(String key) throws InterruptedException, ExecutionException {
- FutureTask<Connection> connectionTask = pool.get(key);
- if (connectionTask != null) {
- return connectionTask.get();
- } else {
- Callable<Connection> callable = new Callable<Connection>() {
- @Override
- public Connection call() throws Exception {
- return createConnection();
- }
- };
- FutureTask<Connection> newTask = new FutureTask<Connection>(callable);
- connectionTask = pool.putIfAbsent(key, newTask);
- if (connectionTask == null) {
- connectionTask = newTask;
- connectionTask.run();
- }
- return connectionTask.get();
- }
- }
- public Connection createConnection() {
- return new Connection();
- }
- class Connection {
- }
- }
推演一遍: 当 3 个线程同时进入 else 语句块时, 各自都创建了一个 FutureTask, 但是 ConcurrentHashMap 只会加入其中一个. 第一个线程执行 pool.putIfAbsent 方法后返回 null, 然后 connectionTask 被赋值, 接着就执行 run 方法去创建连接, 最后 get. 后面的线程执行 pool.putIfAbsent 方法不会返回 null, 就只会执行 get 方法.
在并发的环境下, 通过 FutureTask 作为中间转换, 成功实现了让某个方法只被一个线程执行.
五, 区别
Future 是一个接口, FutureTask 是 Future 的一个实现类, 并实现了 Runnable, 因此 FutureTask 可以传递到线程对象 Thread 中新建一个线程执行. 所以可以通过 Excutor(线程池)来执行, 也可传递给 Thread 对象执行.
如果在主线程中需要执行比较耗时的操作, 但又不想阻塞主线程时, 可以把这些作业交给 Future 对象在后台完成, 当主线程将来需要时, 就可以通过 Future 对象获得后台作业的计算结果或者执行状态.
FutureTask 是为了弥补 Thread 的不足而设计的, 它可以让程序员准确地知道线程什么时候执行完成并获得到线程执行完成后返回的结果(如果有需要).
FutureTask 是一种可以取消的异步的计算任务. 它的计算是通过 Callable 实现的, 它等价于可以携带结果的 Runnable, 并且有三个状态: 等待, 运行和完成. 完成包括所有计算以任意的方式结束, 包括正常结束, 取消和异常.
Executor 框架利用 FutureTask 来完成异步任务, 并可以用来进行任何潜在的耗时的计算. 一般 FutureTask 多用于耗时的计算, 主线程可以在完成自己的任务后, 再去获取结果.
来源: https://www.cnblogs.com/pony1223/p/9502771.html