现在 CPU 都是有多个核心, 并行已经成为事实, 一方面我们希望最大限度利用机器性能(利用多线程提高吞吐率), 另一方面机器的硬件资源是有限的, 我们也不能无限制的去申请, 幸运的是, JDK 已经为我们提供了 ExecutorService 的实现, 还提供了 Executors 工厂类方便我们生成模板线程池, 但是简单背后一定是复杂, 这篇文章就是深入线程池的源码去一探究竟.
开始之前, 需要明确几个概念, 方便后面理解线程池的运行原理. 文末有免费福利哦
核心线程(corePool): 线程池最终执行任务的角色肯定还是线程, 同时我们也会限制线程的数量, 所以我们可以这样理解核心线程, 有新任务提交时, 首先检查核心线程数, 如果核心线程都在工作, 而且数量也已经达到最大核心线程数, 则不会继续新建核心线程, 而会将任务放入等待队列.
等待队列 (workQueue): 等待队列用于存储当核心线程都在忙时, 继续新增的任务, 核心线程在执行完当前任务后, 也会去等待队列拉取任务继续执行, 这个队列一般是一个线程安全的阻塞队列, 它的容量也可以由开发者根据业务来定制.
非核心线程: 当等待队列满了, 如果当前线程数没有超过最大线程数, 则会新建线程执行任务, 那么核心线程和非核心线程到底有什么区别呢? 说出来你可能不信, 本质上它们没有什么区别, 创建出来的线程也根本没有标识去区分它们是核心还是非核心的, 线程池只会去判断已有的线程数 (包括核心和非核心) 去跟核心线程数和最大线程数比较, 来决定下一步的策略.
线程活动保持时间 (keepAliveTime): 线程空闲下来之后, 保持存活的持续时间, 超过这个时间还没有任务执行, 该工作线程结束.
饱和策略 (RejectedExecutionHandler): 当等待队列已满, 线程数也达到最大线程数时, 线程池会根据饱和策略来执行后续操作, 默认的策略是抛弃要加入的任务.
一图剩千言, 上一张图概括线程池的基本运作流程.
按我的习惯, 先提出几个问题, 然后带着问题去寻找答案.
线程池的线程是如何做到复用的.
线程池是如何做到高效并发的.
从线程池的设计中, 我们能学到什么?
ThreadPoolExecutor
JDK 中线程池的核心实现类是 ThreadPoolExecutor, 先看这个类的第一个成员变量 ctl,AtomicInteger 这个类可以通过 CAS 达到无锁并发, 效率比较高, 这个变量有双重身份, 它的高三位表示线程池的状态, 低 29 位表示线程池中现有的线程数, 这也是 Doug Lea 一个天才的设计, 用最少的变量来减少锁竞争, 提高并发效率.
- //CAS, 无锁并发
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- // 表示线程池线程数的 bit 数
- private static final int COUNT_BITS = Integer.SIZE - 3;
- // 最大的线程数量, 数量是完全够用了
- private static final int CAPACITY = (1 <<COUNT_BITS) - 1;
- // runState is stored in the high-order bits
- //1110 0000 0000 0000 0000 0000 0000 0000(很耿直的我)
- private static final int RUNNING = -1 << COUNT_BITS;
- //0000 0000 0000 0000 0000 0000 0000 0000(很耿直的我)
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- //0010 0000 0000 0000 0000 0000 0000 0000(很耿直的我)
- private static final int STOP = 1 << COUNT_BITS;
- //0100 0000 0000 0000 0000 0000 0000 0000(很耿直的我)
- private static final int TIDYING = 2 << COUNT_BITS;
- //0110 0000 0000 0000 0000 0000 0000 0000(很耿直的我)
- private static final int TERMINATED = 3 << COUNT_BITS;
- // Packing and unpacking ctl
- // 获取线程池的状态
- private static int runStateOf(int c) { return c & ~CAPACITY; }
- // 获取线程的数量
- private static int workerCountOf(int c) { return c & CAPACITY; }
- // 组装状态和数量, 成为 ctl
- private static int ctlOf(int rs, int wc) { return rs | wc; }
- /*
- * Bit field accessors that don't require unpacking ctl.
- * These depend on the bit layout and on workerCount being never negative.
- * 判断状态 c 是否比 s 小, 下面会给出状态流转图
- */
- private static boolean runStateLessThan(int c, int s) {
- return c < s;
- }
- // 判断状态 c 是否不小于状态 s
- private static boolean runStateAtLeast(int c, int s) {
- return c>= s;
- }
- // 判断线程是否在运行
- private static boolean isRunning(int c) {
- return c <SHUTDOWN;
- }
关于线程池的状态, 有 5 种,
RUNNING, 运行状态, 值也是最小的, 刚创建的线程池就是此状态.
SHUTDOWN, 停工状态, 不再接收新任务, 已经接收的会继续执行
STOP, 停止状态, 不再接收新任务, 已经接收正在执行的, 也会中断
清空状态, 所有任务都停止了, 工作的线程也全部结束了
TERMINATED, 终止状态, 线程池已销毁
它们的流转关系如下:
execute/submit
向线程池提交任务有这 2 种方式, execute 是 ExecutorService 接口定义的, submit 有三种方法重载都在 AbstractExecutorService 中定义, 都是将要执行的任务包装为 FutureTask 来提交, 使用者可以通过 FutureTask 来拿到任务的执行状态和执行最终的结果, 最终调用的都是 execute 方法, 其实对于线程池来说, 它并不关心你是哪种方式提交的, 因为任务的状态是由 FutureTask 自己维护的, 对线程池透明.
- public Future<?> submit(Runnable task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<Void> ftask = newTaskFor(task, null);
- execute(ftask);
- return ftask;
- }
- public <T> Future<T> submit(Runnable task, T result) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task, result);
- execute(ftask);
- return ftask;
- }
- public <T> Future<T> submit(Callable<T> task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task);
- execute(ftask);
- return ftask;
- }
重点看 execute 的实现
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- // 第一步, 获取 ctl
- int c = ctl.get();
- // 检查当前线程数是否达到核心线程数的限制, 注意线程本身是不区分核心还是非核心, 后面会进一步验证
- if (workerCountOf(c) <corePoolSize) {
- // 如果核心线程数未达到, 会直接添加一个核心线程, 也就是说在线程池刚启动预热阶段,
- // 提交任务后, 会优先启动核心线程处理
- if (addWorker(command, true))
- return;
- // 如果添加任务失败, 刷新 ctl, 进入下一步
- c = ctl.get();
- }
- // 检查线程池是否是运行状态, 然后将任务添加到等待队列, 注意 offer 是不会阻塞的
- if (isRunning(c) && workQueue.offer(command)) {
- // 任务成功添加到等待队列, 再次刷新 ctl
- int recheck = ctl.get();
- // 如果线程池不是运行状态, 则将刚添加的任务从队列移除并执行拒绝策略
- if (! isRunning(recheck) && remove(command))
- reject(command);
- // 判断当前线程数量, 如果线程数量为 0, 则添加一个非核心线程, 并且不指定首次执行任务
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- // 添加非核心线程, 指定首次执行任务, 如果添加失败, 执行异常策略
- else if (!addWorker(command, false))
- reject(command);
- }
- /*
- * addWorker 方法申明
- * @param core if true use corePoolSize as bound, else
- * maximumPoolSize. (A boolean indicator is used here rather than a
- * value to ensure reads of fresh values after checking other pool
- * state).
- * @return true if successful
- */
- private boolean addWorker(Runnable firstTask, boolean core) {
- //.....
- }
这里有 2 个细节, 可以深挖一下. ~ 文末有免费福利哦~
可以看到 execute 方法中没有用到重量级锁, ctl 虽然可以保证本身变化的原子性, 但是不能保证方法内部的代码块的原子性, 是否会有并发问题?
上面提到过, addWorker 方法可以添加工作线程(核心或者非核心), 线程本身没有核心或者非核心的标识, core 参数只是用来确定 当前线程数的比较对象是线程池设置的核心线程数还是最大线程数, 真实情况是不是这样?
addWorker
添加线程的核心方法, 直接看源码
- private boolean addWorker(Runnable firstTask, boolean core) {
- // 相当于 goto, 虽然不建议滥用, 但这里使用又觉得没一点问题
- retry:
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // 如果线程池的状态到了 SHUTDOWN 或者之上的状态时候, 只有一种情况还需要继续添加线程,
- // 那就是线程池已经 SHUTDOWN, 但是队列中还有任务在排队, 而且不接受新任务(所以 firstTask 必须为 null)
- // 这里还继续添加线程的初衷是, 加快执行等待队列中的任务, 尽快让线程池关闭
- // Check if queue empty only if necessary.
- if (rs>= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
- for (;;) {
- int wc = workerCountOf(c);
- // 传入的 core 的参数, 唯一用到的地方, 如果线程数超过理论最大容量, 如果 core 是 true 跟最大核心线程数比较, 否则跟最大线程数比较
- if (wc>= CAPACITY ||
- wc>= (core ? corePoolSize : maximumPoolSize))
- return false;
- // 通过 CAS 自旋, 增加线程数 + 1, 增加成功跳出双层循环, 继续往下执行
- if (compareAndIncrementWorkerCount(c))
- break retry;
- // 检测当前线程状态如果发生了变化, 则继续回到 retry, 重新开始循环
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
- // 走到这里, 说明我们已经成功的将线程数 + 1 了, 但是真正的线程还没有被添加
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- // 添加线程, Worker 是继承了 AQS, 实现了 Runnable 接口的包装类
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- // 到这里开始加锁
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- int rs = runStateOf(ctl.get());
- // 检查线程状态, 还是跟之前一样, 只有当线程池处于 RUNNING, 或者处于 SHUTDOWN 并且 firstTask==null 的时候, 这时候创建 Worker 来加速处理队列中的任务
- if (rs <SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- // 线程只能被 start 一次
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- //workers 是一个 HashSet, 添加我们新增的 Worker
- workers.add(w);
- int s = workers.size();
- if (s> largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- if (workerAdded) {
- // 启动 Worker
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
分析完 addWorker 的源码实现, 我们可以回答上面留下的二个疑问,
execute 方法虽然没有加锁, 但是在 addWorker 方法内部, 加锁了, 这样可以保证不会创建超过我们预期的线程数, 大师在设计的时候, 做到了在最小的范围内加锁, 尽量减少锁竞争,
可以看到, core 参数, 只是用来判断当前线程数是否超量的时候跟 corePoolSize 还是 maxPoolSize 比较, Worker 本身无核心或者非核心的概念. #### 继续看 Worker 是怎么工作的
- //Worker 的 run 方法调用的是 ThreadPoolExecutor 的 runWorker 方法
- public void run() {
- runWorker(this);
- }
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- // 取出需要执行的任务,
- Runnable task = w.firstTask;
- w.firstTask = null;
- w.unlock(); // allow interrupts
- boolean completedAbruptly = true;
- try {
- // 如果 task 不是 null, 或者去队列中取任务, 注意这里会阻塞, 后面会分析 getTask 方法
- while (task != null || (task = getTask()) != null) {
- // 这个 lock 在这里是为了如果线程被中断, 那么会抛出 InterruptedException, 而退出循环, 结束线程
- w.lock();
- // 判断线程是否需要中断
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- // 任务开始执行前的 hook 方法
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- //// 任务开始执行后的 hook 方法
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- w.completedTasks++;
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- //Worker 退出
- processWorkerExit(w, completedAbruptly);
- }
- }
- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- // 检查线程池的状态, 如果已经是 STOP 及以上的状态, 或者已经 SHUTDOWN, 队列也是空的时候, 直接 return null, 并将 Worker 数量 - 1
- if (rs>= SHUTDOWN && (rs>= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- int wc = workerCountOf(c);
- // 注意这里的 allowCoreThreadTimeOut 参数, 字面意思是否允许核心线程超时, 即如果我们设置为 false, 那么只有当线程数 wc 大于 corePoolSize 的时候才会超时
- // 更直接的意思就是, 如果设置 allowCoreThreadTimeOut 为 false, 那么线程池在达到 corePoolSize 个工作线程之前, 不会让闲置的工作线程退出
- boolean timed = allowCoreThreadTimeOut || wc> corePoolSize;
- // 确认超时, 将 Worker 数 - 1, 然后返回
- if ((wc> maximumPoolSize || (timed && timedOut))
- && (wc> 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
- try {
- // 从队列中取任务, 根据 timed 选择是有时间期限的等待还是无时间期限的等待
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
现在我们可以回答文章一开始提出的三个问题中的前 2 个了
线程池的线程是如何做到复用的. 线程池中的线程在循环中尝试取任务执行, 这一步会被阻塞, 如果设置了 allowCoreThreadTimeOut 为 true, 则线程池中的所有线程都会在 keepAliveTime 时间超时后还未取到任务而退出. 或者线程池已经 STOP, 那么所有线程都会被中断, 然后退出.
线程池是如何做到高效并发的. 看整个线程池的工作流程, 有以下几个需要特别关注的并发点. 1: 线程池状态和工作线程数量的变更. 这个由一个 AtomicInteger 变量 ctl 来解决原子性问题. 2: 向工作 Worker 容器 workers 中添加新的 Worker 的时候. 这个线程池本身已经加锁了. 3: 工作线程 Worker 从等待队列中取任务的时候. 这个由工作队列本身来保证线程安全, 比如 LinkedBlockingQueue 等.
怎么用好 Executors
JDK 已经给我们提供了很方便的线程池工厂类 Executors, 方便我们快速创建线程池, 可能在阅读源码之前, 我们在面对具体的业务场景时, 到底该选择哪种线程池配置是有疑问的, 我们来看一下.
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
- }
newFixedThreadPool, 可以看到我们需要传入一个线程数量的参数 nThreads, 这样线程池的核心线程数和最大线程数都会设成 nThreads, 而它的等待队列是一个 LinkedBlockingQueue, 它的容量限制是 Integer.MAX_VALUE, 可以认为是没有边界的. 核心线程 keepAlive 时间 0,allowCoreThreadTimeOut 默认 false. 所以这个方法创建的线程池适合能估算出需要多少核心线程数量的场景.
- public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService
- (new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>()));
- }
newSingleThreadExecutor, 有且只有一个线程在工作, 适合任务顺序执行, 缺点但是不能充分利用 CPU 多核性能
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
- }
newCachedThreadPool, 核心线程数 0, 最大线程数 Integer.MAX_VALUE, 线程 keepAlive 时间 60s, 用的队列是 SynchronousQueue, 这种队列本身不会存任务, 只做转发, 所以 newCachedThreadPool 适合执行大量的, 轻量级任务.
- public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
- return new ScheduledThreadPoolExecutor(corePoolSize);
- }
newScheduledThreadPool, 执行周期性任务, 类似定时器.
最后的问题: 从线程池的设计中, 我们能学到什么?
以我的个人体会, 大概有下面四点
清楚实现原理, 可以指导我们更好的使用.
在写并发程序的时候, 尽可能的缩小锁的范围, 提高代码的吞吐率.
goto, 不是一定不能用, 而不是滥用, 有些场景有奇效.
如果你需要多个线程安全的 int 型变量, 考虑利用位运算把它们合并为一个.
全文完, 水平有限, 有疑问, 欢迎交流!
最后给大家分享一份非常系统和全面的 Android 进阶技术大纲及进阶资料, 及面试题集
想学习更多 Android 知识, 请加入 Android 技术开发企鹅交流 7520 16839
进群与大牛们一起讨论, 还可获取 Android 高级架构资料, 源码, 笔记, 视频
包括 高级 UI,Gradle,RxJava, 小程序, Hybrid, 移动架构, React Native, 性能优化等全面的 Android 高级实践技术讲解性能优化架构思维导图, 和 BATJ 面试题及答案!
群里免费分享给有需要的朋友, 希望能够帮助一些在这个行业发展迷茫的, 或者想系统深入提升以及困于瓶颈的朋友, 在网上博客论坛等地方少花些时间找资料, 把有限的时间, 真正花在学习上, 所以我在这免费分享一些架构资料及给大家. 希望在这些资料中都有你需要的内容.
来源: http://www.jianshu.com/p/431b329cfbdd