[编者的话] 在 Java 中, 使用线程池来异步执行一些耗时任务是非常常见的操作. 最初我们一般都是直接使用 new Thread().start 的方式, 但我们知道, 线程的创建和销毁都会耗费大量的资源, 关于线程可以参考之前的一篇博客《 Java 线程那点事儿 https://github.com/aCoder2013/blog/issues/4 》, 因此我们需要重用线程资源.
当然也有其他待解决方案, 比如说 coroutine, 目前 Kotlin 已经支持了, JDK 也已经有了相关的提案: Project Loom http://openjdk.java.net/projects/loom/ , 目前的实现方式和 Kotlin 有点类似, 都是基于 ForkJoinPool, 当然目前还有很多限制以及问题没解决, 比如 synchronized 还是锁住当前线程等.
继承结构
继承结构看起来很清晰, 最顶层的 Executor 只提供了一个最简单的 void execute(Runnable command)方法, 然后是 ExecutorService,ExecutorService 提供了一些管理相关的方法, 例如关闭, 判断当前线程池的状态等, 另外不同于 Executor#execute,ExecutorService 提供了一系列方法, 可以将任务包装成一个 Future, 从而使得任务提交方可以跟踪任务的状态. 而父类 AbstractExecutorService 则提供了一些默认的实现.
构造器
ThreadPoolExecutor 的构造器提供了非常多的参数, 每一个参数都非常的重要, 一不小心就容易踩坑, 因此设置的时候, 你必须要知道自己在干什么.
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- if (corePoolSize <0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
- this.acc = System.getSecurityManager() == null ?
- null :
- AccessController.getContext();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
corePoolSize, maximumPoolSize. 线程池会自动根据 corePoolSize 和 maximumPoolSize 去调整当前线程池的大小. 当你通过 submit 或者 execute 方法提交任务的时候, 如果当前线程池的线程数小于 corePoolSize, 那么线程池就会创建一个新的线程处理任务, 即使其他的 core 线程是空闲的. 如果当前线程数大于 corePoolSize 并且小于 maximumPoolSize, 那么只有在队列 "满" 的时候才会创建新的线程. 因此这里会有很多的坑, 比如你的 core 和 max 线程数设置的不一样, 希望请求积压在队列的时候能够实时的扩容, 但如果制定了一个无界队列, 那么就不会扩容了, 因为队列不存在满的概念.
keepAliveTime. 如果当前线程池中的线程数超过了 corePoolSize, 那么如果在 keepAliveTime 时间内都没有新的任务需要处理, 那么超过 corePoolSize 的这部分线程就会被销毁. 默认情况下是不会回收 core 线程的, 可以通过设置 allowCoreThreadTimeOut 改变这一行为.
workQueue. 即实际用于存储任务的队列, 这个可以说是最核心的一个参数了, 直接决定了线程池的行为, 比如说传入一个有界队列, 那么队列满的时候, 线程池就会根据 core 和 max 参数的设置情况决定是否需要扩容, 如果传入了一个 SynchronousQueue, 这个队列只有在另一个线程在同步 remove 的时候才可以 put 成功, 对应到线程池中, 简单来说就是如果有线程池任务处理完了, 调用 poll 或者 take 方法获取新的任务的时候, 新提交的任务才会 put 成功, 否则如果当前的线程都在忙着处理任务, 那么就会 put 失败, 也就会走扩容的逻辑, 如果传入了一个 DelayedWorkQueue, 顾名思义, 任务就会根据过期时间来决定什么时候弹出, 即为 ScheduledThreadPoolExecutor 的机制.
threadFactory. 创建线程都是通过 ThreadFactory 来实现的, 如果没指定的话, 默认会使用 Executors.defaultThreadFactory(), 一般来说, 我们会在这里对线程设置名称, 异常处理器等.
handler. 即当任务提交失败的时候, 会调用这个处理器, ThreadPoolExecutor 内置了多个实现, 比如抛异常, 直接抛弃等. 这里也需要根据业务场景进行设置, 比如说当队列积压的时候, 针对性的对线程池扩容或者发送告警等策略.
看完这几个参数的含义, 我们看一下 Executors 提供的一些工具方法, 只要是为了方便使用, 但是我建议最好少用这个类, 而是直接用 ThreadPoolExecutor 的构造函数, 多了解一下这几个参数到底是什么意思, 自己的业务场景是什么样的, 比如线程池需不需要扩容, 用不用回收空闲的线程等.
- public class Executors {
- /*
- * 提供一个固定大小的线程池, 并且线程不会回收, 由于传入的是一个无界队列, 相当于队列永远不会满
- * 也就不会扩容, 因此需要特别注意任务积压在队列中导致内存爆掉的问题
- */
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
- }
- /*
- * 这个线程池会一直扩容, 由于 SynchronousQueue 的特性, 如果当前所有的线程都在处理任务, 那么
- * 新的请求过来, 就会导致创建一个新的线程处理任务. 如果线程一分钟没有新任务处理, 就会被回
- * 收掉. 特别注意, 如果每一个任务都比较耗时, 并发又比较高, 那么可能每次任务过来都会创建一个线
- * 程
- */
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
- }
- }
源码分析
既然是个线程池, 那就必然有其生命周期: 运行中, 关闭, 停止等. ThreadPoolExecutor 是用一个 AtomicInteger 去的前三位表示这个状态的, 另外又重用了低 29 位用于表示线程数, 可以支持最大大概 5 亿多, 绝逼够用了, 如果以后硬件真的发展到能够启动这么多线程, 改成 AtomicLong 就可以了.
状态这里主要分为下面几种:
RUNNING: 表示当前线程池正在运行中, 可以接受新任务以及处理队列中的任务
SHUTDOWN: 不再接受新的任务, 但会继续处理队列中的任务
STOP: 不再接受新的任务, 也不处理队列中的任务了, 并且会中断正在进行中的任务
TIDYING: 所有任务都已经处理完毕, 线程数为 0, 转为为 TIDYING 状态之后, 会调用 terminated()回调
TERMINATED:terminated()已经执行完毕
同时我们可以看到所有的状态都是用二进制位表示的, 并且依次递增, 从而方便进行比较, 比如想获取当前状态是否至少为 SHUTDOWN 等, 同时状态之前有几种转换:
RUNNING -> SHUTDOWN. 调用了 shutdown()之后, 或者执行了 finalize()
(RUNNING 或者 SHUTDOWN) -> STOP. 调用了 shutdownNow()之后会转换这个状态
SHUTDOWN -> TIDYING. 当线程池和队列都为空的时候
STOP -> TIDYING. 当线程池为空的时候
IDYING -> TERMINATED. 执行完 terminated()回调之后会转换为这个状态
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- private static final int COUNT_BITS = Integer.SIZE - 3;
- private static final int CAPACITY = (1 <<COUNT_BITS) - 1;
- private static final int RUNNING = -1 << COUNT_BITS;
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- private static final int STOP = 1 << COUNT_BITS;
- private static final int TIDYING = 2 << COUNT_BITS;
- private static final int TERMINATED = 3 << COUNT_BITS;
- // 由于前三位表示状态, 因此将 CAPACITY 取反, 和进行与操作即可
- private static int runStateOf(int c) { return c & ~CAPACITY; }
- private static int workerCountOf(int c) { return c & CAPACITY; }
- // 高三位 + 第三位进行或操作即可
- private static int ctlOf(int rs, int wc) { return rs | wc; }
- private static boolean runStateLessThan(int c, int s) {
- return c < s;
- }
- private static boolean runStateAtLeast(int c, int s) {
- return c>= s;
- }
- private static boolean isRunning(int c) {
- return c <SHUTDOWN;
- }
- // 下面三个方法, 通过 CAS 修改 worker 的数目
- private boolean compareAndIncrementWorkerCount(int expect) {
- return ctl.compareAndSet(expect, expect + 1);
- }
- // 只尝试一次, 失败了则返回, 是否重试由调用方决定
- private boolean compareAndDecrementWorkerCount(int expect) {
- return ctl.compareAndSet(expect, expect - 1);
- }
- // 跟上一个不一样, 会一直重试
- private void decrementWorkerCount() {
- do {} while (! compareAndDecrementWorkerCount(ctl.get()));
- }
下面是比较核心的字段, 这里 workers 采用的是非线程安全的 HashSet, 而不是线程安全的版本, 主要是因为这里有些复合的操作, 比如说将 worker 添加到 workers 后, 我们还需要判断是否需要更新 largestPoolSize 等, workers 只在获取到 mainLock 的情况下才会进行读写, 另外这里的 mainLock 也用于在中断线程的时候串行执行, 否则如果不加锁的话, 可能会造成并发去中断线程, 引起不必要的中断风暴.
- private final ReentrantLock mainLock = new ReentrantLock();
- private final HashSet<Worker> workers = new HashSet<Worker>();
- private final Condition termination = mainLock.newCondition();
- private int largestPoolSize;
- private long completedTaskCount;
核心方法
拿到一个线程池之后, 我们就可以开始提交任务, 让它去执行了, 那么我们看一下 submit 方法是如何实现的.
- public Future<?> submit(Runnable task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<Void> ftask = newTaskFor(task, null);
- execute(ftask);
- return ftask;
- }
- public <T> Future<T> submit(Callable<T> task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<T> ftask = newTaskFor(task);
- execute(ftask);
- return ftask;
- }
这两个方法都很简单, 首先将提交过来的任务 (有两种形式: Callable,Runnable) 都包装成统一的 RunnableFuture, 然后调用 execute 方法, execute 可以说是线程池最核心的一个方法.
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- int c = ctl.get();
- /*
- 获取当前 worker 的数目, 如果小于 corePoolSize 那么就扩容,
- 这里不会判断是否已经有 core 线程, 而是只要小于 corePoolSize 就会直接增加 worker
- */
- if (workerCountOf(c) <corePoolSize) {
- /*
- 调用 addWorker(Runnable firstTask, boolean core)方法扩容
- firstTask 表示为该 worker 启动之后要执行的第一个任务, core 表示要增加的为 core 线程
- */
- if (addWorker(command, true))
- return;
- // 如果增加失败了那么重新获取 ctl 的快照, 比如可能线程池在这期间关闭了
- c = ctl.get();
- }
- /*
- 如果当前线程池正在运行中, 并且将任务丢到队列中成功了,
- 那么就会进行一次 double check, 看下在这期间线程池是否关闭了,
- 如果关闭了, 比如处于 SHUTDOWN 状态, 如上文所讲的, SHUTDOWN 状态的时候,
- 不再接受新任务, remove 成功后调用拒绝处理器. 而如果仍然处于运行中的状态,
- 那么这里就 double check 下当前的 worker 数, 如果为 0, 有可能在上述逻辑的执行
- 过程中, 有 worker 销毁了, 比如说任务抛出了未捕获异常等, 那么就会进行一次扩容,
- 但不同于扩容 core 线程, 这里由于任务已经丢到队列中去了, 因此就不需要再传递 firstTask 了,
- 同时要注意, 这里扩容的是非 core 线程
- */
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- else if (!addWorker(command, false))
- /*
- 如果在上一步中, 将任务丢到队列中失败了, 那么就进行一次扩容,
- 这里会将任务传递到 firstTask 参数中, 并且扩容的是非 core 线程,
- 如果扩容失败了, 那么就执行拒绝策略.
- */
- reject(command);
- }
这里要特别注意下防止队列失败的逻辑, 不同的队列丢任务的逻辑也不一样, 例如说无界队列, 那么就永远不会 put 失败, 也就是说扩容也永远不会执行, 如果是有界队列, 那么当队列满的时候, 会扩容非 core 线程, 如果是 SynchronousQueue, 这个队列比较特殊, 当有另外一个线程正在同步获取任务的时候, 你才能 put 成功, 因此如果当前线程池中所有的 worker 都忙着处理任务的时候, 那么后续的每次新任务都会导致扩容, 当然如果 worker 没有任务处理了, 阻塞在获取任务这一步的时候, 新任务的提交就会直接丢到队列中去, 而不会扩容.
上文中多次提到了扩容, 那么我们下面看一下线程池具体是如何进行扩容的:
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (;;) {
- int c = ctl.get();
- // 获取当前线程池的状态
- int rs = runStateOf(c);
- /*
- 如果状态为大于 SHUTDOWN, 比如说 STOP,STOP 上文说过队列中的任务不处理了, 也不接受新任务,
- 因此可以直接返回 false 不扩容了, 如果状态为 SHUTDOWN 并且 firstTask 为 null, 同时队列非空,
- 那么就可以扩容
- */
- if (rs>= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
- for (;;) {
- int wc = workerCountOf(c);
- /*
- 若 worker 的数目大于 CAPACITY 则直接返回,
- 然后根据要扩容的是 core 线程还是非 core 线程, 进行判断 worker 数目
- 是否超过设置的值, 超过则返回
- */
- if (wc>= CAPACITY ||
- wc>= (core ? corePoolSize : maximumPoolSize))
- return false;
- /*
- 通过 CAS 的方式自增 worker 的数目, 成功了则直接跳出循环
- */
- if (compareAndIncrementWorkerCount(c))
- break retry;
- // 重新读取状态变量, 如果状态改变了, 比如线程池关闭了, 那么就跳到最外层的 for 循环,
- // 注意这里跳出的是 retry.
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- // 创建 Worker
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- /*
- 获取锁, 并判断线程池是否已经关闭
- */
- int rs = runStateOf(ctl.get());
- if (rs <SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // 若线程已经启动了, 比如说已经调用了 start()方法, 那么就抛异常,
- throw new IllegalThreadStateException();
- // 添加到 workers 中
- workers.add(w);
- int s = workers.size();
- if (s> largestPoolSize) // 更新 largestPoolSize
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- if (workerAdded) {
- // 若 Worker 创建成功, 则启动线程, 这么时候 worker 就会开始执行任务了
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- // 添加失败
- addWorkerFailed(w);
- }
- return workerStarted;
- }
- private void addWorkerFailed(Worker w) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- if (w != null)
- workers.remove(w);
- decrementWorkerCount();
- // 每次减少 worker 或者从队列中移除任务的时候都需要调用这个方法
- tryTerminate();
- } finally {
- mainLock.unlock();
- }
- }
这里有个貌似不太起眼的方法 tryTerminate, 这个方法会在所有可能导致线程池终结的地方调用, 比如说减少 worker 的数目等, 如果满足条件的话, 那么将线程池转换为 TERMINATED 状态. 另外这个方法没有用 private 修饰, 因为 ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor, 而 ScheduledThreadPoolExecutor 也会调用这个方法.
- final void tryTerminate() {
- for (;;) {
- int c = ctl.get();
- /*
- 如果当前线程处于运行中, TIDYING,TERMINATED 状态则直接返回, 运行中的没
- 什么好说的, 后面两种状态可以说线程池已经正在终结了, 另外如果处于 SHUTDOWN 状态,
- 并且 workQueue 非空, 表明还有任务需要处理, 也直接返回
- */
- if (isRunning(c) ||
- runStateAtLeast(c, TIDYING) ||
- (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
- return;
- // 可以退出, 但是线程数非 0, 那么就中断一个线程, 从而使得关闭的信号能够传递下去,
- // 中断 worker 后, worker 捕获异常后, 会尝试退出, 并在这里继续执行 tryTerminate()方法,
- // 从而使得信号传递下去
- if (workerCountOf(c) != 0) {
- interruptIdleWorkers(ONLY_ONE);
- return;
- }
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 尝试转换成 TIDYING 状态, 执行完 terminated 回调之后
- // 会转换为 TERMINATED 状态, 这个时候线程池已经完整关闭了,
- // 通过 signalAll 方法, 唤醒所有阻塞在 awaitTermination 上的线程
- if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
- try {
- terminated();
- } finally {
- ctl.set(ctlOf(TERMINATED, 0));
- termination.signalAll();
- }
- return;
- }
- } finally {
- mainLock.unlock();
- }
- // else retry on failed CAS
- }
- }
- /**
- * 中断空闲的线程
- * @param onlyOne
- */
- private void interruptIdleWorkers(boolean onlyOne) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (Worker w : workers) {
- // 遍历所有 worker, 若之前没有被中断过,
- // 并且获取锁成功, 那么就尝试中断.
- // 锁能够获取成功, 那么表明当前 worker 没有在执行任务, 而是在
- // 获取任务, 因此也就达到了只中断空闲线程的目的.
- Thread t = w.thread;
- if (!t.isInterrupted() && w.tryLock()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- } finally {
- w.unlock();
- }
- }
- if (onlyOne)
- break;
- }
- } finally {
- mainLock.unlock();
- }
- }
Worker
下面看一下 Worker 类, 也就是这个类实际负责执行任务, Worker 类继承自 AbstractQueuedSynchronizer,AQS 可以理解为一个同步框架, 提供了一些通用的机制, 利用模板方法模式, 让你能够原子的管理同步状态, blocking 和 unblocking 线程, 以及队列, 具体的内容之后有时间会再写, 还是比较复杂的. 这里 Worker 对 AQS 的使用相对比较简单, 使用了状态变量 state 表示是否获得锁, 0 表示解锁, 1 表示已获得锁, 同时通过 exclusiveOwnerThread 存储当前持有锁的线程. 另外再简单提一下, 比如说 CountDownLatch, 也是基于 AQS 框架实现的, countdown 方法递减 state,await 阻塞等待 state 为 0.
- private final class Worker
- extends AbstractQueuedSynchronizer
- implements Runnable
- {
- /** Thread this worker is running in. Null if factory fails. */
- final Thread thread;
- /** Initial task to run. Possibly null. */
- Runnable firstTask;
- /** Per-thread task counter */
- volatile long completedTasks;
- Worker(Runnable firstTask) {
- setState(-1); // inhibit interrupts until runWorker
- this.firstTask = firstTask;
- this.thread = getThreadFactory().newThread(this);
- }
- /** Delegates main run loop to outer runWorker */
- public void run() {
- runWorker(this);
- }
- protected boolean isHeldExclusively() {
- return getState() != 0;
- }
- protected boolean tryAcquire(int unused) {
- if (compareAndSetState(0, 1)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
- protected boolean tryRelease(int unused) {
- setExclusiveOwnerThread(null);
- setState(0);
- return true;
- }
- public void lock() { acquire(1); }
- public boolean tryLock() { return tryAcquire(1); }
- public void unlock() { release(1); }
- public boolean isLocked() { return isHeldExclusively(); }
- void interruptIfStarted() {
- Thread t;
- if (getState()>= 0 && (t = thread) != null && !t.isInterrupted()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- }
- }
- }
- }
注意这里 Worker 初始化的时候, 会通过 setState(-1)将 state 设置为 - 1, 并在 runWorker()方法中置为 0, 上文说过 Worker 是利用 state 这个变量来表示锁的状态, 那么加锁的操作就是通过 CAS 将 state 从 0 改成 1, 那么初始化的时候改成 - 1, 也就是表示在 Worker 启动之前, 都不允许加锁操作, 我们再看 interruptIfStarted()以及 interruptIdleWorkers()方法, 这两个方法在尝试中断 Worker 之前, 都会先加锁或者判断 state 是否大于 0, 因此这里的将 state 设置为 - 1, 就是为了禁止中断操作, 并在 runWorker 中置为 0, 也就是说只能在 Worker 启动之后才能够中断 Worker.
另外线程启动之后, 其实就是调用了 runWorker 方法, 下面我们看一下具体是如何实现的.
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- w.unlock(); // 调用 unlock()方法, 将 state 置为 0, 表示其他操作可以获得锁或者中断 worker
- boolean completedAbruptly = true;
- try {
- /*
- 首先尝试执行 firstTask, 若没有的话, 则调用 getTask()从队列中获取任务
- */
- while (task != null || (task = getTask()) != null) {
- w.lock();
- /*
- 如果线程池正在关闭, 那么中断线程.
- */
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- // 执行 beforeExecute 回调
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- // 实际开始执行任务
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- // 执行 afterExecute 回调
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- // 这里加了锁, 因此没有线程安全的问题, volatile 修饰保证其他线程的可见性
- w.completedTasks++;
- w.unlock();// 解锁
- }
- }
- completedAbruptly = false;
- } finally {
- // 抛异常了, 或者当前队列中已没有任务需要处理等
- processWorkerExit(w, completedAbruptly);
- }
- }
- private void processWorkerExit(Worker w, boolean completedAbruptly) {
- // 如果是异常终止的, 那么减少 worker 的数目
- if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
- decrementWorkerCount();
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 将当前 worker 中 workers 中删除掉, 并累加当前 worker 已执行的任务到 completedTaskCount 中
- completedTaskCount += w.completedTasks;
- workers.remove(w);
- } finally {
- mainLock.unlock();
- }
- // 上文说过, 减少 worker 的操作都需要调用这个方法
- tryTerminate();
- /*
- 如果当前线程池仍然是运行中的状态, 那么就看一下是否需要新增另外一个 worker 替换此 worker
- */
- int c = ctl.get();
- if (runStateLessThan(c, STOP)) {
- /*
- 如果是异常结束的则直接扩容, 否则的话则为正常退出, 比如当前队列中已经没有任务需要处理,
- 如果允许 core 线程超时的话, 那么看一下当前队列是否为空, 空的话则不用扩容. 否则话看一下
- 是否少于 corePoolSize 个 worker 在运行.
- */
- if (!completedAbruptly) {
- int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
- if (min == 0 && ! workQueue.isEmpty())
- min = 1;
- if (workerCountOf(c)>= min)
- return; // replacement not needed
- }
- addWorker(null, false);
- }
- }
- private Runnable getTask() {
- boolean timedOut = false; // 上一次 poll()是否超时了
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // 若线程池关闭了(状态大于 STOP)
- // 或者线程池处于 SHUTDOWN 状态, 但是队列为空, 那么返回 null
- if (rs>= SHUTDOWN && (rs>= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- int wc = workerCountOf(c);
- /*
- 如果允许 core 线程超时 或者 不允许 core 线程超时但当前 worker 的数目大于 core 线程数,
- 那么下面的 poll()则超时调用
- */
- boolean timed = allowCoreThreadTimeOut || wc> corePoolSize;
- /*
- 获取任务超时了并且(当前线程池中还有不止一个 worker 或者 队列中已经没有任务了), 那么就尝试
- 减少 worker 的数目, 若失败了则重试
- */
- if ((wc> maximumPoolSize || (timed && timedOut))
- && (wc> 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
- try {
- // 从队列中抓取任务
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- // 走到这里表明, poll 调用超时了
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
关闭线程池
关闭线程池一般有两种形式, shutdown()和 shutdownNow().
- public void shutdown() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- checkShutdownAccess();
- // 通过 CAS 将状态更改为 SHUTDOWN, 这个时候线程池不接受新任务, 但会继续处理队列中的任务
- advanceRunState(SHUTDOWN);
- // 中断所有空闲的 worker, 也就是说除了正在处理任务的 worker, 其他阻塞在 getTask()上的 worker
- // 都会被中断
- interruptIdleWorkers();
- // 执行回调
- onShutdown(); // hook for ScheduledThreadPoolExecutor
- } finally {
- mainLock.unlock();
- }
- tryTerminate();
- // 这个方法不会等待所有的任务处理完成才返回
- }
- public List<Runnable> shutdownNow() {
- List<Runnable> tasks;
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- checkShutdownAccess();
- /*
- 不同于 shutdown(), 会转换为 STOP 状态, 不再处理新任务, 队列中的任务也不处理,
- 而且会中断所有的 worker, 而不只是空闲的 worker
- */
- advanceRunState(STOP);
- interruptWorkers();
- tasks = drainQueue();// 将所有的任务从队列中弹出
- } finally {
- mainLock.unlock();
- }
- tryTerminate();
- return tasks;
- }
- private List<Runnable> drainQueue() {
- BlockingQueue<Runnable> q = workQueue;
- ArrayList<Runnable> taskList = new ArrayList<Runnable>();
- /*
- 将队列中所有的任务 remove 掉, 并添加到 taskList 中,
- 但是有些队列比较特殊, 比如说 DelayQueue, 如果第一个任务还没到过期时间, 则不会弹出,
- 因此这里通过调用 toArray 方法, 然后再一个一个的 remove 掉
- */
- q.drainTo(taskList);
- if (!q.isEmpty()) {
- for (Runnable r : q.toArray(new Runnable[0])) {
- if (q.remove(r))
- taskList.add(r);
- }
- }
- return taskList;
- }
从上文中可以看到, 调用了 shutdown()方法后, 不会等待所有的任务处理完毕才返回, 因此需要调用 awaitTermination()来实现.
- public boolean awaitTermination(long timeout, TimeUnit unit)
- throws InterruptedException {
- long nanos = unit.toNanos(timeout);
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- for (;;) {
- // 线程池若已经终结了, 那么就返回
- if (runStateAtLeast(ctl.get(), TERMINATED))
- return true;
- // 若超时了, 也返回掉
- if (nanos <= 0)
- return false;
- // 阻塞在信号量上, 等待线程池终结, 但是要注意这个方法可能会因为一些未知原因随时唤醒当前线程,
- // 因此需要重试, 在 tryTerminate()方法中, 执行完 terminated()回调后, 表明线程池已经终结了,
- // 然后会通过 termination.signalAll()唤醒当前线程
- nanos = termination.awaitNanos(nanos);
- }
- } finally {
- mainLock.unlock();
- }
- }
一些统计相关的方法
- public int getPoolSize() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 若线程已终结则直接返回 0, 否则计算 works 中的数目
- // 想一下为什么不用 workerCount 呢?
- return runStateAtLeast(ctl.get(), TIDYING) ? 0
- : workers.size();
- } finally {
- mainLock.unlock();
- }
- }
- public int getActiveCount() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- int n = 0;
- for (Worker w : workers)
- if (w.isLocked())// 上锁的表明 worker 当前正在处理任务, 也就是活跃的 worker
- ++n;
- return n;
- } finally {
- mainLock.unlock();
- }
- }
- public int getLargestPoolSize() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- return largestPoolSize;
- } finally {
- mainLock.unlock();
- }
- }
- // 获取任务的总数, 这个方法慎用, 若是个无解队列, 或者队列挤压比较严重, 会很蛋疼
- public long getTaskCount() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- long n = completedTaskCount;// 比如有些 worker 被销毁后, 其处理完成的任务就会叠加到这里
- for (Worker w : workers) {
- n += w.completedTasks;// 叠加历史处理完成的任务
- if (w.isLocked())// 上锁表明正在处理任务, 也算一个
- ++n;
- }
- return n + workQueue.size();// 获取队列中的数目
- } finally {
- mainLock.unlock();
- }
- }
- public long getCompletedTaskCount() {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- long n = completedTaskCount;
- for (Worker w : workers)
- n += w.completedTasks;
- return n;
- } finally {
- mainLock.unlock();
- }
- }
总结
这篇博客基本上覆盖了线程池的方方面面, 但仍然有非常多的细节可以深究, 比如说异常的处理, 可以参照之前的一篇博客:《 深度解析 Java 线程池的异常处理机制 https://github.com/aCoder2013/blog/issues/3 》, 另外还有 AQS,unsafe 等可以之后再单独总结.
来源: http://www.tuicool.com/articles/zmeAnqA