前言
本文是承接上文 [Java 源码][并发 J.U.C]--- 解析线程池之 ThreadPoolExecutor(1) 的思路继续进行分析的, 主要会涉及到线程池的关闭等方法.
关闭线程池主要有两个方法分别为 shutdown()和 shutdownNow(), 他们的区别在于 shutdown()让线程池不接受新任务并且要处理队列中的任务, 然而 shutdownNow()让线程池不接受新任务也不处理队列中的任务, 并且中断正在执行任务的线程.(只是名义上的中断, 关于中断的话题可以参考我的另一篇博客. [并发 J.U.C] 用例子理解线程中断)
辅助方法
在讨论这两个方法之前, 我们先看看几个辅助方法.
interruptIdleWorkers(boolean onlyOne)
该方法代码比较容易理解, 此时也可以看到为什么需要将 Worker 包装成一个互斥不可重入锁了. 因为需要先获得锁然后才去中断此线程.
onlyOne: true 只中断一个空闲线程, false 中断所有空闲线程.
作用: 中断空闲的线程(正在等待任务的线程也就是那些没有被锁住的线程).
- private void interruptIdleWorkers(boolean onlyOne) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (Worker w : workers) {
- Thread t = w.thread;
- if (!t.isInterrupted() && w.tryLock()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- } finally {
- w.unlock();
- }
- }
- if (onlyOne)
- break;
- }
- } finally {
- mainLock.unlock();
- }
- }
- advanceRunState(int targetState)
作用: 将 runState 转换为给定目标, 或者如果已经至少已经给定目标则将其保留.
- private void advanceRunState(int targetState) {
- for (;;) {
- int c = ctl.get();
- if (runStateAtLeast(c, targetState) ||
- ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
- break;
- }
- }
- interruptWorkers()
作用: 对所有的线程池中的线程发出中断.
- private void interruptWorkers() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (Worker w : workers)
- w.interruptIfStarted();
- } finally {
- mainLock.unlock();
- }
- }
- tryTerminate()
作用: 尝试去 terminate 线程池. 主要是通过线程池的状态判断是否需要 terminate 线程池, 如果条件满足, 但是线程池中含有线程, 则尝试对其线程发出中断, 反之更新线程池状态从 TIDYING 到 TERMINATED.
- final void tryTerminate() {
- for (;;) {
- int c = ctl.get();
- /**
- * 从线程池的状态来进行分析
- * 1. 如果线程池处于 RUNNING 状态 直接 return 因为不需要 terminate 线程池
- * 2. 如果线程池大于等于 TIDYING 状态(也就是 TIDYING 或者 TERMINATION) 此时已经 terminate 过了所以直接返回
- * -> 不是 1 或者 2 就表明线程池状态只能是 SHUTDOWN 或者是 STOP
- * 如果是 STOP 此时则需要 terminate 线程池 所以不能直接 return
- * 如果是 SHUTDOWN
- * a. 阻塞队列不为空, 则不能 terminate 线程池, 因为要处理阻塞队列的任务 所以不能直接 return
- * b. 阻塞队列为空, 则需要 terminate 线程池
- */
- if (isRunning(c) ||
- runStateAtLeast(c, TIDYING) ||
- (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
- return;
- /**
- * 进入到这里有两种可能性
- * 1. 线程池状态为 STOP
- * 2. 线程池状态为 SHUTDOWN 并且队列为空
- *
- * 如果 workerCount 大于 0, 则中断一个线程 进而引发的操作是
- * 1. 运行 getTask()中的某个线程会返回 null
- * 2. 进而该线程返回到 runWorker 中退出 while 循环
- * 3. 进而进入到 processWorkerExit 方法中
- * 4. processWorkerExit 中会调用 tryTerminate 方法进而进入了一个循环效应
- *
- * 当线程池状态是 SHUTDOWN 或者 STOP 时会慢慢得到线程池的最终终止状态.
- * 因为 getTask()方法中数量会减 1.
- */
- if (workerCountOf(c) != 0) { // Eligible to terminate
- interruptIdleWorkers(ONLY_ONE);
- return;
- }
- /**
- * 进入到这里有两种可能性
- * 1. 线程池状态为 STOP 并且 workerCount 为 0
- * 2. 线程池状态为 SHUTDOWN 并且队列为空并且 workerCount 为 0
- *
- * 在该两种情况下就可以把线程池的状态设置为 TIDYING 继而设置为 TERMINATED
- */
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- /** 先设置成 TIDYING 状态 然后调用子类实现的 terminated 方法
- * 最后设置成 TERMINATED 状态 并唤醒那些在等待线程池 TERMINATED 的线程 */
- if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
- try {
- terminated();
- } finally {
- ctl.set(ctlOf(TERMINATED, 0));
- termination.signalAll();
- }
- return;
- }
- } finally {
- mainLock.unlock();
- }
- // else retry on failed CAS
- }
- }
- processWorkerExit(Worker w, boolean completedAbruptly)
作用: 处理因为某种原因退出的 Worker w 并且尝试终止线程池进而判断是否需要给线程池增加线程.
- /** 只被 runWorker 调用 */
- private void processWorkerExit(Worker w, boolean completedAbruptly) {
- /**
- * 1. 如果异常退出 runWorker, 那么正常工作的 worker 线程数量需要减少 1
- * 2. 如果正常退出 runWorker, 表明 getTask()==null, 已经在 getTask()返回 null 前减 1 了
- */
- if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
- decrementWorkerCount();
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- completedTaskCount += w.completedTasks;
- workers.remove(w);
- } finally {
- mainLock.unlock();
- }
- /** 做一个尝试终止线程池的操作 只是有益于早发现线程池的终止指令进而真正去终止线程池 */
- tryTerminate();
- /**
- * 因为有一个线程因为异常或者正常退出, 所以检查线程池是否需要增加 worker 线程
- * 1. 如果线程池状态>=STOP 则肯定不需要添加 worker 了
- * 2. 线程池只能是 RUNNING 或者 SHUTDOWN 状态的时候来检查
- * 2.1 如果是异常退出, 则直接增加一个 worker 进入到线程池中
- * 2.2 如果是正常退出, 则判断一下是否需要增加 worker 到线程池中
- */
- int c = ctl.get();
- if (runStateLessThan(c, STOP)) {
- if (!completedAbruptly) {
- int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
- if (min == 0 && ! workQueue.isEmpty())
- min = 1;
- if (workerCountOf(c)>= min)
- return; // replacement not needed
- }
- addWorker(null, false);
- }
- }
关闭线程池
shutdown()
作用: 将线程池状态调整为 SHUTDOWN, 进而中断所有空闲线程并尝试终止线程池.
- public void shutdown() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- /** 判断调用者是否有权限 shutdown 线程池 */
- checkShutdownAccess();
- /** 使用 CAS 将线程池状态设置为 SHUTDOWN, 此时线程池将不会接受新任务不处理队列任务 */
- advanceRunState(SHUTDOWN);
- /** 中断所有空闲线程 */
- interruptIdleWorkers();
- /** hook for ScheduledThreadPoolExecutor 暂时无实现 */
- onShutdown(); // hook for ScheduledThreadPoolExecutor
- } finally {
- mainLock.unlock();
- }
- /** 尝试终止线程池 */
- tryTerminate();
- }
- shutdownNow()
作用: 将线程池状态调整为 STOP, 进而中断线程池中所有线程并尝试终止线程池. 最终返回所有未处理的任务.
- public List<Runnable> shutdownNow() {
- List<Runnable> tasks;
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- /** 判断调用者是否有权限 shutdown 线程池 */
- checkShutdownAccess();
- /** 使用 CAS 将线程池状态设置为 STOP, 此时线程池将不会接受新任务不处理队列任务并且中断所有线程 */
- advanceRunState(STOP);
- /** 尝试中断线程池中所有线程 */
- interruptWorkers();
- /** 取出队列中所有的任务 */
- tasks = drainQueue();
- } finally {
- mainLock.unlock();
- }
- /** 尝试终止线程池 */
- tryTerminate();
- /** 返回所有未处理的任务 */
- return tasks;
- }
参考
1. java 1.8 源码
来源: http://www.jianshu.com/p/0a0e93e77809