ThreadPoolExecutor 是 JDK 内置的线程池实现类, 最初随 JDK1.5 发布. 最近花了点时间看了下 ThreadPoolExecutor 的源码, JDK 版本是 JDK1.8.0_71.
整体结构
- @startuml
- class LinkedBlockingQueue
- class SynchronousQueue
- class Thread
- class Worker
- interface BlockingQueue
- interface ThreadFactory
- class ThreadPoolExecutor
- abstract class AbstractExecutorService
- interface ExecutorService
- interface Executor
- Executor <|-- ExecutorService
- ExecutorService <|.. AbstractExecutorService
- AbstractExecutorService <|-- ThreadPoolExecutor
- ThreadPoolExecutor o-- ThreadFactory
- ThreadPoolExecutor o-- BlockingQueue
- ThreadPoolExecutor o-- Worker
- BlockingQueue <|-- SynchronousQueue
- BlockingQueue <|-- LinkedBlockingQueue
- Worker o-- Thread
- ThreadFactory <|-- Executors.DefaultThreadFactory
- @enduml
上面这张图展示了 ThreadPoolExecutor 的一个大致的类结构.
ThreadPoolExecutor 当然依赖很多类, 但最主要的还是三个大类 ThreadFactory,BlockingQueue,Thread:
ThreadFactory. 主要定义了创建线程的方式. 这一块, 用户可以自己定义线程的创建行为, 例如调整线程的栈大小等. 这应该是设计模式里的模版模式.
BlockingQueue. 用来存放用户提交的, 尚未开始执行的 Runnable 任务. 当然 BlockingQueue 也未必真的有存储容量, 例如 SynchronousQueue.
Thread. 线程池里当然有线程, 这是句废话. 不过 ThreadPoolExecutor 把 Thread 封装在一个私有类 Worker 里面. ThreadPoolExecutor 和 Worker 既保持了良好的封装性, 又是紧耦合. 这一点不确定好还是不好, 应该再体会体会.
创建方式
JDK 在 Executors 类里面提供了若干工厂方法, 可以方便地创建线程池. 其中和 ThreadPoolExecutor 关联的工厂方法有:
newSingleThreadExecutor: 创建单线程的线程池. 主要用来实现异步调用.
newFixedThreadPool: 创建固定线程数量的线程池. 主要用于处理计算密集型任务.
newCachedThreadPool: 这个线程池, 能够按需创建线程. 闲置的线程可以复用, 避免创建开销. 闲置超过 60s 的线程会被回收. 主要用于处理能够快速返回的 IO 密集型任务.
ScheduledThreadPoolExecutor 是 ThreadPoolExecutor 的子类, 可以实现周期性的执行任务, 这里暂不涉及.
这些工厂方法虽然方便, 但是在业界被吐槽.阿里 Java 编码规范指出这些工厂方法的没什么可变参数, 没什么调优的余地, 而且自身的参数也不直观. 试想一下, 如果 CachedThreadPool 遇到密集的 IO 请求任务时, 线程不断创建, 最后会不会内存溢出呢? 对于 FixedThreadPool, 如果任务量并不大, 时刻保持固定数量的线程是不是有点浪费资源呢? 所以说, 创建线程池的最好的方法还是直接用 ThreadPoolExecutor 的构造器方法.
ThreadPoolExecutor 的构造器有这些参数:
corePoolSize: 线程池核心线程数量.
maximumPoolSize: 线程池最大线程数量.
keepAliveTime: 线程的闲置时间.
unit: 线程限制时间单位.
workQueue: 线程工作队列.
threadFactory: 线程工厂.
RejectedExecutionHandler:RejectedExecution 的捕获器.
运行时行为
ThreadPoolExecutor 把所管理的线程分为两类. 一类是核心线程, 一类是非核心线程.
当一个 ThreadPoolExecutor 刚刚创建出来的时候, 里面没有任何正在运行的线程. 当开始提交任务的时候, 线程池会首先创建核心线程. 即是当前有闲置线程, 如果线程总数低于 corePoolSize, 那么依然会创建新的线程来运行任务. 这个阶段可以视为线程池处于 "热身" 阶段. ThreadPoolExecutor 也提供了 prestartCoreThread,ensurePrestart,prestartAllCoreThreads 三个方法来手动实现 "无任务热身".
当线程总数达到 corePoolSize 同时所有线程都在运行时, 继续提交的任务, 就会交给 workQueue 来处理. 其实调用的就是 workQueue 的 offer 方法. 通常是将任务留在队列里面, 开始排队.
随着任务持续提交, 当队列中任务达到最大值的时候, 线程池开始创建新的线程, 来处理队列中的任务. 随着线程不增加, 当线程总数达到 maximumPoolSize 时, 继续提交的任务会被线程池拒绝, 抛出 RejectedExecutionException 运行时异常. 默认情况下, 异常交由任务提交方来处理. 但是, 用户也可以自定义 RejectedExecutionHandler 来统一处理该异常.
当任务处理完毕后, 线程就会闲置起来. 一直闲置着当然不是个事儿. 线程会闲置 keepAliveTime 时间, 随后销毁回收.
默认情况下, 核心线程不会因为闲置超时而被回收. 但这可以通过 allowCoreThreadTimeOut 方法来修改设置.
总的来讲, ThreadPoolExecutor 会积极创建核心线程, 对于非核心线程则是消极创建.
保持足够数量的核心线程是出于及时响应任务提交的需要. 这个很好理解.
但是, 消极创建非核心线程就不好理解了. 我一直觉得, 池内线程的数量应该与队列里的任务数量保持一种台阶式的准线性关系比较好. 随着队列任务的增加, 逐渐增加线程数量, 这样系统能够保持在一个稳定的状态, 比来回变化的线程数量和任务数量, 应当更有利于 GC 的表现. 当然了, 这一块一直没有真正的去测试一下, 就留作以后的一个课题吧.
源码阅读心得
这里说下我觉得值得注意的几点.
线程池状态管理
ThreadPoolExecutor 对象有五个运行状态, 使用 int 来表示:
状态 | 二进制数值 | 意义 |
---|---|---|
RUNNING | 11100000000000000000000000000000 | 正常运行 |
SHUTDOWN | 00000000000000000000000000000000 | 不再接受新任务,但仍然在处理队列中的任务 |
STOP | 00100000000000000000000000000000 | 不接受新任务,不处理队列任务,终止正在处理的任务 |
TIDYING | 01000000000000000000000000000000 | 所有任务终止,线程数量归零 |
TERMINATED | 01100000000000000000000000000000 | terminated() 方法执行完毕 |
RUNNING 状态可以到达 SHUTDOWN 状态, 也可以直接到达 STOP 状态.
当然 SHUTDOWN 状态, 也可以到达 STOP 状态.
SHUTDOWN 状态和 STOP 状态都可以到达 TIDYING 状态.
TIDYING 状态只能到达 TERMINATED 状态.
Java 中整形变量最大是 32 个 bit.ThreadPoolExecutor 使用了最高的 3 个 bit 来表示当前线程池的状态, 低位的 29 个 bit 用来表示当前池内的线程数. 所以说, ThreadPoolExecutor 支持的最大线程数为 2^29=536_870_911.5 亿多, 想来应该是够用了.
ThreadPoolExecutor 使用一个 AtomicInteger 变量来维护线程池状态. 使用原子类型可以保证该变量能够高效的并发修改 (相比于锁对象) 并及时发布(这个有点类似于 volatile 的作用).
ThreadPoolExecutor 通过位的求或, 求与, 求反来实现对线程池状态和线程数量的查询. 位运算, 相比于算术运算, 有更好的执行效率. 这个可以好好研究下.
无阻塞算法的使用
ThreadPoolExecutor 对象调用私有的 addWorker 方法来创建线程.
这个方法的内部最先是一个两层嵌套的 for 循环(labeled for-loop).
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- if (rs>= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
- for (;;) {
- int wc = workerCountOf(c);
- if (wc>= CAPACITY ||
- wc>= (core ? corePoolSize : maximumPoolSize))
- return false;
- if (compareAndIncrementWorkerCount(c))
- break retry;
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
- ...
- }
外部循环带有一个 retry 标签, 而内部循环可以是正常退出, 也可以是在外层循环框架下执行 break,continue 动作. 这个语法很少用到, 但在某些场景下会很有用, 值得记一笔.
其实观察一下内层循环里, 要想能够打破循环, 继续执行下面的内容, 必须成功的完成函数 compareAndIncrementWorkerCount 调用. 这个方法是对线程池状态变量的原子化替换. 多个外部线程同时调用该函数的时候, 有且只会有一个线程成功执行, 然后继续增加内部线程. 成功完成原子量变更的前提是, 从读取初始值, 完成新值计算到准备变更之间, 该原子量不能有变化, 否则就会重现执行外层循环.
这实际上是一种乐观的无锁算法.
乐观的意思是, 函数的执行倾向于认为变量的并发修改频率不大, 自己总能足够快的完成函数执行.
无锁, 就很好理解了. 各个线程不需要阻塞在锁对象上. 如果一次执行失败, 那就再来一次好了. 少量 for 循环的开销比若干线程的阻塞和唤醒所带来的开销划算多了.
其实注意观察一下, 这个方法和 ReentrantLock 类的 tryLock 方法都是乐观算法. 两者共同的套路是, 无限循环中包含一个试错条件, 也就是使用一个原子变量的 CAS(Coompare & Swap)操作作为成功与否的标记. 如果成功, 就会退出循环; 如果失败, 就会再循环一次.
在并发量不够高的场景下, 无阻塞算法可以显著降低同步开销, 与使用锁的算法相比, 有更高的吞吐量. 但是, 如果并发量极高的时候, 有锁反而更好. 因为在后者场景下, 无阻塞算法会有很多轮无效 for 循环, 反而不如有序地获取锁来的高效. 其实, 这个可以用道路交通来类比: 在车流量不大的路口, 环岛的通过率更高; 但当车流量很大的时候, 红绿灯的效率更高.
线程池对象的全局锁对象
ThreadPoolExecutor 对象内部有一把全局锁. 这把锁会在, 调用内部线程增加或者回收的时候调用, 也会在读取线程池状态的时候调用. 因此要注意这之间内部存在的阻塞关系.
此外, 设想一种场景. 所有线程全都在运行, 队列满负荷运行, 外部还有任务持续提交. 如果此时线程运行中大量的抛异常退出, 那么线程池会不停的创建线程, 处理线程退出, 这个行为就会被全局锁阻塞, 导致线程池的低效运转.
要小心这种情况的发生.
队列任务的获取
ThreadPoolExecutor 对象通过 BlockingQueue 获取任务. 正如其名, BlockingQueue 是一种阻塞式的队列. ArrayBlockingQueue,LinkedBlockQueue,PriorityBlockingQueue 使用显式的锁对象, 保护每一次队列任务的读取和放置; 而 SynchronousQueue 则是将访问线程排队 park 起来, 然后依次唤醒 (unpark) 提交或者获取任务.
频繁的读取 BlockingQueue 里的任务, 会造成池内线程消耗大量时间在队列内部的同步机制上. 因此, 为了降低同步开销的比例, 队列内部的任务不易快速返回. 而出于吞吐量的考虑, 队列内部的任务又不易消耗过长时间. 这两者需要小心的平衡.
JDK1.7 引入了新的 LinkedTransferQueue 据说有更好的并发效率. 这个随后得花时间看看.
不要中途修改基础参数
尽管 ThreadPoolExecutor 对象可以在对象运行过程中, 实时地修改 corePoolSize,keepAliveTime 等参数. 但是修改这些参数, 会在内部 interupt 所有线程, 包括正在运行的线程. 通常情况下, 一般的业务逻辑不大会特别的处理 InterruptedException. 所以应该尽量避免在运行时修改线程池设置参数.
来源: https://www.cnblogs.com/rim99/p/9142265.html