概论
线程池(英语: thread pool): 一种线程使用模式. 线程过多会带来调度开销, 进而影响缓存局部性和整体性能. 而线程池维护着多个线程, 等待着监督管理者分配可并发执行的任务. 这避免了在处理短时间任务时创建与销毁线程的代价. 线程池不仅能够保证内核的充分利用, 还能防止过分调度. 可用线程数量应该取决于可用的并发处理器, 处理器内核, 内存, 网络 sockets 等的数量. 例如, 线程数一般取 CPU 数量 +2 比较合适, 线程数过多会导致额外的线程切换开销.
Java 中的线程池是用 ThreadPoolExecutor 类来实现的. 本文就对该类的源码来分析一下这个类内部对于线程的创建, 管理以及后台任务的调度等方面的执行原理.
先看一下线程池的类图:
上图的目的主要是为了让大家知道线程池相关类之间的关系, 至少赚个眼熟, 以后看到不会有害怕的感觉.
Executor 框架接口
Executor 框架是一个根据一组执行策略调用, 调度, 执行和控制的异步任务的框架, 目的是提供一种将 "任务提交" 与 "任务如何运行" 分离开来的机制.
下面是 ThreadPoolExeCutor 类图. Executors 其实是一个工具类, 里面提供了好多静态方法, 这些方法根据用户选择返回不同的线程实例.
从上图也可以看出来, ThreadPoolExeCutor 是线程池的核心.
J.U.C 中有三个 Executor 接口:
Executor: 一个运行新任务的简单接口;
ExecutorService: 扩展了 Executor 接口. 添加了一些用来管理执行器生命周期和任务生命周期的方法;
ScheduledExecutorService: 扩展了 ExecutorService. 支持 Future 和定期执行任务.
其实通过这些接口就可以看到一些设计思想, 每个接口的名字和其任务是完全匹配的. 不会因为 Executor 中只有一个方法, 就将其放到其他接口中. 这也是很重要的单一原则.
ThreadPoolExeCutor 分析
在去具体分析 ThreadPoolExeCutor 运行逻辑前, 先看下面的流程图:
该图是 ThreadPoolExeCutor 整个运行过程的一个概括, 整个源码的核心逻辑总结起来就是:
创建线程: 要知道如何去创建线程, 控制线程数量, 线程的存活与销毁;
添加任务: 任务添加后如何处理, 是立刻执行, 还是先保存;
执行任务: 如何获取任务, 任务执行失败后如何处理?
下面将进入源码分析, 来深入理解 ThreadPoolExeCutor 的设计思想.
构造函数
先来看构造函数:
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- if (corePoolSize <0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
- // 注意 workQueue, threadFactory, handler 是不可以为 null 的, 为空会直接抛出错误
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
corePoolSize 核心线程数: 表示核心线程池的大小. 当提交一个任务时, 如果当前核心线程池的线程个数没有达到 corePoolSize, 则会创建新的线程来执行所提交的任务, 即使当前核心线程池有空闲的线程. 如果当前核心线程池的线程个数已经达到了 corePoolSize, 则不再重新创建线程. 如果调用了 prestartCoreThread()或者 prestartAllCoreThreads(), 线程池创建的时候所有的核心线程都会被创建并且启动. 若 corePoolSize == 0, 则任务执行完之后, 没有任何请求进入时, 销毁线程池的线程. 若 corePoolSize> 0, 即使本地任务执行完毕, 核心线程也不会被销毁. corePoolSize 其实可以理解为可保留的空闲线程数.
maximumPoolSize: 表示线程池能够容纳同时执行的最大线程数. 如果当阻塞队列已满时, 并且当前线程池线程个数没有超过 maximumPoolSize 的话, 就会创建新的线程来执行任务. 注意 maximumPoolSize>= 1 必须大于等于 1.maximumPoolSize == corePoolSize , 即是固定大小线程池. 实际上最大容量是由 CAPACITY 控制.
keepAliveTime: 线程空闲时间. 当空闲时间达到 keepAliveTime 值时, 线程会被销毁, 直到只剩下 corePoolSize 个线程为止, 避免浪费内存和句柄资源. 默认情况, 当线程池的线程数> corePoolSize 时, keepAliveTime 才会起作用. 但当 ThreadPoolExecutor 的 allowCoreThreadTimeOut 变量设置为 true 时, 核心线程超时后会被回收.
unit: 时间单位. 为 keepAliveTime 指定时间单位.
workQueue 缓存队列. 当请求的线程数> maximumPoolSize 时, 线程进入 BlockingQueue 阻塞队列. 可以使用 ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue.
threadFactory 创建线程的工程类. 可以通过指定线程工厂为每个创建出来的线程设置更有意义的名字, 如果出现并发问题, 也方便查找问题原因.
handler 执行拒绝策略的对象. 当线程池的阻塞队列已满和指定的线程都已经开启, 说明当前线程池已经处于饱和状态了, 那么就需要采用一种策略来处理这种情况. 采用的策略有这几种:
AbortPolicy: 直接拒绝所提交的任务, 并抛出 RejectedExecutionException 异常;
CallerRunsPolicy: 只用调用者所在的线程来执行任务;
DiscardPolicy: 不处理直接丢弃掉任务;
DiscardOldestPolicy: 丢弃掉阻塞队列中存放时间最久的任务, 执行当前任务
属性定义
看完构造函数之后, 再来看下该类里面的变量, 有助于进一步理解整个代码运行逻辑, 下面是一些比较重要的变量:
- // 用来标记线程池状态(高 3 位), 线程个数(低 29 位)
- // 默认是 RUNNING 状态, 线程个数为 0
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- // 线程个数掩码位数, 整型最大位数 - 3, 可以适用于不同平台
- private static final int COUNT_BITS = Integer.SIZE - 3;
- // 线程最大个数(低 29 位)00011111111111111111111111111111
- private static final int CAPACITY = (1 <<COUNT_BITS) - 1;
- //(高 3 位):11100000000000000000000000000000
- private static final int RUNNING = -1 << COUNT_BITS;
- //(高 3 位):00000000000000000000000000000000
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- //(高 3 位):00100000000000000000000000000000
- private static final int STOP = 1 << COUNT_BITS;
- //(高 3 位):01000000000000000000000000000000
- private static final int TIDYING = 2 << COUNT_BITS;
- //(高 3 位):01100000000000000000000000000000
- private static final int TERMINATED = 3 << COUNT_BITS;
- // 获取高三位 运行状态
- private static int runStateOf(int c) {
- return c & ~CAPACITY;
- }
- // 获取低 29 位 线程个数
- private static int workerCountOf(int c) {
- return c & CAPACITY;
- }
- // 计算 ctl 新值, 线程状态 与 线程个数
- private static int ctlOf(int rs, int wc) {
- return rs | wc;
- }
这里需要对一些操作做些解释.
Integer.SIZE: 对于不同平台, 其位数不一样, 目前常见的是 32 位;
(1 << COUNT_BITS) -1: 首先是将 1 左移 COUNT_BITS 位, 也就是第 COUNT_BITS + 1 位是 1, 其余都是 0;-1 操作则是将后面前面的 COUNT_BITS 位都变成 1.
-1 << COUNT_BITS:-1 的原码是 10000000 00000000 00000000 00000001 , 反码是 111111111 11111111 11111111 11111110 , 补码 +1, 然后左移 29 位是 11100000 00000000 00000000 00000000; 这里转为十进制是负数.
~CAPACITY : 取反, 最高三位是 1;
总结: 这里巧妙利用 bit 操作来将线程数量和运行状态联系在一起, 减少了变量的存在和内存的占用. 其中五种状态的十进制排序: RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
线程池状态
线程池状态含义:
RUNNING: 接受新任务并且处理阻塞队列里的任务;
SHUTDOWN: 拒绝新任务但是处理阻塞队列里的任务;
STOP: 拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务;
TIDYING: 所有任务都执行完 (包含阻塞队列里面任务) 当前线程池活动线程为 0, 将要调用 terminated 方法
TERMINATED: 终止状态. terminated 方法调用完成以后的状态;
线程池状态转换:
RUNNING -> SHUTDOWN: 显式调用 shutdown() 方法, 或者隐式调用了 finalize(), 它里面调用了 shutdown()方法.
RUNNING or SHUTDOWN)-> STOP: 显式 shutdownNow() 方法;
SHUTDOWN -> TIDYING: 当线程池和任务队列都为空的时候;
STOP -> TIDYING: 当线程池为空的时候;
TIDYING -> TERMINATED: 当 terminated() hook 方法执行完成时候;
原码, 反码, 补码知识小剧场:
1. 原码: 原码就是符号位加上真值的绝对值, 即用第一位表示符号, 其余位表示值. 比如如果是 8 位二进制:
[+1]原 = 0000 0001
[-1]原 = 1000 0001
负数原码第一位是符号位.
2. 反码: 反码的表示方法是, 正数的反码是其本身, 负数的反码是在其原码的基础上, 符号位不变, 其余各个位取反.
[+1] = [0000 0001]原 = [0000 0001]反
[-1] = [1000 0001]原 = [1111 1110]反
3. 补码: 补码的表示方法是, 正数的补码就是其本身, 负数的补码是在其原码的基础上, 符号位不变, 其余各位取反, 最后 +1. (即在反码的基础上 +1)
[+1] = [0000 0001]原 = [0000 0001]反 = [0000 0001]补
[-1] = [1000 0001]原 = [1111 1110]反 = [1111 1111]补
4. 总结
在知道一个数原码的情况下:
正数: 反码, 补码 就是本身自己
负数: 反码是高位符号位不变, 其余位取反. 补码: 反码 + 1
5. 左移: 当数值左, 右移时, 先将数值转化为其补码形式, 移完后, 再转换成对应的原码
左移: 高位丢弃, 低位补零
[+1] = [00000001]补
[0000 0001]补 <<1 = [0000 0010]补 = [0000 0010]原 = [+2]
[-1] = [1000 0001]原 = [1111 1111]补
[1111 1111]补 << 1 = [1111 1110]补 = [1000 0010]原 = [-2]
其中, 再次提醒, 负数的补码是反码 + 1; 负数的反码是补码 - 1;
6. 右移: 高位保持不变, 低位丢弃
[+127] = [0111 1111]原 = [0111 1111]补
[0111 1111]补>> 1 = [0011 1111]补 = [0011 1111]原 = [+63]
[-127] = [1111 1111]原 = [1000 0001]补
[1000 0001]补>> 1 = [1100 0000]补 = [1100 0000]原 = [-64]
execute 方法分析
通过 ThreadPoolExecutor 创建线程池后, 提交任务后执行过程是怎样的, 下面来通过源码来看一看. execute 方法源码如下:
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- // 返回包含线程数及线程池状态(头 3 位)
- int c = ctl.get();
- // 如果工作线程数小于核心线程数, 则创建线程任务执行
- if (workerCountOf(c) <corePoolSize) {
- if (addWorker(command, true))
- return;
- // 如果创建失败, 防止外部已经在线程池中加入新任务, 重新获取
- c = ctl.get();
- }
- // 只有线程池处于 RUNNING 状态, 且 入队列成功
- if (isRunning(c) && workQueue.offer(command)) {
- // 后面的操作属于 double-check
- int recheck = ctl.get();
- // 如果线程池不是 RUNNING 状态, 则将刚加入队列的任务移除
- if (! isRunning(recheck) && remove(command))
- reject(command);
- // 如果之前的线程已被消费完, 新建一个线程
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- // 核心池和队列都满了, 尝试创建一个新线程
- else if (!addWorker(command, false))
- // 如果 addWorker 返回是 false, 即创建失败, 则唤醒拒绝策略
- reject(command);
- }
execute 方法执行逻辑有这样几种情况:
如果当前运行的线程少于 corePoolSize, 则会创建新的线程来执行新的任务;
如果运行的线程个数等于或者大于 corePoolSize, 则会将提交的任务存放到阻塞队列 workQueue 中;
如果当前 workQueue 队列已满的话, 则会创建新的线程来执行任务;
如果线程个数已经超过了 maximumPoolSize, 则会使用饱和策略 RejectedExecutionHandler 来进行处理.
这里要注意一下 addWorker(null, false)也就是创建一个线程, 但并没有传入任务, 因为任务已经被添加到 workQueue 中了, 所以 worker 在执行的时候, 会直接从 workQueue 中获取任务. 所以, 在 workerCountOf(recheck) == 0 时执行 addWorker(null, false)也是为了保证线程池在 RUNNING 状态下必须要有一个线程来执行任务.
需要注意的是, 线程池的设计思想就是使用了核心线程池 corePoolSize, 阻塞队列 workQueue 和线程池 maximumPoolSize, 这样的缓存策略来处理任务, 实际上这样的设计思想在需要框架中都会使用.
需要注意线程和任务之间的区别, 任务是保存在 workQueue 中的, 线程是从线程池里面取的, 由 CAPACITY 控制容量.
addWorker 方法分析
addWorker 方法的主要工作是在线程池中创建一个新的线程并执行, firstTask 参数用于指定新增的线程执行的第一个任务, core 参数为 true 表示在新增线程时会判断当前活动线程数是否少于 corePoolSize,false 表示新增线程前需要判断当前活动线程数是否少于 maximumPoolSize, 代码如下:
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (;;) {
- int c = ctl.get();
- // 获取运行状态
- int rs = runStateOf(c);
- /*
- * 这个 if 判断
- * 如果 rs>= SHUTDOWN, 则表示此时不再接收新任务;
- * 接着判断以下 3 个条件, 只要有 1 个不满足, 则返回 false:
- * 1. rs == SHUTDOWN, 这时表示关闭状态, 不再接受新提交的任务, 但却可以继续处理阻塞队列中已保存的任务
- * 2. firsTask 为空
- * 3. 阻塞队列不为空
- *
- * 首先考虑 rs == SHUTDOWN 的情况
- * 这种情况下不会接受新提交的任务, 所以在 firstTask 不为空的时候会返回 false;
- * 然后, 如果 firstTask 为空, 并且 workQueue 也为空, 则返回 false,
- * 因为队列中已经没有任务了, 不需要再添加线程了
- */
- // Check if queue empty only if necessary.
- if (rs>= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
- for (;;) {
- // 获取线程数
- int wc = workerCountOf(c);
- // 如果 wc 超过 CAPACITY, 也就是 ctl 的低 29 位的最大值(二进制是 29 个 1), 返回 false;
- // 这里的 core 是 addWorker 方法的第二个参数, 如果为 true 表示根据 corePoolSize 来比较,
- // 如果为 false 则根据 maximumPoolSize 来比较.
- //
- if (wc>= CAPACITY ||
- wc>= (core ? corePoolSize : maximumPoolSize))
- return false;
- // 尝试增加 workerCount, 如果成功, 则跳出第一个 for 循环
- if (compareAndIncrementWorkerCount(c))
- break retry;
- // 如果增加 workerCount 失败, 则重新获取 ctl 的值
- c = ctl.get(); // Re-read ctl
- // 如果当前的运行状态不等于 rs, 说明状态已被改变, 返回第一个 for 循环继续执行
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- // 根据 firstTask 来创建 Worker 对象
- w = new Worker(firstTask);
- // 每一个 Worker 对象都会创建一个线程
- final Thread t = w.thread;
- if (t != null) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- int rs = runStateOf(ctl.get());
- // rs <SHUTDOWN 表示是 RUNNING 状态;
- // 如果 rs 是 RUNNING 状态或者 rs 是 SHUTDOWN 状态并且 firstTask 为 null, 向线程池中添加线程.
- // 因为在 SHUTDOWN 时不会在添加新的任务, 但还是会执行 workQueue 中的任务
- if (rs < SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- // workers 是一个 HashSet
- workers.add(w);
- int s = workers.size();
- // largestPoolSize 记录着线程池中出现过的最大线程数量
- if (s> largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- if (workerAdded) {
- // 启动线程
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
这里需要注意有以下几点:
在获取锁后重新检查线程池的状态, 这是因为其他线程可可能在本方法获取锁前改变了线程池的状态, 比如调用了 shutdown 方法. 添加成功则启动任务执行.
t.start()会调用 Worker 类中的 run 方法, Worker 本身实现了 Runnable 接口. 原因在创建线程得时候, 将 Worker 实例传入了 t 当中, 可参见 Worker 类的构造函数.
wc>= CAPACITY || wc>= (core ? corePoolSize : maximumPoolSize))每次调用 addWorker 来添加线程会先判断当前线程数是否超过了 CAPACITY, 然后再去判断是否超 corePoolSize 或 maximumPoolSize, 说明线程数实际上是由 CAPACITY 来控制的.
内部类 Worker 分析
上面分析过程中, 提到了一个 Worker 类, 对于某些对源码不是很熟悉得同学可能有点不清楚, 下面就来看看 Worker 的源码:
- private final class Worker
- extends AbstractQueuedSynchronizer
- implements Runnable
- {
- /**
- * This class will never be serialized, but we provide a
- * serialVersionUID to suppress a javac warning.
- */
- private static final long serialVersionUID = 6138294804551838833L;
- /** Thread this worker is running in. Null if factory fails. */
- final Thread thread;
- /** Initial task to run. Possibly null. */
- Runnable firstTask;
- /** Per-thread task counter */
- volatile long completedTasks;
- /**
- * Creates with given first task and thread from ThreadFactory.
- * @param firstTask the first task (null if none)
- */
- Worker(Runnable firstTask) {
- setState(-1); // inhibit interrupts until runWorker
- this.firstTask = firstTask;
- // 注意此处传入的是 this
- this.thread = getThreadFactory().newThread(this);
- }
- /** Delegates main run loop to outer runWorker. */
- // 这里其实会调用外部的 runWorker 方法来执行自己.
- public void run() {
- runWorker(this);
- }
- // Lock methods
- //
- // The value 0 represents the unlocked state.
- // The value 1 represents the locked state.
- protected boolean isHeldExclusively() {
- return getState() != 0;
- }
- protected boolean tryAcquire(int unused) {
- // 如果已经设置过 1 了, 这时候在设置 1 就会返回 false, 也就是不可重入
- if (compareAndSetState(0, 1)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
- protected boolean tryRelease(int unused) {
- setExclusiveOwnerThread(null);
- setState(0);
- return true;
- }
- public void lock() { acquire(1); }
- public boolean tryLock() { return tryAcquire(1); }
- public void unlock() { release(1); }
- public boolean isLocked() { return isHeldExclusively(); }
- // 提供安全中断线程得方法
- void interruptIfStarted() {
- Thread t;
- // 一开始 setstate(-1) 避免了还没开始运行就被中断可能
- if (getState()>= 0 && (t = thread) != null && !t.isInterrupted()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- }
- }
- }
- }
首先看到的是 Worker 继承了(AbstractQueuedSynchronizer) AQS, 并实现了 Runnable 接口, 说明 Worker 本身也是线程. 然后看其构造函数可以发现, 内部有两个属性变量分别是 Runnable 和 Thread 实例, 该类其实就是对传进来得属性做了一个封装, 并加入了获取锁的逻辑(继承了 AQS ). 具体可参考文章: 透过 ReentrantLock 分析 AQS 的实现原理
Worker 继承了 AQS, 使用 AQS 来实现独占锁的功能. 为什么不使用 ReentrantLock 来实现呢? 可以看到 tryAcquire 方法, 它是不允许重入的, 而 ReentrantLock 是允许重入的:
lock 方法一旦获取了独占锁, 表示当前线程正在执行任务中;
如果正在执行任务, 则不应该中断线程;
如果该线程现在不是独占锁的状态, 也就是空闲的状态, 说明它没有在处理任务, 这时可以对该线程进行中断;
线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程, interruptIdleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态;
之所以设置为不可重入, 是因为我们不希望任务在调用像 setCorePoolSize 这样的线程池控制方法时重新获取锁. 如果使用 ReentrantLock, 它是可重入的, 这样如果在任务中调用了如 setCorePoolSize 这类线程池控制的方法, 会中断正在运行的线程, 因为 size 小了, 需要中断一些线程 .
所以, Worker 继承自 AQS, 用于判断线程是否空闲以及是否可以被中断.
此外, 在构造方法中执行了 setState(-1);, 把 state 变量设置为 -1, 为什么这么做呢? 是因为 AQS 中默认的 state 是 0, 如果刚创建了一个 Worker 对象, 还没有执行任务时, 这时就不应该被中断, 看一下 tryAquire 方法:
- protected boolean tryAcquire(int unused) {
- if (compareAndSetState(0, 1)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
正因为如此, 在 runWorker 方法中会先调用 Worker 对象的 unlock 方法将 state 设置为 0.tryAcquire 方法是根据 state 是否是 0 来判断的, 所以, setState(-1); 将 state 设置为 -1 是为了禁止在执行任务前对线程进行中断.
runWorker 方法分析
前面提到了内部类 Worker 的 run 方法调用了外部类 runWorker, 下面来看下 runWork 的具体逻辑.
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- w.unlock(); // status 设置为 0, 允许中断, 也可以避免再次加锁失败
- boolean completedAbruptly = true;
- try {
- while (task != null || (task = getTask()) != null) {
- // 要派发 task 的时候, 需要上锁
- w.lock();
- // 如果线程池当前状态至少是 stop, 则设置中断标志;
- // 如果线程池当前状态是 RUNNININ, 则重置中断标志, 重置后需要重新
- // 检查下线程池状态, 因为当重置中断标志时候, 可能调用了线程池的 shutdown 方法
- // 改变了线程池状态.
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- // 任务执行前干一些事情
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- task.run();// 执行任务
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- // 任务执行完毕后干一些事情
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- // 统计当前 worker 完成了多少个任务
- w.completedTasks++;
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- // 执行清了工作
- processWorkerExit(w, completedAbruptly);
- }
- }
总结一下 runWorker 方法的执行过程:
while 循环不断地通过 getTask() 方法从阻塞队列中取任务;
如果线程池正在停止, 那么要保证当前线程是中断状态, 否则要保证当前线程不是中断状态;
调用 task.run()执行任务;
如果 task 为 null 则跳出循环, 执行 processWorkerExit 方法;
runWorker 方法执行完毕, 也代表着 Worker 中的 run 方法执行完毕, 销毁线程.
这里的 beforeExecute 方法和 afterExecute 方法在 ThreadPoolExecutor 类中是空的, 留给子类来实现.
completedAbruptly 变量来表示在执行任务过程中是否出现了异常, 在 processWorkerExit 方法中会对该变量的值进行判断.
getTask 方法分析
getTask 方法是从阻塞队列里面获取任务, 具体代码逻辑如下:
- private Runnable getTask() {
- // timeOut 变量的值表示上次从阻塞队列中取任务时是否超时
- boolean timedOut = false; // Did the last poll() time out?
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- /*
- * 如果线程池状态 rs>= SHUTDOWN, 也就是非 RUNNING 状态, 再进行以下判断:
- * 1. rs>= STOP, 线程池是否正在 stop;
- * 2. 阻塞队列是否为空.
- * 如果以上条件满足, 则将 workerCount 减 1 并返回 null.
- * 因为如果当前线程池状态的值是 SHUTDOWN 或以上时, 不允许再向阻塞队列中添加任务.
- */
- if (rs>= SHUTDOWN && (rs>= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- int wc = workerCountOf(c);
- // Are workers subject to culling?
- // timed 变量用于判断是否需要进行超时控制.
- // allowCoreThreadTimeOut 默认是 false, 也就是核心线程不允许进行超时;
- // wc> corePoolSize, 表示当前线程池中的线程数量大于核心线程数量;
- // 对于超过核心线程数量的这些线程, 需要进行超时控制
- boolean timed = allowCoreThreadTimeOut || wc> corePoolSize;
- /*
- * wc> maximumPoolSize 的情况是因为可能在此方法执行阶段同时执行了 setMaximumPoolSize 方法;
- * timed && timedOut 如果为 true, 表示当前操作需要进行超时控制, 并且上次从阻塞队列中获取任务发生了超时
- * 接下来判断, 如果有效线程数量大于 1, 或者阻塞队列是空的, 那么尝试将 workerCount 减 1;
- * 如果减 1 失败, 则返回重试.
- * 如果 wc == 1 时, 也就说明当前线程是线程池中唯一的一个线程了.
- */
- if ((wc> maximumPoolSize || (timed && timedOut))
- && (wc> 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
- try {
- /*
- * 根据 timed 来判断, 如果为 true, 则通过阻塞队列的 poll 方法进行超时控制, 如果在 keepAliveTime 时间内没有获取到任务, 则返回 null;
- * 否则通过 take 方法, 如果这时队列为空, 则 take 方法会阻塞直到队列不为空.
- *
- */
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- // 如果 r == null, 说明已经超时, timedOut 设置为 true
- timedOut = true;
- } catch (InterruptedException retry) {
- // 如果获取任务时当前线程发生了中断, 则设置 timedOut 为 false 并返回循环重试
- timedOut = false;
- }
- }
- }
其实到这里后, 你会发现在 ThreadPoolExcute 内部有几个重要的检验:
判断当前的运行状态, 根据运行状态来做处理, 如果当前都停止运行了, 那很多操作也就没必要了;
判断当前线程池的数量, 然后将该数据和 corePoolSize 以及 maximumPoolSize 进行比较, 然后再去决定下一步该做啥;
首先是第一个 if 判断, 当运行状态处于非 RUNNING 状态, 此外 rs>= STOP(线程池是否正在 stop)或阻塞队列是否为空. 则将 workerCount 减 1 并返回 null. 为什么要减 1 呢, 因为此处其实是去获取一个 task, 但是发现处于停止状态了, 也就是没必要再去获取运行任务了, 那这个线程就没有存在的意义了. 后续也会在 processWorkerExit 将该线程移除.
第二个 if 条件目的是控制线程池的有效线程数量. 由上文中的分析可以知道, 在执行 execute 方法时, 如果当前线程池的线程数量超过了 corePoolSize 且小于 maximumPoolSize, 并且 workQueue 已满时, 则可以增加工作线程, 但这时如果超时没有获取到任务, 也就是 timedOut 为 true 的情况, 说明 workQueue 已经为空了, 也就说明了当前线程池中不需要那么多线程来执行任务了, 可以把多于 corePoolSize 数量的线程销毁掉, 保持线程数量在 corePoolSize 即可.
什么时候会销毁? 当然是 runWorker 方法执行完之后, 也就是 Worker 中的 run 方法执行完, 由 JVM 自动回收.
getTask 方法返回 null 时, 在 runWorker 方法中会跳出 while 循环, 然后会执行 processWorkerExit 方法.
processWorkerExit 方法
下面在看 processWorkerExit 方法的具体逻辑:
- private void processWorkerExit(Worker w, boolean completedAbruptly) {
- // 如果 completedAbruptly 值为 true, 则说明线程执行时出现了异常, 需要将 workerCount 减 1;
- // 如果线程执行时没有出现异常, 说明在 getTask()方法中已经已经对 workerCount 进行了减 1 操作, 这里就不必再减了.
- if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
- decrementWorkerCount();
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 统计完成的任务数
- completedTaskCount += w.completedTasks;
- // 从 workers 中移除, 也就表示着从线程池中移除了一个工作线程
- workers.remove(w);
- } finally {
- mainLock.unlock();
- }
- // 根据线程池状态进行判断是否结束线程池
- tryTerminate();
- int c = ctl.get();
- /*
- * 当线程池是 RUNNING 或 SHUTDOWN 状态时, 如果 worker 是异常结束, 那么会直接 addWorker;
- * 如果 allowCoreThreadTimeOut=true, 并且等待队列有任务, 至少保留一个 worker;
- * 如果 allowCoreThreadTimeOut=false,workerCount 不少于 corePoolSize.
- */
- if (runStateLessThan(c, STOP)) {
- if (!completedAbruptly) {
- int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
- if (min == 0 && ! workQueue.isEmpty())
- min = 1;
- if (workerCountOf(c)>= min)
- return; // replacement not needed
- }
- addWorker(null, false);
- }
- }
至此, processWorkerExit 执行完之后, 工作线程被销毁, 以上就是整个工作线程的生命周期. 但是这有两点需要注意:
大家想想什么时候才会调用这个方法, 任务干完了才会调用. 那么没事做了, 就需要看下是否有必要结束线程池, 这时候就会调用 tryTerminate.
如果此时线程处于 STOP 状态以下, 那么就会判断核心线程数是否达到了规定的数量, 没有的话, 就会继续创建一个线程.
tryTerminate 方法
tryTerminate 方法根据线程池状态进行判断是否结束线程池, 代码如下:
- final void tryTerminate() {
- for (;;) {
- int c = ctl.get();
- /*
- * 当前线程池的状态为以下几种情况时, 直接返回:
- * 1. RUNNING, 因为还在运行中, 不能停止;
- * 2. TIDYING 或 TERMINATED, 因为线程池中已经没有正在运行的线程了;
- * 3. SHUTDOWN 并且等待队列非空, 这时要执行完 workQueue 中的 task;
- */
- if (isRunning(c) ||
- runStateAtLeast(c, TIDYING) ||
- (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
- return;
- // 如果线程数量不为 0, 则中断一个空闲的工作线程, 并返回
- if (workerCountOf(c) != 0) { // Eligible to terminate
- interruptIdleWorkers(ONLY_ONE);
- return;
- }
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 这里尝试设置状态为 TIDYING, 如果设置成功, 则调用 terminated 方法
- if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
- try {
- // terminated 方法默认什么都不做, 留给子类实现
- terminated();
- } finally {
- // 设置状态为 TERMINATED
- ctl.set(ctlOf(TERMINATED, 0));
- termination.signalAll();
- }
- return;
- }
- } finally {
- mainLock.unlock();
- }
- // else retry on failed CAS
- }
- }
interruptIdleWorkers(boolean onlyOne) 如果 ONLY_ONE = true 那么就的最多让一个空闲线程发生中断, ONLY_ONE = false 时是所有空闲线程都会发生中断. 那线程什么时候会处于空闲状态呢?
一是线程数量很多, 任务都完成了; 二是线程在 getTask 方法中执行 workQueue.take() 时, 如果不执行中断会一直阻塞.
所以每次在工作线程结束时调用 tryTerminate 方法来尝试中断一个空闲工作线程, 避免在队列为空时取任务一直阻塞的情况.
shutdown 方法
shutdown 方法要将线程池切换到 SHUTDOWN 状态, 并调用 interruptIdleWorkers 方法请求中断所有空闲的 worker, 最后调用 tryTerminate 尝试结束线程池.
- public void shutdown() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 安全策略判断
- checkShutdownAccess();
- // 切换状态为 SHUTDOWN
- advanceRunState(SHUTDOWN);
- // 中断空闲线程
- interruptIdleWorkers();
- onShutdown(); // hook for ScheduledThreadPoolExecutor
- } finally {
- mainLock.unlock();
- }
- // 尝试结束线程池
- tryTerminate();
- }
这里思考一个问题: 在 runWorker 方法中, 执行任务时对 Worker 对象 w 进行了 lock 操作, 为什么要在执行任务的时候对每个工作线程都加锁呢?
下面仔细分析一下:
在 getTask 方法中, 如果这时线程池的状态是 SHUTDOWN 并且 workQueue 为空, 那么就应该返回 null 来结束这个工作线程, 而使线程池进入 SHUTDOWN 状态需要调用 shutdown 方法;
shutdown 方法会调用 interruptIdleWorkers 来中断空闲的线程, interruptIdleWorkers 持有 mainLock, 会遍历 workers 来逐个判断工作线程是否空闲. 但 getTask 方法中没有 mainLock;
在 getTask 中, 如果判断当前线程池状态是 RUNNING, 并且阻塞队列为空, 那么会调用 workQueue.take() 进行阻塞;
如果在判断当前线程池状态是 RUNNING 后, 这时调用了 shutdown 方法把状态改为了 SHUTDOWN, 这时如果不进行中断, 那么当前的工作线程在调用了 workQueue.take() 后会一直阻塞而不会被销毁, 因为在 SHUTDOWN 状态下不允许再有新的任务添加到 workQueue 中, 这样一来线程池永远都关闭不了了;
由上可知, shutdown 方法与 getTask 方法 (从队列中获取任务时) 存在竞态条件;
解决这一问题就需要用到线程的中断, 也就是为什么要用 interruptIdleWorkers 方法. 在调用 workQueue.take() 时, 如果发现当前线程在执行之前或者执行期间是中断状态, 则会抛出 InterruptedException, 解除阻塞的状态;
但是要中断工作线程, 还要判断工作线程是否是空闲的, 如果工作线程正在处理任务, 就不应该发生中断;
所以 Worker 继承自 AQS, 在工作线程处理任务时会进行 lock,interruptIdleWorkers 在进行中断时会使用 tryLock 来判断该工作线程是否正在处理任务, 如果 tryLock 返回 true, 说明该工作线程当前未执行任务, 这时才可以被中断.
下面就来分析一下 interruptIdleWorkers 方法.
interruptIdleWorkers 方法
- private void interruptIdleWorkers() {
- interruptIdleWorkers(false);
- }
- private void interruptIdleWorkers(boolean onlyOne) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (Worker w : workers) {
- Thread t = w.thread;
- if (!t.isInterrupted() && w.tryLock()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- } finally {
- w.unlock();
- }
- }
- if (onlyOne)
- break;
- }
- } finally {
- mainLock.unlock();
- }
- }
interruptIdleWorkers 遍历 workers 中所有的工作线程, 若线程没有被中断 tryLock 成功, 就中断该线程.
为什么需要持有 mainLock ? 因为 workers 是 HashSet 类型的, 不能保证线程安全.
shutdownNow 方法
- public List<Runnable> shutdownNow() {
- List<Runnable> tasks;
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- checkShutdownAccess();
- advanceRunState(STOP);
- // 中断所有工作线程, 无论是否空闲
- interruptWorkers();
- // 取出队列中没有被执行的任务
- tasks = drainQueue();
- } finally {
- mainLock.unlock();
- }
- tryTerminate();
- return tasks;
- }
shutdownNow 方法与 shutdown 方法类似, 不同的地方在于:
设置状态为 STOP;
中断所有工作线程, 无论是否是空闲的;
取出阻塞队列中没有被执行的任务并返回.
shutdownNow 方法执行完之后调用 tryTerminate 方法, 该方法在上文已经分析过了, 目的就是使线程池的状态设置为 TERMINATED.
线程池的监控
通过线程池提供的参数进行监控. 线程池里有一些属性在监控线程池的时候可以使用
getTaskCount: 线程池已经执行的和未执行的任务总数;
getCompletedTaskCount: 线程池已完成的任务数量, 该值小于等于 taskCount;
getLargestPoolSize: 线程池曾经创建过的最大线程数量. 通过这个数据可以知道线程池是否满过, 也就是达到了 maximumPoolSize;
getPoolSize: 线程池当前的线程数量;
getActiveCount: 当前线程池中正在执行任务的线程数量.
通过这些方法, 可以对线程池进行监控, 在 ThreadPoolExecutor 类中提供了几个空方法, 如 beforeExecute 方法, afterExecute 方法和 terminated 方法, 可以扩展这些方法在执行前或执行后增加一些新的操作, 例如统计线程池的执行任务的时间等, 可以继承自 ThreadPoolExecutor 来进行扩展.
到此, 关于 ThreadPoolExecutor 的内容就讲完了.
参考文献
Java 中线程池 ThreadPoolExecutor 原理探究 https://www.jianshu.com/p/3cc67876375f
[Java] 之 ThreadPoolExcutor 源码浅析
线程池 ThreadPoolExecutor 实现原理 https://www.jianshu.com/p/125ccf0046f3
深入理解 Java 线程池: ThreadPoolExecutor
来源: https://www.cnblogs.com/huansky/p/12467720.html