在 java 多线程编程中,我们经常使用线程池提交任务,并且通过 Future 来获取任务执行的结果,以此达到异步或者并行执行的效果在 java 多线程编程中,我们经常使用线程池提交任务,并且通过 Future 来获取任务执行的结果,以此达到异步或者并行执行的效果
通常我们是这样使用线程池:
- public class ExecutorServiceDemo {
- public static void main(String[] args) {
- // 创建一个线程池对象,控制要创建几个线程对象。
- ExecutorService pool = Executors.newFixedThreadPool(2);
- // 可以执行Runnable对象或者Callable对象代表的线程
- List < FutureTask > futures = new ArrayList < FutureTask > ();
- for (int i = 0; i < n; i++) {
- FutureTask future = pool.submit(new MyRunnable());
- futures.add(future);
- }
- //线程池不接收新的任务
- pool.shutdown();
- //做一些别的事情
- //...
- //获取结果
- //future.get()
- }
- }
FutureTask 实现了 Future 方法,从这些方法可以看出 FutureTask 的功能
- //取消线程池中的某个任务
- boolean cancel(boolean mayInterruptIfRunning);
- //判断线程池中的某个任务是否取消
- boolean isCancelled();
- //判断线程池中的某个任务是否完成
- boolean isDone();
- //获取某个任务的执行结果
- V get() throws InterruptedException,
- ExecutionException;
- //获取某个任务的执行结果
- V get(long timeout, TimeUnit unit) throws InterruptedException,
- ExecutionException,
- TimeoutException;
FutureTask 用 state 表示任务的状态:
- private volatile int state;
state 变量使用 volatile 修饰,使 state 能被多线程感知。
state 的值可以为下面几种:
- //任务被提交(到任务执行完之前(call方法执行完))
- private static final int NEW = 0;
- //任务已完成,只是结果还没有赋值给outcome变量
- private static final int COMPLETING = 1;
- //正常结束
- private static final int NORMAL = 2;
- //抛出异常
- private static final int EXCEPTIONAL = 3;
- //被取消
- private static final int CANCELLED = 4;
- //被中断
- private static final int INTERRUPTING = 5;
- //被中断
- private static final int INTERRUPTED = 6;
通常一个任务的生命周期可以为:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
任务被提交 (submit) 的时候,返回一个 FutureTask,
- public FutureTask(Callable < V > callable) {
- if (callable == null) throw new NullPointerException();
- this.callable = callable;
- this.state = NEW; // 状态为NEW
- }
此时任务的状态为 NEW.
当任务开始执行时,会调用 FutureTask 的 run 方法:
- public void run() {
- if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) //状态检查,只有状态为NEW的,且执行线程为null才可以执行,避免一个任务被重复执行
- return;
- try {
- Callable < V > c = callable;
- if (c != null && state == NEW) {
- V result;
- boolean ran;
- try {
- result = c.call(); //开始执行任务的方法体,此时任务的状态还是NEW
- ran = true;
- } catch(Throwable ex) {
- result = null;
- ran = false;
- setException(ex); //如果任务抛出了异常,任务状态进入NEW->COMPLETING->EXCEPTIONAL
- }
- if (ran) set(result); //调用了set方法设置返回结果,此时任务的状态还是NEW
- }
- } finally {
- runner = null;
- int s = state;
- if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); //有可能在任务执行期间,任务被取消了,调用了cancle(true)方法
- }
- }
- protected void set(V v) {
- //用CAS操作把任务的状态设置成COMPLETING,此时任务结果已经出来,只是没有赋值给outcome而以
- //但是把任务状态改成COMPLETING有可能会失败,因为此时任务状态有可能不是NEW了,有可以任务已经被取消(CANCELLED)。。。
- //所以这一步有可能什么都没做就直接返回了
- if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //NEW->COMPLETING
- outcome = v; //把结果赋值给outcome
- UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state //此时任务的结果为NORMAL,正常结束 COMPLETING->NORMAL
- finishCompletion();
- }
- }
- protected void setException(Throwable t) {
- if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
- outcome = t;
- UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
- finishCompletion();
- }
- }
// 如果任务正在被打断,执行线程会让出 CPU,给别人一个打断的机会。但此时 run 方法已经执行完了,任务有可能已经执行了
- private void handlePossibleCancellationInterrupt(int s) {
- if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); //任务已经被打断了,线程让出CPU
- }
从上面的代码可以看出:
1. 任务在执行期间的状态都是为 NEW,结果出来到赋值给 outcome 之前是 COMPLETING, 赋值给 outcome 之后是 NORMAL
2. 任务执行期间抛了异常,任务状态变化:NEW->COMPLETING->EXCEPTIONAL
当我们调用 cancel 方法,取消一个任务:
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (! (state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING: CANCELLED))) //只有状态为NEW的任务可以取消,如果任务状态为COMPLETING或别的状态是没有办法取法的。
- //如果mayInterruptIfRunning=false,那么取消一个任务只是把state置为CANCELLED而以,其它的什么都没做
- return false;
- try { // in case call to interrupt throws exception
- if (mayInterruptIfRunning) {
- try {
- Thread t = runner;
- if (t != null) t.interrupt(); //mayInterruptIfRunning=true,取消一个任务,只执行调用这个任务的执行线程的interrupt()方法而以。
- } finally { // final state
- //调用interrupt()方法中断的任务,任务状态为INTERRUPTED
- //此处有一个疑问:调用线程的interrupt()方法,线程就会中止执行吗?
- UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
- }
- }
- } finally {
- finishCompletion(); //finishCompletion方法删除所有等待结果的线程队列
- }
- return true;
- }
//finishCompletion 方法删除所有等待结果的线程队列
- private void finishCompletion() {
- // assert state > COMPLETING;
- for (WaitNode q; (q = waiters) != null;) {
- if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
- for (;;) {
- Thread t = q.thread;
- if (t != null) {
- q.thread = null;
- LockSupport.unpark(t);
- }
- WaitNode next = q.next;
- if (next == null) break;
- q.next = null; // unlink to help gc
- q = next;
- }
- break;
- }
- }
- done();
- callable = null; // to reduce footprint
- }
下面看下线程的 interrupt 方法:
- public void interrupt() {
- if (this != Thread.currentThread()) checkAccess();
- synchronized(blockerLock) {
- Interruptible b = blocker;
- if (b != null) {
- interrupt0(); // Just to set the interrupt flag
- b.interrupt(this);
- return;
- }
- }
- interrupt0(); // Just to set the interrupt flag
- }
interrupt() 方法只是调用 interrupt0 方法设置了线程的 interrupt 标志位
如果我们的任务没用作 interrupt 标志位判断:
- if (Thread.currentThread().isInterrupted()) {
- // throw new Exception("...."); 抛出异常或者 do something else
- }
判断线程是否被打断,抛出异常或是做别的处理,或者任务代码中没有调用 sleep,join,wait 等可以被中断的代码,任务其实还是会继续执行。
综上所述:
如果调用 cancel(true) 中止一个在正运行任务,只是调用了执行线程的 interrupt() 方法,而 interrupt 方法可以中断 sleep ,join ,wait 方法,
所以,如果你的任务没有阻塞在 sleep,join ,wait 方法,也没有作 interrupt 状态位判断然后作出相应处理,是不可能中止一个正在运行的任务的。
如果你调用 cancel(false) 中止一个正在运行的任务,只是把这个任务的状态置为 CANCELLED 而以,如果任务已经在运行了是中止不了的。
- //一直等,直到出结果
- public V get() throws InterruptedException,
- ExecutionException {
- int s = state;
- if (s <= COMPLETING) //如果s>COMPLETING,说明要么已经正常结束了,要么已经取消了,要么已经异常了
- s = awaitDone(false, 0L); //等待结果
- return report(s); //返回结果
- }
- //等一段时间,如果还没出结果就抛异常
- public V get(long timeout, TimeUnit unit) throws InterruptedException,
- ExecutionException,
- TimeoutException {
- if (unit == null) throw new NullPointerException();
- int s = state;
- if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) //我等一段时间,如果换回的结果还是没有结果,我就抛异常
- throw new TimeoutException();
- return report(s);
- }
- private V report(int s) throws ExecutionException {
- Object x = outcome;
- if (s == NORMAL) return (V) x; //正常结束,返回结果
- if (s >= CANCELLED) //被取消 返回取消异常
- throw new CancellationException();
- throw new ExecutionException((Throwable) x); //执行任务出现异常
- }
- //等待结果
- private int awaitDone(boolean timed, long nanos) throws InterruptedException {
- final long deadline = timed ? System.nanoTime() + nanos: 0L;
- WaitNode q = null;
- boolean queued = false;
- for (;;) { //循环,直到任务结束或超时
- if (Thread.interrupted()) { //我在等结果时,别人打断了我
- removeWaiter(q);
- throw new InterruptedException();
- }
- int s = state;
- if (s > COMPLETING) { //如果s>COMPLETING,说明要么已经正常结束了,要么已经取消了,要么已经异常了
- if (q != null) q.thread = null;
- return s; //直接返回状态
- } else if (s == COMPLETING) //结果已经出来了,只是还没有赋值给outcome而以
- Thread.yield(); //让出CPU,再等一会,说不定下次循环outcome就有了
- else if (q == null) q = new WaitNode(); //创建一个等待结点,如果下次循环结果还没出来,就把我加入等待列队
- else if (!queued) //结果还没出来,就把我加入等待列队
- queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
- else if (timed) { //等待有一段时间
- nanos = deadline - System.nanoTime();
- if (nanos <= 0L) { //过了等待时间了,我不等了,把我重等待列队中删除,返回任务状态
- removeWaiter(q);
- return state;
- }
- LockSupport.parkNanos(this, nanos); //结果还没出来,我先休眠一段时间
- } else LockSupport.park(this);
- }
- }
从上面的代码可以看出,get 通过 for (;;) 无限循环查看结果状态,如果结果还没出来,线程就加入等待队列休眠一段时间,直到超时。
答案是:没有。
最后给出测试代码:
- package com.phl.threadpool;
- import java.util.concurrent. * ;
- import java.util.concurrent.locks.LockSupport;
- import static java.lang.Thread.sleep;
- /**
- * Created by panhl on 2017-06-02.
- */
- public class ThreadPoolTaskTest {
- public static void main(String[] args) throws Exception {
- ExecutorService s = Executors.newFixedThreadPool(1);
- final A a = new A();
- a.setSs("=================");
- Future f = s.submit(new Runnable() {@Override public void run() {
- int i = 0;
- while (i < 10000000000000000L) {
- i++;
- try {
- sleep(1); //调用f.cancel(true)可以被打断
- //if(Thread.currentThread().isInterrupted()){ //使程序可以被终止
- // throw new RuntimeException("interrupted");
- //}
- } catch(Exception e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- System.out.println(a.getSs());
- }
- }
- });
- s.shutdown(); // 不加这个会导致程序不能结束,等待新的任务加入
- try {
- /**
- * 获取结果时超时了。只是等待的线程抛出异常,执行任务的线程并不会中止
- */
- f.get(1000, TimeUnit.MILLISECONDS);
- } catch(Exception e) {
- int k = 0;
- boolean flag = true;
- while (flag) {
- LockSupport.parkNanos(500);
- k++;
- a.setSs(k + "=======================");
- if (k > 1000) {
- flag = false;
- }
- }
- /**
- * f.cancel() cancel一个正在执行的任务
- * s.shutdownNow() ,停止所有正在执行的任务
- * 以上两个方法都是通过interrupt线程的sleep,join ,wait 方法来实现的
- * 如果这个任务中没有没用sleep.wait,join 方法,这个任务将取消不了,直到这个任务运行结束
- */
- f.cancel(true);
- //s.shutdownNow();
- a.setSs("cance" + a.getSs());
- }
- }
- public static class A {
- volatile String ss;
- public String getSs() {
- return ss;
- }
- public void setSs(String ss) {
- this.ss = ss;
- }
- }
- }
来源: http://blog.csdn.net/abountwinter/article/details/75277519