目录
简介
继承结构
实现分析
ThreadPoolExecutor 类属性
线程池状态
构造方法
- execute(Runnable command)
- addWorker(Runnable firstTask, boolean core)
内部类 Worker
- runWorker(Worker w)
- getTask()
- processWorkerExit(Worker w, boolean completedAbruptly)
- shutdown()
- interruptIdleWorkers()
- tryTerminate()
- shutdownNow()
- interruptWorkers()
总结
简介
在 Web 开发中, 如果要密集处理多个任务时, 相对于每次都一个创建线程去执行任务, 新建线程来执行任务相对来说是个更好的选择, 体现在以下三点:
降低资源消耗. 通过重复利用已创建的线程降低线程创建和销毁造成的消耗.
提高响应速度. 当任务到达时, 任务可以不需要等到线程创建就能立即执行.
提高线程的可管理性. 线程是稀缺资源, 如果无限制的创建, 不仅会消耗系统资源, 还会降低系统的稳定性, 使用线程池可以进行统一的分配, 调优和监控.
下面从最常用的线程池 ThreadPoolExecutor 的源码分析如何实现线程池.
继承结构
Executor 是最基础的执行接口, 只提供了一个 execute(Runnable command)提交任务方法; ExecutorService 接口继承了 Executor, 在其上做了一些 shutdown(),submit()的扩展, 可以说是真正的线程池接口 AbstractExecutorService 抽象类实现了 ExecutorService 接口中的大部分方法; TheadPoolExecutor 继承了 AbstractExecutorService, 是线程池的具体实现.
实现分析
ThreadPoolExecutor 类属性
- public class ThreadPoolExecutor extends AbstractExecutorService {
- // 线程池的控制状态 (用来表示线程池的运行状态(整形的高 3 位) 和运行的 worker 数量(低 29 位))
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- // 偏移量
- private static final int COUNT_BITS = Integer.SIZE - 3;
- // 最大工作线程数量(2^29 - 1)
- private static final int CAPACITY = (1 <<COUNT_BITS) - 1;
- // runState is stored in the high-order bits
- // 线程运行状态, 总共有 5 个状态, 需要 3 位来表示(所以偏移量的 29 = 32 - 3)
- private static final int RUNNING = -1 << COUNT_BITS;
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- private static final int STOP = 1 << COUNT_BITS;
- private static final int TIDYING = 2 << COUNT_BITS;
- private static final int TERMINATED = 3 << COUNT_BITS;
- // 阻塞队列, 存放提交给线程池的任务
- private final BlockingQueue<Runnable> workQueue;
- // 可重入锁
- private final ReentrantLock mainLock = new ReentrantLock();
- // 存放工作线程集合
- private final HashSet<Worker> workers = new HashSet<Worker>();
- // 终止条件
- private final Condition termination = mainLock.newCondition();
- // 最大线程池容量
- private int largestPoolSize;
- // 已完成任务数量
- private long completedTaskCount;
- // 线程工厂
- private volatile ThreadFactory threadFactory;
- // 拒绝执行处理器
- private volatile RejectedExecutionHandler handler;
- // 线程等待运行时间
- private volatile long keepAliveTime;
- // 是否运行核心线程超时
- private volatile boolean allowCoreThreadTimeOut;
- // 核心池的大小
- private volatile int corePoolSize;
- // 最大线程池大小
- private volatile int maximumPoolSize;
- // 默认拒绝执行处理器
- private static final RejectedExecutionHandler defaultHandler =
- new AbortPolicy();
- }
线程池状态
线程池本身有两个很重要的状态信息: 线程池的运行状态和工作线程数, 这两个状态信息都包含在变量 ctl(int 型, 32 位)中: ctl 的高 3 位表示线程状态 runState, 低 29 位表示工作线程 worker 的数量 workCount. 线程状态信息如下:
RUNNING:-1<<COUNT_BITS, 即高 3 位为 1, 低 29 位为 0, 该状态的线程池会接收新任务, 会处理在阻塞队列中等待处理的任务
SHUTDOWN:0<<COUNT_BITS, 即高 3 位为 000, 低 29 位为 0, 该状态的线程池不会再接收新任务, 但还会处理已经提交到阻塞队列中等待处理的任务
STOP:1<<COUNT_BITS, 即高 3 位为 001, 低 29 位为 0, 该状态的线程池不会再接收新任务, 不会处理在阻塞队列中等待的任务, 而且还会中断正在运行的任务
TIDYING:2<<COUNT_BITS, 即高 3 位为 010, 低 29 位为 0, 所有任务都被终止了, workerCount 为 0, 为此状态时还将调用 terminated()方法
TERMINATED:3<<COUNT_BITS, 即高 3 位为 011, 低 29 位为 0,terminated()方法调用完成后变成此状态
构造方法
核心参数含义如下:
corePoolSize: 核心线程数量
maximumPoolSize: 最大线程数量, 可能大于 corePoolSize, 也可能等于
workQueue: 必须是 BlockingQueue 阻塞队列. 当线程池中的线程数超过它的 corePoolSize 的时候, 线程会进入阻塞队列进行阻塞等待. 通过 workQueue, 线程池实现了阻塞功能
keepAliveTime: 线程池维护线程所允许的空闲时间. 当线程池中的线程数量大于 corePoolSize 的时候, 如果这时没有新的任务提交, 核心线程外的线程不会立即销毁, 而是会等待, 直到等待的时间超过了 keepAliveTime.
threadFactory: 它是 ThreadFactory 类型的变量, 用来创建新线程. 默认使用 Executors.defaultThreadFactory() 来创建线程. 使用默认的 ThreadFactory 来创建线程时, 会使新创建的线程具有相同的 NORM_PRIORITY 优先级并且是非守护线程, 同时也设置了线程的名称.
handler: 它是 RejectedExecutionHandler 类型的变量, 表示线程池的饱和策略. 如果阻塞队列满了并且没有空闲的线程, 这时如果继续提交任务, 就需要采取一种策略处理该任务.
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- if (corePoolSize <0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
- execute(Runnable command)
execute 方法是向线程池提交任务的, 此时线程池的状态为 RUNNING(其他状态不接收新提交的任务), 主要判断:
如果运行的线程少于 corePoolSize, 则创建新的工作线程来处理任务, 即使线程池中的其他线程是空闲的;
如果线程池中的线程数量大于等于 corePoolSize, 且阻塞队列未满, 将任务加入阻塞队列 workQueue;
如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize, 则只有当 workQueue 满时才创建新的线程去处理任务;
如果运行的线程数量大于等于 maximumPoolSize, 这时如果 workQueue 已经满了, 则通过 handler 所指定的策略来拒绝任务提交;
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- //ctl 记录线程池状态信息和线程池线程数
- int c = ctl.get();
- // 比较当前线程数是否小于 corePoolSize, 如果小于则新建一个线程放入线程池中
- if (workerCountOf(c) < corePoolSize) {
- // 成功加入则返回
- if (addWorker(command, true))
- return;
- // 加入失败, 重新获取 ctl
- c = ctl.get();
- }
- // 如果当前线程数大于等于 corePoolSize, 判断线程池是否仍在运行, 是的话加入阻塞队列
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- // 再次检查线程池是否仍在运行
- if (! isRunning(recheck) && remove(command))
- reject(command);
- /** 线程池在运行但是工作线程数为 0, 此时可能阻塞队列有任务但线程池没有工作线程池,
- * 如果配置了参数 allowCoreThreadTimeOut(默认是 false)为 true 可能因为核心线程执行
- * 完任务且阻塞队列也没有线程等待获取任务, 此时属于空闲线程, 由于超时会回收核心线程
- **/
- else if (workerCountOf(recheck) == 0)
- /** 传 false 将会在 addWorker 方法中判断线程池的工作线程数量和最大线程数量做比较
- * 传一个空的任务, 开启一个工作线程, 但这个工作线程会发现当前的任务是空, 然后会去队列中取任务
- * 这样就避免了线程池的状态是 running, 而且队列中还有任务, 但线程池却不执行队列中的任务
- **/
- addWorker(null, false);
- }
- /**
- * 如果执行到这里, 有两种情况:
- * 1. 线程池已经不是 RUNNING 状态;
- * 2. 线程池是 RUNNING 状态, 但 workerCount>= corePoolSize 并且 workQueue 已满.
- * 这时, 再次调用 addWorker 方法, 但第二个参数传入为 false, 将线程池的有限线程数量的上限设置为
- * maximumPoolSize; 如果失败则拒绝该任务
- **/
- else if (!addWorker(command, false))
- reject(command);
- }
- addWorker(Runnable firstTask, boolean core)
addWorker 方法用与创建工作线程, firstTask 表示第一个任务, core 为 true 那么线程数受 corePoolSize 制约, 为 false 则受 maximumPoolSize 制约. 执行流程:
检查线程池状态决定是否新建工作线程
新建 Worker 对象并加入到集合中
启动工作线程
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (;;) {
- int c = ctl.get();
- // 运行状态
- int rs = runStateOf(c);
- /**
- * 如果 rs>= SHUTDOWN, 则表示此时不再接收新任务
- * 满足 rs>= SHUTDOWN 条件后接着判断以下 3 个条件, 只要有 1 个不满足, 则返回 false:
- * 1. rs == SHUTDOWN, 这时表示关闭状态, 不再接受新提交的任务, 但却可以继续处理阻塞队列中已保
- * 存的任务 2. firsTask 为空 3. 阻塞队列不为空
- **/
- if (rs>= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
- for (;;) {
- // 当前工作线程数
- int wc = workerCountOf(c);
- // 检查线程数量是否超出
- if (wc>= CAPACITY ||
- wc>= (core ? corePoolSize : maximumPoolSize))
- return false;
- // 尝试 CAS 增加 workerCount, 如果成功, 则跳出第一个 for 循环
- if (compareAndIncrementWorkerCount(c))
- break retry;
- //CAS 失败, 重新获取 ctl 的值
- c = ctl.get(); // Re-read ctl
- // 如果当前的运行状态不等于 rs, 说明状态已被改变, 返回第一个 for 循环继续执行
- if (runStateOf(c) != rs)
- continue retry;
- }
- }
- //CAS 增加 workCount 成功, 退出循环进入到这里
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- // 根据 firstTask 来创建 Worker 对象
- w = new Worker(firstTask);
- // 每一个 Worker 对象都会创建一个线程
- final Thread t = w.thread;
- if (t != null) {
- // 上锁
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- int rs = runStateOf(ctl.get());
- // rs <SHUTDOWN 表示是 RUNNING 状态;
- // 如果 rs 是 RUNNING 状态或者 rs 是 SHUTDOWN 状态并且 firstTask 为 null, 向线程池中添加线程.
- // 因为在 SHUTDOWN 时不会在添加新的任务, 但还是会执行 workQueue 中的任务
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive())
- throw new IllegalThreadStateException();
- // 将工作线程 work 加入到 HashSet 对象 workers
- workers.add(w);
- int s = workers.size();
- if (s> largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- // 启动线程
- if (workerAdded) {
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
内部类 Worker
线程池的工作线程是通过包装成 Worker 对象, Worker 类本身既实现了 Runnable 接口, 又继承了同步器 AQS, 实现了一个简易的不可重入的互斥锁, 通过同步状态 state 控制中断:
初始 AQS 状态为 - 1, 此时不允许中断 interrupt(), 只有在 worker 线程启动了, 执行了 runWoker(), 将 state 置为 0, 才能中断, 不允许中断体现在:
shutdown()线程池时, 会对每个 worker tryLock()上锁, 而 Worker 类这个 AQS 的 tryAcquire()方法是固定将 state 从 0->1, 故初始状态 state==-1 时 tryLock()失败, 无法 interrupt()
shutdownNow()线程池时, 不用 tryLock()上锁, 但调用 worker.interruptIfStarted()终止 worker,interruptIfStarted()也有 state>0 才能 interrupt 的逻辑
为了防止某种情况下, 在运行中的 worker 被中断, runWorker()每次运行任务时都会 lock()上锁, 而 shutdown()这类可能会终止 worker 的操作需要先获取 worker 的锁, 这样就防止了中断正在运行的线程
- private final class Worker extends AbstractQueuedSynchronizerimplements Runnable{
- private static final long serialVersionUID = 6138294804551838833L;
- // 工作线程
- final Thread thread;
- // 新建 Worker 传入的任务 command, 可能为 null
- Runnable firstTask;
- // 执行完的任务数量
- volatile long completedTasks;
- // 同步状态 state 为 0 代表为锁定, state 为 1 代表锁定, state 为 - 1 代表初始状态
- Worker(Runnable firstTask) {
- setState(-1); // inhibit interrupts until runWorker
- this.firstTask = firstTask;
- // 创建线程
- this.thread = getThreadFactory().newThread(this);
- }
- public void run() {
- runWorker(this);
- }
- protected boolean isHeldExclusively() {
- return getState() != 0;
- }
- protected boolean tryAcquire(int unused) {
- if (compareAndSetState(0, 1)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
- protected boolean tryRelease(int unused) {
- setExclusiveOwnerThread(null);
- setState(0);
- return true;
- }
- public void lock() { acquire(1); }
- public boolean tryLock() { return tryAcquire(1); }
- public void unlock() { release(1); }
- public boolean isLocked() { return isHeldExclusively(); }
- }
- runWorker(Worker w)
runWork 是工作线程执行任务的方法, 执行过程如下:
通过 while 循环步断获取任务
检查线程池运行状态, 如果处于 STOP 及以上, 中断线程; 如果是 RUNNING 或 SHUTDOWN, 不中断工作线程
task.run()执行任务
当取得的任务 task 为 null 退出循环, 执行 processWorkerExit 方法, 此时 Work 的工作线程 run()方法执行完毕, 线程销毁
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- // 同步状态 state 设置为 0, 允许中断
- w.unlock(); // allow interrupts
- // 用于标识是否工作线程由于异常突然终止, 在执行任务抛出异常或线程被中断两种情况为 true
- boolean completedAbruptly = true;
- try {
- // 循环取任务执行
- while (task != null || (task = getTask()) != null) {
- // 上锁, 表示正在工作线程正在执行任务, 不能响应中断
- w.lock();
- /**
- * 确保在线程池状态在 STOP 及以上时, 才会被设置中断标示, 否则清除中断标示, 判断以下两个条件:
- * 1, 如果线程池状态>=stop, 且当前线程没有设置中断状态, wt.interrupt()
- * 2, 如果一开始判断线程池状态 < stop, 但 Thread.interrupted()为 true, 即线程已经被中断, 又
- * 清除了中断标示, 再次判断线程池状态是否>=stop(可能调用了 shutdownNow 关闭线程池)
- **/
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- // 执行任务
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- w.completedTasks++;
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- processWorkerExit(w, completedAbruptly);
- }
- }
- getTask()
当工作线程数达到 corePoolSize, 后续提交的任务就会放到阻塞队列 workQueue 中, 工作线程通过 getTask 方法从阻塞队列取出任务, 执行以下步骤:
检查线程池状态及阻塞队列是否为空
控制核心线程数(使工作线程数不超过 corePoolSize)
从阻塞队列取任务
- private Runnable getTask() {
- // timeOut 变量的值表示上次从阻塞队列中取任务时是否超时
- boolean timedOut = false;
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- /**
- * 1.rs>SHUTDOWN 所以 rs 至少等于 STOP, 这时不再处理队列中的任务, 不管 workQueue 是否为空都返回 null
- * 2.rs = SHUTDOWN 所以 rs>=STOP 肯定不成立, 这时还需要处理队列中的任务除非 workQueue 为空
- * 如果以上条件满足, 则将 workerCount 减 1 并返回 null. 因为如果当前线程池状态的值是 SHUTDOWN
- * 或以上时, 不允许再向阻塞队列中添加任务.
- */
- if (rs>= SHUTDOWN && (rs>= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- int wc = workerCountOf(c);
- /**
- * timed 表示工作线程是否需要剔除, 为 true
- * allowCoreThreadTimeOut 默认为 false, 表示核心线程不做超时控制
- * wc> corePoolSize 超过核心线程数
- * timed 为 true 下面的 if 条件通过返回 null, 从而剔除掉超过 corePoolSize 数目的线程, 使线程数
- * 回复 corePoolSize
- **/
- boolean timed = allowCoreThreadTimeOut || wc> corePoolSize;
- /**
- * 条件 1:
- * wc> maximumPoolSize 检查是否超出 maximumPoolSize, 线程池可能重置了 maximumPoolSize
- * timed && timedOut 当前线程需要超时控制且上次取任务超时为 true
- * 条件 2: 如果线程数量大于 1, 或者阻塞队列是空的
- * 两个条件都为 true 把 workCount 减一, 返回 null
- **/
- if ((wc> maximumPoolSize || (timed && timedOut))
- && (wc> 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- //CAS 失败重新循环
- continue;
- }
- try {
- /**
- * 根据 timed 来判断, 如果为 true, 则通过阻塞队列的 poll 方法进行超时控制, 如果在
- * keepAliveTime 时间内没有获取到任务, 则返回 null;
- * 否则通过 take 方法, 如果这时队列为空, 则 take 方法会阻塞直到队列不为空.
- **/
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- // 如果 r==null, 说明是超时了
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
- processWorkerExit(Worker w, boolean completedAbruptly)
当 getTask 返回 null, 会跳出 runWork 的 while 循环, 此时工作线程的 run 方法执行完毕, 线程会终止, 同时会执行 processWorkerExit 方法, 步骤如下:
根据 completedAbruptly 参数调整线程池的工作线程数
统计完成的任务数并从集合中移出 Worker 对象
根据线程池状态进行判断是否结束线程池
- private void processWorkerExit(Worker w, boolean completedAbruptly) {
- // 如果是突然终止, 重新调整 workCount
- if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
- decrementWorkerCount();
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 统计完成的任务数
- completedTaskCount += w.completedTasks;
- // 从集合中移出 Worker 对象
- workers.remove(w);
- } finally {
- mainLock.unlock();
- }
- // 根据线程池状态进行判断是否结束线程池
- tryTerminate();
- int c = ctl.get();
- // 线程状态小于 STOP, 即线程池处于 RUNNING 或 SHUTDOWN 状态
- if (runStateLessThan(c, STOP)) {
- // 检查是否异常终止
- if (!completedAbruptly) {
- // 如果 allowCoreThreadTimeOut=true, 并且等待队列有任务, 至少保留一个 worker;
- // 如果 allowCoreThreadTimeOut=false,workerCount 不少于 corePoolSize.
- int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
- if (min == 0 && ! workQueue.isEmpty())
- min = 1;
- if (workerCountOf(c)>= min)
- return; // replacement not needed
- }
- // 突然终止, 添加一个 Worker
- addWorker(null, false);
- }
- }
- shutdown()
关闭线程池, 线程池状态由 RUNNING 变为 SHUTDOWN, 只处理已有任务不再接收新提交的任务, 中断空闲线程.
为什么要中断空闲线程: 当线程池状态为 RUNNING 但是阻塞队列为空, allowCoreThreadTimeOut 为默认值 false(既不支持核心线程超时回收), 那么工作线程必然堵塞在 workQueue.take()方法上, 而调用了 shutdown()方法后线程池状态变为 SHUTDOWN 不接收新提交的任务, 那么阻塞队列永远为空, 所以需要通过中断让线程由阻塞状态返回 null.
- public void shutdown() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 检查是否有关闭线程池权限
- checkShutdownAccess();
- // 把线程池运行状态切换为 SHUTDOWN
- advanceRunState(SHUTDOWN);
- // 中断空闲线程
- interruptIdleWorkers();
- onShutdown(); // hook for ScheduledThreadPoolExecutor
- } finally {
- mainLock.unlock();
- }
- tryTerminate();
- }
- interruptIdleWorkers()
中断空闲线程.
- private void interruptIdleWorkers() {
- //false 表明中断所有空闲线程
- interruptIdleWorkers(false);
- }
- private void interruptIdleWorkers(boolean onlyOne) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (Worker w : workers) {
- Thread t = w.thread;
- // t.isInterrupted()检查线程是否已经中断过
- // w.tryLock() runWork 在执行任务会上锁, 执行完解锁去阻塞队列获得任务, 如果 tryLock 成功
- // 说明没有执行任务, 是空闲线程.
- if (!t.isInterrupted() && w.tryLock()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- } finally {
- w.unlock();
- }
- }
- if (onlyOne)
- break;
- }
- } finally {
- mainLock.unlock();
- }
- }
- tryTerminate()
根据线程池状态尝试关闭线程池. 这里解释一下 interruptIdleWorkers(ONLY_ONE):
当到达 workerCountOf(c) != 0 这个判断时, 说明线程池处于 SHUTDOWN 状态, 且阻塞队列已经为空, 这是若判断成立, 那么还有工作线程等待在线程池上, 会中断一个空闲线程, 这个被中断的空闲线程的 Worker 返回 null 又会调用 tryTerminate, 从而把线程池关闭的消息传给每个线程, 回收空闲线程.
final void tryTerminate() { for (;;) { int c = ctl.get(); /* * 当前线程池的状态为以下几种情况时, 直接返回: * 1. RUNNING, 因为还在运行中, 不能停止; * 2. TIDYING 或 TERMINATED, 因为线程池中已经没有正在运行的线程了; * 3. SHUTDOWN 并且等待队列非空, 这时要执行完 workQueue 中的 task; */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 工作线程数不为 0 if (workerCountOf(c) != 0) { // Eligible to terminate // 中断一个空闲线程(等待在阻塞队列上获取任务的线程) // 中断的线程在回收 Worker 时还会调用 tryTerminate 方法, 从而回收空闲线程 interruptIdleWorkers(ONLY_ONE); return; } // 到这里说明工作线程数 workCount 为 0, 线程池状态置为 TIDYING final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } } shutdownNow()
关闭线程池, 运行状态修改为 STOP, 中断所有线程; 并返回未处理的任务
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 更改线程池状态 advanceRunState(STOP); // 中断所有工作线程, 无论是否空闲 interruptWorkers(); // 取出阻塞队列中没有被执行的任务 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } interruptWorkers()
不论线程是否空闲, 中断所有线程.
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } void interruptIfStarted() { Thread t; /** * getState()>= 0 同步状态 state=-1 线程还没启动, 大于等于 0 说明线程以及启动, 处于 * 执行任务或空闲状态. * (t = thread) != null 线程不为 null * !t.isInterrupted() 检查线程是否被中断过. **/ if (getState()>= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
总结
本文分析了线程池 ThreadPoolExecutor 的实现, 主要从向线程池提交任务和关闭线程池这两个方法分析的, 了解了线程池复用线程资源减少线程创建和切换的开销背后的秘密.
来源: https://www.cnblogs.com/rain4j/p/10301993.html