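In Java, "thread pool" generally means ThreadPoolExecutor. Most other implementations are either built on it or modeled after it, so once you understand ThreadPoolExecutor, you have the essence of thread pools.
To really understand something, it usually pays to approach it with your own questions or hypotheses in mind. Otherwise you tend to gain little.
Before looking at ThreadPoolExecutor, let's pin down what a thread pool is for: essentially, it is a tool that keeps a predefined set of n threads on hand so that callers can run tasks on them directly.
Problems a thread pool solves:
1. Faster response and lower resource consumption. A submitted task can run on a thread the pool already owns, avoiding the CPU and memory cost of creating a thread on the spot.
2. Better manageability of threads. The total number of threads is predictable, which avoids the risk of the machine falling over because callers create an unbounded number of threads, and it allows threads to be allocated, tuned, and monitored centrally.
3. Protection against resource overuse. When requests exceed expectations, how the pool responds is under your control.
Core interfaces provided by the thread pool:
To use a thread pool you naturally need to understand its interface. In practice we usually interact with a pool through ExecutorService. That said, this article is not aimed at beginners.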
The overall interface looks like this:
[Figure: overview of the ExecutorService interface]
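Let's look at a few commonly used methods:
submit(Runnable task): submits a task that produces no result.
submit(Callable<T> task): submits a task that produces a result.
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): executes a batch of tasks and returns a list of Futures holding their results; tasks still unfinished when the timeout expires are cancelled.
shutdown(): shuts down the thread pool (previously submitted tasks still run, new ones are rejected).
awaitTermination(long timeout, TimeUnit unit): waits for the shutdown to complete, for at most timeout.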
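The calls listed above can be tried end to end with a small sketch. The class name ExecutorApiDemo and the pool sizes are illustrative assumptions; only standard java.util.concurrent API is used.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

// Sketch: the ExecutorService calls discussed above, used on a small ThreadPoolExecutor.
class ExecutorApiDemo {
    static List<Integer> run() {
        // 2 core threads, at most 2 threads, unbounded queue (acceptable for a tiny demo only)
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        try {
            // submit(Runnable): the Future only signals completion; get() returns null
            Future<?> done = pool.submit(() -> System.out.println("runnable finished"));
            // submit(Callable<T>): the Future carries the computed value
            Future<Integer> answer = pool.submit(() -> 40 + 2);
            // invokeAll with a timeout: returns once every task is done or the time is up
            List<Callable<Integer>> tasks = Arrays.asList(() -> 1, () -> 2, () -> 3);
            int sum = 0;
            for (Future<Integer> f : pool.invokeAll(tasks, 1, TimeUnit.SECONDS)) {
                sum += f.get();
            }
            done.get();
            List<Integer> result = Arrays.asList(answer.get(), sum);
            pool.shutdown();                            // stop accepting new tasks
            pool.awaitTermination(1, TimeUnit.SECONDS); // wait at most 1 s for the pool to finish
            return result;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // harmless if already shut down
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [42, 6]
    }
}
```

Note that the Future from submit(Runnable) is still useful: get() rethrows (wrapped in ExecutionException) anything the Runnable threw.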
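Those are the features ThreadPoolExecutor offers. With them in mind, we should form a few implementation ideas or questions of our own:
1. How does the pool accept tasks?
2. How do threads run tasks?
3. How is the pool shut down?
Now let's look at the implementation with these questions in mind.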
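Core implementation principles of ThreadPoolExecutor
1. The thread pool's processing flow
The first thing to focus on is how a submitted task gets executed, as the diagram illustrates.
[Figure: ThreadPoolExecutor task-processing flow]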
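In summary:
1. Check whether the core pool is full (corePoolSize threads running); if not, create a thread to run the task.
2. If the core pool is full, check whether the queue is full; if not, put the task in the queue.
3. If the queue is full, check whether the pool as a whole is full (maximumPoolSize threads); if not, create a thread to run the task.
4. If the pool is also full, handle the task according to the rejection policy.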
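The four steps can be observed directly. A minimal sketch, assuming corePoolSize = 1, maximumPoolSize = 2 and an ArrayBlockingQueue of capacity 1 (values chosen only to make each step visible; ExecuteFlowDemo is a hypothetical name). Every task waits on a latch, so no thread frees up while we keep submitting.

```java
import java.util.concurrent.*;

class ExecuteFlowDemo {
    private static String snapshot(ThreadPoolExecutor pool) {
        return "threads=" + pool.getPoolSize() + ",queued=" + pool.getQueue().size();
    }

    static String run() {
        CountDownLatch release = new CountDownLatch(1);
        // Every task parks on the latch, so no thread frees up while we keep submitting
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS,                 // core = 1, max = 2
                new ArrayBlockingQueue<Runnable>(1),        // room for exactly one queued task
                new ThreadPoolExecutor.AbortPolicy());      // the default handler, made explicit
        StringBuilder log = new StringBuilder();
        try {
            pool.execute(blocker);   // step 1: below corePoolSize -> start a core thread
            log.append(snapshot(pool)).append(';');
            pool.execute(blocker);   // step 2: core is full -> offer() to the queue
            log.append(snapshot(pool)).append(';');
            pool.execute(blocker);   // step 3: queue is full -> start a non-core thread
            log.append(snapshot(pool)).append(';');
            try {
                pool.execute(blocker); // step 4: max threads and full queue -> handler
            } catch (RejectedExecutionException e) {
                log.append("rejected");
            }
        } finally {
            release.countDown();     // let all blocked tasks finish
            pool.shutdown();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        // threads=1,queued=0;threads=1,queued=1;threads=2,queued=1;rejected
        System.out.println(run());
    }
}
```

A consequence worth noticing: with an unbounded queue (for example a LinkedBlockingQueue created without a capacity), step 2 always succeeds, step 3 is never reached, and maximumPoolSize has no effect.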
Next, let's look at ThreadPoolExecutor's constructor, since it shows what each field means.
```java
/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed. This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
```
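The constructor shows that ThreadPoolExecutor takes 7 main parameters, each described in its Javadoc. Here is what each one does:
corePoolSize: the number of core threads (the threads the pool normally keeps). When it applies: initially, each incoming task creates a new thread until this size is reached.
maximumPoolSize: the maximum number of threads. When it applies: once workQueue can hold no more tasks, new threads are started up to this limit, at which point the pool is at capacity.
keepAliveTime/unit: how long threads beyond corePoolSize may stay idle before they terminate; unit is the time unit.
workQueue: the blocking queue. Once the core thread count is reached, tasks are first offered to this queue, and the worker threads consume them from it. Common choices:
ArrayBlockingQueue: bounded; the constructor requires a capacity.
LinkedBlockingQueue: if no capacity is passed to the constructor, it defaults to Integer.MAX_VALUE (2^31 - 1), which is effectively unbounded, so a flood of tasks can exhaust memory.
SynchronousQueue: a blocking queue with no storage capacity that hands each task directly to a worker thread.
PriorityBlockingQueue: an unbounded priority queue.
threadFactory: the thread factory; whenever the pool needs a new thread, it calls newThread() on it.
handler: the saturation (rejection) policy. When the queue is full and the thread count has reached maximumPoolSize, the pool cannot take more tasks and hands the task to this policy. Built-in options:
AbortPolicy: throws RejectedExecutionException (the default).
CallerRunsPolicy: runs the task on the caller's thread.
DiscardOldestPolicy: discards the oldest task in the queue, then retries the submission.
DiscardPolicy: silently discards the current task.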
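Putting all seven parameters together: a sketch that passes a bounded queue, a custom ThreadFactory and CallerRunsPolicy explicitly. The helper namedFactory and the thread-name prefix demo-worker- are made up for illustration. Because LinkedBlockingQueue defaults to Integer.MAX_VALUE capacity, the sketch uses a bounded ArrayBlockingQueue so that the handler can actually be reached.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConfigDemo {
    // Hypothetical naming scheme "<prefix>N"; any ThreadFactory implementation would do
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger(1);
        return r -> {
            Thread t = new Thread(r, prefix + seq.getAndIncrement());
            t.setDaemon(true); // do not keep the JVM alive just for this demo pool
            return t;
        };
    }

    /** Returns {thread that ran the first task, thread that ran the rejected task}. */
    static String[] run() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        String[] ranOn = new String[2];
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,                                          // corePoolSize
                1,                                          // maximumPoolSize
                30L, TimeUnit.SECONDS,                      // keepAliveTime + unit (no non-core threads here)
                new ArrayBlockingQueue<Runnable>(1),        // workQueue: bounded, capacity 1
                namedFactory("demo-worker-"),               // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy()); // handler
        try {
            pool.execute(() -> {                            // occupies the only thread
                ranOn[0] = Thread.currentThread().getName();
                started.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> { });                        // fills the queue
            // Thread and queue are both full: CallerRunsPolicy runs this task right here
            pool.execute(() -> ranOn[1] = Thread.currentThread().getName());
            started.await();                                // also makes ranOn[0] visible to us
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn;
    }

    public static void main(String[] args) {
        String[] r = run();
        System.out.println(r[0] + " / " + r[1]); // demo-worker-1 / main
    }
}
```

CallerRunsPolicy doubles as a simple form of back-pressure: while the caller is busy running the rejected task, it cannot submit more.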
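2. The submit flow in detail
Calling submit hands a task to the pool, and the overall processing follows the flow from step 1. Now let's look more closely.
submit is defined in AbstractExecutorService and ultimately calls ThreadPoolExecutor's execute method: an application of the Template Method pattern.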
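To see the Template Method pattern at work, here is a small sketch: a hypothetical subclass RecordingExecutor overrides execute() and records what submit() hands to it. Assuming the JDK's default newTaskFor, the recorded object should be a FutureTask, and the very same object that submit() returns.

```java
import java.util.List;
import java.util.concurrent.*;

// Sketch: a subclass that overrides execute() to observe what submit() passes down.
class RecordingExecutor extends ThreadPoolExecutor {
    final List<Runnable> seen = new CopyOnWriteArrayList<>();

    RecordingExecutor() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    public void execute(Runnable command) {
        seen.add(command);       // record the object submit() hands to the "primitive" step
        super.execute(command);  // keep the normal ThreadPoolExecutor behaviour
    }

    /** Describes what execute() received for a single submit(Callable). */
    static String wrapperOfSubmit() {
        RecordingExecutor pool = new RecordingExecutor();
        try {
            Future<String> f = pool.submit(() -> "hello");
            String value = f.get();
            Runnable received = pool.seen.get(0);
            return received.getClass().getSimpleName() + ":" + value + ":" + (received == f);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(wrapperOfSubmit()); // FutureTask:hello:true
    }
}
```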
```java
// java.util.concurrent.AbstractExecutorService#submit(java.lang.Runnable, T)
/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    // Wrap the task and its result into a RunnableFuture, then let the concrete subclass execute it
    RunnableFuture<T> ftask = newTaskFor(task, result);
    // execute dispatches to ThreadPoolExecutor's implementation, the core of this discussion
    execute(ftask);
    return ftask;
}

// FutureTask is an important pool component: it carries the task's actual execution flow
/**
 * Returns a {@code RunnableFuture} for the given runnable and default
 * value.
 *
 * @param runnable the runnable task being wrapped
 * @param value the default value for the returned future
 * @param <T> the type of the given value
 * @return a {@code RunnableFuture} which, when run, will run the
 * underlying runnable and which, as a {@code Future}, will yield
 * the given value as its result and provide for cancellation of
 * the underlying task
 * @since 1.6
 */
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

// Task submission in ThreadPoolExecutor
// java.util.concurrent.ThreadPoolExecutor#execute
/**
 * Executes the given task sometime in the future. The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task. The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread. If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    // ctl is the key global-state control word, a thread-safe AtomicInteger:
    // ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    int c = ctl.get();
    // Fewer than corePoolSize workers: add one new thread and let it run the task
    if (workerCountOf(c) < corePoolSize) {
        // 2.1. Add a new thread that runs command as its first task
        // On success nothing else is needed; failure means the state changed concurrently
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Core threads are all in place: try to enqueue (offer() semantics depend on the queue)
    // isRunning(c) => c < SHUTDOWN
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 2.2. After enqueuing, re-check the pool state: start a thread or back out
        // 2.2.1. If the pool is no longer running, remove the task just queued and reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 2.2.2. If there are no workers at all, add one (with no first task) to drain the queue
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Queue is full (or the pool is not running): try a non-core thread, otherwise reject
    else if (!addWorker(command, false))
        // 2.3. Apply the rejection policy
        reject(command);
}
```
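This short piece of code already shows the full picture. execute() itself takes no lock: the pool's global state is kept in the single ctl variable and updated with CAS, which keeps this submission path thread-safe without locking. (A ReentrantLock, mainLock, is still used elsewhere, for example when addWorker modifies the workers set.)
The code maps exactly onto the pool's processing flow:
1. Check whether the core pool is full; if not, create a thread to run the task;
2. If the core pool is full, check whether the queue is full; if not, put the task in the queue;
3. If the queue is full, check whether the pool as a whole is full; if not, create a thread to run the task;
4. If the pool is also full, handle the task according to the rejection policy.
2.1. Adding a new worker
A worker is a worker thread: a ThreadPoolExecutor.Worker wraps the Thread that runs tasks.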
```java
/**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked. If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread.start()), we roll back cleanly.
 *
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).
 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.
 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * @return true if successful
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    // For thread safety, retry with CAS in a loop
    retry:
    for (;;) {
        int c = ctl.get();
        // Extract runState, stored in the high bits of c:
        // c & ~CAPACITY
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // Once shut down, adding fails, except in SHUTDOWN with a null firstTask and a
        // non-empty queue (a worker that only helps drain the remaining tasks)
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // Fail immediately if the applicable thread limit has been reached
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // CAS workerCount + 1: success reserves a slot and leaves retry, so the rest of
            // the add is safe; failure means another thread changed ctl in the meantime
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // runState changed: restart the outer retry loop
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // The actual process of adding the worker
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Wrap firstTask in a Worker; from now on the Worker drives its execution
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // Registering the worker must be thread-safe
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // Register only while RUNNING, or in SHUTDOWN for a worker without a first task;
                // otherwise the Worker object was created but is discarded below
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // A freshly created thread must not be alive yet
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers is simply a HashSet holding the worker threads:
                    // private final HashSet<Worker> workers = new HashSet<Worker>();
                    workers.add(w);
                    int s = workers.size();
                    // Track the largest pool size ever reached
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // Once added, start the worker. Its run() must loop, fetching tasks;
            // otherwise, how would the tasks added to the queue ever run?
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // If the worker failed to start, clean up; the method then returns false
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

// That is the overall framework for adding a worker. The key object is Worker, covered later.
// First, let's see how a failed add is handled.
/**
 * Rolls back the worker thread creation.
 * - removes worker from workers, if present
 * - decrements worker count
 * - rechecks for termination, in case the existence of this
 *   worker was holding up termination
 */
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        // workerCount in ctl minus 1, done with CAS
        decrementWorkerCount();
        // Try to terminate the pool (this may interrupt an idle worker)
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

/**
 * Decrements the workerCount field of ctl. This is called only on
 * abrupt termination of a thread (see processWorkerExit). Other
 * decrements are performed within getTask.
 */
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

// tryTerminate: try to move the pool to TERMINATED, interrupting an idle worker if needed
/**
 * Transitions to TERMINATED state if either (SHUTDOWN and pool
 * and queue empty) or (STOP and pool empty). If otherwise
 * eligible to terminate but workerCount is nonzero, interrupts an
 * idle worker to ensure that shutdown signals propagate. This
 * method must be called following any action that might make
 * termination possible -- reducing worker count or removing tasks
 * from the queue during shutdown. The method is non-private to
 * allow access from ScheduledThreadPoolExecutor.
 */
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // No termination while running, while already tidying/terminated,
        // or while shut down with tasks still queued
        if (isRunning(c) ||
            // c >= TIDYING
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            // One of the two ways threads get stopped: interrupt just one idle worker
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        // Final steps for the pool as a whole
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Move to TIDYING (cleaning up)
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // Hook called when the pool has terminated; empty by default
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    // termination is a Condition: wake up threads waiting in awaitTermination
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

/**
 * Interrupts threads that might be waiting for tasks (as
 * indicated by not being locked) so they can check for
 * termination or configuration changes. Ignores
 * SecurityExceptions (in which case some threads may remain
 * uninterrupted).
 *
 * @param onlyOne If true, interrupt at most one worker. This is
 * called only from tryTerminate when termination is otherwise
 * enabled but there are still other workers. In this case, at
 * most one waiting worker is interrupted to propagate shutdown
 * signals in case all threads are currently waiting.
 * Interrupting any arbitrary thread ensures that newly arriving
 * workers since shutdown began will also eventually exit.
 * To guarantee eventual termination, it suffices to always
 * interrupt only one idle worker, but shutdown() interrupts all
 * idle workers so that redundant workers exit promptly, not
 * waiting for a straggler task to finish.
 */
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Iterate over all workers
        for (Worker w : workers) {
            Thread t = w.thread;
            // Interrupt only if the worker's own lock can be acquired, i.e. it is not running a task
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            // onlyOne: stop after the first worker examined, whether or not it was interrupted
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
```
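The two-phase structure of addWorker (reserve a slot in ctl with a CAS loop, then do the locked, expensive work, rolling back on failure) can be isolated in a small sketch. WorkerSlots, tryReserve and release are hypothetical names; unlike the real addWorker, this sketch ignores runState and tracks only a counter.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of "reserve with CAS first, do the expensive work later" (hypothetical, not JDK API).
class WorkerSlots {
    private final AtomicInteger count = new AtomicInteger();

    /** Increments the count only if it is below bound; retries when another thread wins the CAS. */
    boolean tryReserve(int bound) {
        for (;;) {
            int c = count.get();
            if (c >= bound) {
                return false;              // like addWorker returning false at the bound
            }
            if (count.compareAndSet(c, c + 1)) {
                return true;               // slot reserved; the caller may now create the thread
            }
            // CAS failed: someone else changed the count, so re-read and retry
        }
    }

    /** Rollback, analogous to decrementWorkerCount() in addWorkerFailed(). */
    void release() {
        count.decrementAndGet();
    }

    int count() {
        return count.get();
    }

    /** 100 tasks race for 10 slots; returns how many reservations succeeded. */
    static int race() {
        WorkerSlots slots = new WorkerSlots();
        AtomicInteger wins = new AtomicInteger();
        ExecutorService racers = Executors.newFixedThreadPool(8);
        CountDownLatch done = new CountDownLatch(100);
        for (int i = 0; i < 100; i++) {
            racers.execute(() -> {
                if (slots.tryReserve(10)) {
                    wins.incrementAndGet();
                }
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        racers.shutdown();
        return wins.get();
    }

    public static void main(String[] args) {
        System.out.println(race()); // 10
    }
}
```

However the threads interleave, exactly 10 reservations succeed, which is the property the bound check plus CAS gives addWorker without holding mainLock.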
2.2. When the task has been queued but the pool state has changed, it must be removed from the queue
```java
/**
 * Removes this task from the executor's internal queue if it is
 * present, thus causing it not to be run if it has not already
 * started.
 *
 * <p>This method may be useful as one part of a cancellation
 * scheme. It may fail to remove tasks that have been converted
 * into other forms before being placed on the internal queue. For
 * example, a task entered using {@code submit} might be
 * converted into a form that maintains {@code Future} status.
 * However, in such cases, method {@link #purge} may be used to
 * remove those Futures that have been cancelled.
 *
 * @param task the task to remove
 * @return {@code true} if the task was removed
 */
public boolean remove(Runnable task) {
    // This removal does not necessarily succeed
    boolean removed = workQueue.remove(task);
    // As seen above, this may interrupt an idle worker so that termination can proceed
    tryTerminate(); // In case SHUTDOWN and now empty
    return removed;
}
```
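The Javadoc's caveat (remove may miss tasks that were converted before queuing) is easy to demonstrate. A minimal sketch, with the hypothetical class name RemoveDemo: a task passed to execute() is queued as-is and can be removed, while a task passed to submit() is queued as a FutureTask wrapper, so remove(originalTask) fails; cancelling the Future and calling purge() clears it instead.

```java
import java.util.concurrent.*;

class RemoveDemo {
    static String run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        try {
            // Occupy the only worker so that the following tasks stay in the queue
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            Runnable viaExecute = () -> { };
            Runnable viaSubmit = () -> { };
            pool.execute(viaExecute);
            Future<?> f = pool.submit(viaSubmit);
            boolean removedExecuted = pool.remove(viaExecute); // true: the queue holds this exact object
            boolean removedSubmitted = pool.remove(viaSubmit); // false: the queue holds a FutureTask
            f.cancel(false);                                   // mark the wrapper as cancelled
            pool.purge();                                      // drop cancelled Futures from the queue
            return removedExecuted + "," + removedSubmitted + "," + pool.getQueue().size();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // true,false,0
    }
}
```

In execute() itself this caveat does not bite: the command it removes during the recheck is exactly the object it just offered.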
3. Applying the rejection policy when adding fails
```java
/**
 * Invokes the rejected execution handler for the given command.
 * Package-protected for use by ScheduledThreadPoolExecutor.
 */
final void reject(Runnable command) {
    // handler is passed in through the constructor (AbortPolicy if none is given);
    // to customize the policy, implement RejectedExecutionHandler.rejectedExecution
    handler.rejectedExecution(command, this);
}
```
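As the comment says, a custom policy only needs rejectedExecution. A minimal sketch of one such policy, "count and drop" (CountingDiscardPolicy is a hypothetical name; a real one might log or record a metric instead). With 1 thread and a queue of 1, three of five blocking tasks should be rejected.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class CountingDiscardPolicy implements RejectedExecutionHandler {
    final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        rejected.incrementAndGet();   // e.g. feed a metric or a log here instead of throwing
    }

    /** Submits 5 blocking tasks to a pool with 1 thread and a queue of 1; returns the rejection count. */
    static int demo() {
        CountingDiscardPolicy policy = new CountingDiscardPolicy();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), policy);
        try {
            for (int i = 0; i < 5; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return policy.rejected.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 3 (1 running, 1 queued, 3 rejected)
    }
}
```

Silently dropping work is rarely what you want in production; the point here is only the shape of the extension point.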
4. How Worker works
From the implementation above, the work consists mainly of adding Workers and adding tasks to workQueue. So who actually runs the tasks? The Worker, naturally.
```java
// Worker's constructor takes a task, which may be null; if non-null, it runs as soon as the worker starts
// private final class Worker extends AbstractQueuedSynchronizer implements Runnable
/**
 * Creates with given first task and thread from ThreadFactory.
 * @param firstTask the first task (null if none)
 */
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // The Worker passes itself as the Runnable of worker.thread,
    // so starting the thread starts the worker
    this.thread = getThreadFactory().newThread(this);
}

// The Worker's main work, implemented as a loop
/** Delegates main run loop to outer runWorker */
public void run() {
    // Call runWorker, implemented in the enclosing ThreadPoolExecutor
    runWorker(this);
}

/**
 * Main worker run loop. Repeatedly gets tasks from queue and
 * executes them, while coping with a number of issues:
 *
 * 1. We may start out with an initial task, in which case we
 * don't need to get the first one. Otherwise, as long as pool is
 * running, we get tasks from getTask. If it returns null then the
 * worker exits due to changed pool state or configuration
 * parameters. Other exits result from exception throws in
 * external code, in which case completedAbruptly holds, which
 * usually leads processWorkerExit to replace this thread.
 *
 * 2. Before running any task, the lock is acquired to prevent
 * other pool interrupts while the task is executing, and then we
 * ensure that unless pool is stopping, this thread does not have
 * its interrupt set.
 *
 * 3. Each task run is preceded by a call to beforeExecute, which
 * might throw an exception, in which case we cause thread to die
 * (breaking loop with completedAbruptly true) without processing
 * the task.
 *
 * 4. Assuming beforeExecute completes normally, we run the task,
 * gathering any of its thrown exceptions to send to afterExecute.
 * We separately handle RuntimeException, Error (both of which the
 * specs guarantee that we trap) and arbitrary Throwables.
 * Because we cannot rethrow Throwables within Runnable.run, we
 * wrap them within Errors on the way out (to the thread's
 * UncaughtExceptionHandler). Any thrown exception also
 * conservatively causes thread to die.
 *
 * 5. After task.run completes, we call afterExecute, which may
 * also throw an exception, which will also cause thread to
 * die. According to JLS Sec 14.20, this exception is the one that
 * will be in effect even if task.run throws.
 *
 * The net effect of the exception mechanics is that afterExecute
 * and the thread's UncaughtExceptionHandler have as accurate
 * information as we can provide about any problems encountered by
 * user code.
 *
 * @param w the worker
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Keep taking tasks from workQueue and running them; that is the whole idea.
        // getTask() blocks, so a Worker usually does not exit right away
        while (task != null || (task = getTask()) != null) {
            // The worker holds its own lock while running a task: it runs one task at a time,
            // and interruptIdleWorkers (used by shutdown) will not interrupt it meanwhile
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // i.e. check whether the pool is stopping, or whether this worker was interrupted
            // STOP = 1 << COUNT_BITS;
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                // Propagate the stop signal as an interrupt
                wt.interrupt();
            try {
                // Hook before the task runs; empty by default
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Call the task's run() directly; for a FutureTask the result (or exception)
                    // is stored inside the FutureTask. See the author's earlier article
                    // (How does FutureTask obtain the result of asynchronous execution?
                    // https://www.cnblogs.com/yougewe/p/11666284.html)
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook after the task runs; empty by default
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        // Normal exit (getTask returned null); a replacement Worker may be added if needed
        completedAbruptly = false;
    } finally {
        // Handle the worker's exit; this may add a replacement Worker
        processWorkerExit(w, completedAbruptly);
    }
}

/**
 * Performs cleanup and bookkeeping for a dying worker. Called
 * only from worker threads. Unless completedAbruptly is set,
 * assumes that workerCount has already been adjusted to account
 * for exit. This method removes thread from worker set, and
 * possibly terminates the pool or replaces the worker if either
 * it exited due to user task exception or if fewer than
 * corePoolSize workers are running or queue is non-empty but
 * there are no workers.
 *
 * @param w the worker
 * @param completedAbruptly if the worker died due to user exception
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        // On a normal exit (e.g. a keep-alive timeout), keep at least the minimum number of threads
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // Enough workers remain: nothing to do
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // Otherwise add a replacement Worker;
        // after an abrupt exit (the task threw), a replacement is always attempted
        addWorker(null, false);
    }
}

/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait, and if the queue is
 *    non-empty, this worker is not the last thread in the pool.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // If the pool is STOP (or beyond), or SHUTDOWN with an empty queue, this worker must exit
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // do {} while (! compareAndDecrementWorkerCount(ctl.get()));
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // Too many workers (above maximumPoolSize) or this worker timed out: drop it,
        // unless it is the last worker and tasks are still queued
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // If timeout-based culling applies, use poll(keepAliveTime); otherwise block in take()
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // Got a task: hand it back to runWorker
            if (r != null)
                return r;
            // poll() timed out; the check near the top of the next iteration may end this worker
            timedOut = true;
        } catch (InterruptedException retry) {
            // The interrupt is swallowed here. How does a Worker then react to external
            // interrupts? The interrupt wakes it from take()/poll(), and the loop re-checks
            // runState at the top, returning null once the pool is shutting down.
            timedOut = false;
        }
    }
}
```
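The timed poll in getTask is what makes keepAliveTime work. A sketch to observe it, assuming corePoolSize = 1, maximumPoolSize = 2, keepAliveTime = 100 ms and a SynchronousQueue (which forces a second thread); KeepAliveDemo and waitForPoolSize are hypothetical names, and the timings are illustrative, so the 2-second waits are deliberately generous.

```java
import java.util.concurrent.*;

class KeepAliveDemo {
    /** Polls getPoolSize() until it equals expected or timeoutMs elapses. */
    static boolean waitForPoolSize(ThreadPoolExecutor pool, int expected, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (pool.getPoolSize() == expected) {
                return true;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return pool.getPoolSize() == expected;
    }

    static String run() {
        // core = 1, max = 2, idle threads beyond core live 100 ms
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 100L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker);             // core thread
            pool.execute(blocker);             // SynchronousQueue has no idle taker -> non-core thread
            int busy = pool.getPoolSize();     // 2
            release.countDown();               // both threads go idle and call getTask()
            // wc > corePoolSize, so getTask() uses poll(keepAliveTime) and one thread exits
            boolean shrankToCore = waitForPoolSize(pool, 1, 2000);
            // Now core threads are subject to the timeout too (this also interrupts idle workers)
            pool.allowCoreThreadTimeOut(true);
            boolean shrankToZero = waitForPoolSize(pool, 0, 2000);
            return busy + "," + shrankToCore + "," + shrankToZero;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // expected: 2,true,true
    }
}
```

Only one of the two idle threads exits in the first phase: once workerCount drops to corePoolSize, the surviving thread recomputes timed as false and falls back to take().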
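So the Worker's role becomes clear: it is a loop that fetches tasks and runs them.
1. A main loop keeps fetching tasks;
2. When timeouts apply (allowCoreThreadTimeOut is set, or there are more than corePoolSize workers), it fetches with poll(keepAliveTime); if that times out, the Worker leaves the loop and its thread ends;
3. Otherwise it fetches with take(), blocking until a task is available;
4. For each task it runs beforeExecute, then the task itself, then afterExecute;
5. When getTask returns null, the current Worker ends;
6. After a Worker ends, the pool checks whether it must maintain a minimum number of Workers and, if so, adds a replacement.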
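Step 4 can be observed by overriding the two hooks. A sketch with a hypothetical subclass HookedPool: a throwing task passed to execute() reaches afterExecute with its exception (and its worker dies and is replaced), while the same task passed to submit() reaches afterExecute with null, because FutureTask.run() catches the exception and stores it in the Future. The thread factory installs a silent UncaughtExceptionHandler only to keep the demo output clean.

```java
import java.util.List;
import java.util.concurrent.*;

class HookedPool extends ThreadPoolExecutor {
    final List<String> events = new CopyOnWriteArrayList<>();

    HookedPool() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
              r -> {
                  Thread t = new Thread(r);
                  t.setUncaughtExceptionHandler((th, ex) -> { }); // keep the demo output quiet
                  return t;
              });
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        events.add("before");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        events.add("after:" + (t == null ? "ok" : t.getMessage()));
    }

    static List<String> run() {
        HookedPool pool = new HookedPool();
        Runnable failing = () -> { throw new IllegalStateException("boom"); };
        try {
            pool.execute(failing);              // exception reaches afterExecute; the worker dies
            Future<?> f = pool.submit(failing); // FutureTask captures it; afterExecute sees null
            try {
                f.get();
            } catch (ExecutionException expected) {
                // the exception is delivered through the Future instead
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.SECONDS); // ensure every hook has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [before, after:boom, before, after:ok]
    }
}
```

A practical consequence: if you rely on afterExecute for error logging, tasks sent through submit() need their Future inspected (for example, by checking the Future inside afterExecute).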
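The overall Worker/ThreadPoolExecutor workflow is well captured by a diagram widely shared online, reused here rather than reinventing the wheel:
[Figure: Worker/ThreadPoolExecutor workflow]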
5. Implementation of shutdown
ThreadPoolExecutor maintains its global state in the ctl variable, and shutdown is likewise represented as a state, so its implementation should be fairly simple.
```java
/**
 * Initiates an orderly shutdown in which previously submitted
 * tasks are executed, but no new tasks will be accepted.
 * Invocation has no additional effect if already shut down.
 *
 * <p>This method does not wait for previously submitted tasks to
 * complete execution. Use {@link #awaitTermination awaitTermination}
 * to do that.
 *
 * @throws SecurityException {@inheritDoc}
 */
public void shutdown() {
    // mainLock keeps the transition thread-safe
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // SecurityManager check
        checkShutdownAccess();
        // Set the state to SHUTDOWN
        advanceRunState(SHUTDOWN);
        // Interrupt idle Workers, i.e. wake up every idle thread so it can notice the new state
        interruptIdleWorkers();
        // Shutdown hook, a no-op by default, so subclasses can add custom cleanup
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // Then try to move the pool to TERMINATED
    tryTerminate();
}

/**
 * Transitions runState to given target, or leaves it alone if
 * already at least the given target.
 *
 * @param targetState the desired state, either SHUTDOWN or STOP
 *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
 */
private void advanceRunState(int targetState) {
    for (;;) {
        int c = ctl.get();
        // Done if another thread already advanced the state, or our own CAS succeeds
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

// Interrupt idle threads (those not currently running a task)
/**
 * Common form of interruptIdleWorkers, to avoid having to
 * remember what the boolean argument means.
 */
private void interruptIdleWorkers() {
    // Covered above; onlyOne is false here, so every idle Worker is interrupted
    interruptIdleWorkers(false);
}
```
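The observable contract of shutdown() follows from this code: the state becomes SHUTDOWN, queued tasks are still drained, and new submissions are rejected. A sketch under those assumptions (ShutdownDemo is a hypothetical name): one thread is kept busy, three tasks wait in the queue, and shutdown() is called before they run.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> {                               // keeps the only thread busy
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            completed.incrementAndGet();
        });
        for (int i = 0; i < 3; i++) {
            pool.execute(completed::incrementAndGet);      // these three wait in the queue
        }
        pool.shutdown();                                   // RUNNING -> SHUTDOWN
        boolean rejected = false;
        try {
            pool.execute(completed::incrementAndGet);      // addWorker refuses, so reject() runs
        } catch (RejectedExecutionException e) {
            rejected = true;                               // default AbortPolicy
        }
        release.countDown();
        boolean terminated = false;
        try {
            terminated = pool.awaitTermination(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.isShutdown() + "," + rejected + "," + terminated + "," + completed.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // true,true,true,4
    }
}
```

The busy worker is not disturbed by interruptIdleWorkers, because it holds its own lock while running the task.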
The counterpart of shutdown is shutdownNow, whose semantics are to stop all tasks immediately (on a best-effort basis, via interrupts).
```java
/**
 * Attempts to stop all actively executing tasks, halts the
 * processing of waiting tasks, and returns a list of the tasks
 * that were awaiting execution. These tasks are drained (removed)
 * from the task queue upon return from this method.
 *
 * <p>This method does not wait for actively executing tasks to
 * terminate. Use {@link #awaitTermination awaitTermination} to
 * do that.
 *
 * <p>There are no guarantees beyond best-effort attempts to stop
 * processing actively executing tasks. This implementation
 * cancels tasks via {@link Thread#interrupt}, so any task that
 * fails to respond to interrupts may never terminate.
 *
 * @throws SecurityException {@inheritDoc}
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // The difference from shutdown: the target state is STOP
        advanceRunState(STOP);
        // Interrupt every started worker thread, busy or idle
        interruptWorkers();
        // Return the tasks that never started
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

/**
 * Interrupts all threads, even if active. Ignores SecurityExceptions
 * (in which case some threads may remain uninterrupted).
 */
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            // Use the interrupt method provided by Worker
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

// ThreadPoolExecutor.Worker#interruptIfStarted
void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            // Interrupt the worker's thread directly
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}
```
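In contrast to shutdown(), shutdownNow() interrupts running tasks and hands back the queued ones. A sketch (ShutdownNowDemo is a hypothetical name) with one long, interruptible task running and three tasks queued behind it; the running task only stops because Thread.sleep responds to interrupts.

```java
import java.util.List;
import java.util.concurrent.*;

class ShutdownNowDemo {
    static String run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000);                      // a long but interruptible task
            } catch (InterruptedException e) {
                interrupted.countDown();                   // interruptWorkers() reached us
            }
        });
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> { });                       // stuck in the queue behind the sleeper
        }
        try {
            started.await();                               // make sure the sleeper is really running
            List<Runnable> neverRun = pool.shutdownNow();  // RUNNING -> STOP, queue drained
            boolean sawInterrupt = interrupted.await(2, TimeUnit.SECONDS);
            boolean terminated = pool.awaitTermination(2, TimeUnit.SECONDS);
            return neverRun.size() + "," + sawInterrupt + "," + terminated;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // 3,true,true
    }
}
```

A task that ignores interrupts (for example a tight CPU loop that never checks Thread.interrupted()) would keep its thread alive, and awaitTermination would then time out.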
6. How invokeAll is implemented
invokeAll, as the name suggests, invokes all the given tasks. Presumably it adds them to the pool one by one.
```java
// invokeAll is implemented directly in the abstract class AbstractExecutorService.
// Its semantics: execute n tasks and wait synchronously until all results are available.
// java.util.concurrent.AbstractExecutorService#invokeAll(java.util.Collection<? extends java.util.concurrent.Callable<T>>)
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            // Hand each task to the subclass's execute() implementation
            execute(f);
        }
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                try {
                    // Wait for each result in turn
                    f.get();
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}
```
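The untimed version shown waits for everything; the timed overload listed at the start of the article additionally cancels whatever is unfinished at the deadline. A sketch of that behaviour (InvokeAllDemo is a hypothetical name; the 500 ms timeout and 10 s sleep are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

class InvokeAllDemo {
    static String run() {
        ExecutorService pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        List<Callable<String>> tasks = Arrays.asList(
                () -> "fast",
                () -> { Thread.sleep(10_000); return "slow"; });  // will miss the deadline
        try {
            List<Future<String>> futures = pool.invokeAll(tasks, 500, TimeUnit.MILLISECONDS);
            // Futures come back in task order, and every one of them is done (completed or cancelled)
            return futures.get(0).get() + "," + futures.get(1).isCancelled() + "," + futures.get(1).isDone();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // fast,true,true
    }
}
```

Calling get() on the cancelled Future would throw CancellationException, which is why the sketch only inspects isCancelled() for it.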
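The implementation is simple: it is just an outer layer of calls to execute() and Future.get().
7. Design of ThreadPoolExecutor's state values
As the walkthrough above shows, ThreadPoolExecutor depends heavily on its state, so a good design for the state values matters a great deal. runState represents the run state of the pool (not of individual Workers), as follows: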
```java
// runState is stored in the high-order bits
// The run state occupies the high 3 bits of ctl; COUNT_BITS = 29
// (the binary comments below show the high 16 bits of each constant)
private static final int COUNT_BITS = Integer.SIZE - 3;
// 1110 0000 0000 0000
private static final int RUNNING    = -1 << COUNT_BITS;
// 0000 0000 0000 0000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 0010 0000 0000 0000
private static final int STOP       =  1 << COUNT_BITS;
// 0100 0000 0000 0000
private static final int TIDYING    =  2 << COUNT_BITS;
// 0110 0000 0000 0000
private static final int TERMINATED =  3 << COUNT_BITS;
// Ordering of the state values: RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
// The low 29 bits hold the worker count, so adding a worker simply increments ctl as a whole.
// 0001 1111 1111 1111 (high 16 bits): the maximum worker count
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// ctl as a whole is an AtomicInteger, documented as follows:
/**
 * The main pool control state, ctl, is an atomic integer packing
 * two conceptual fields
 *   workerCount, indicating the effective number of threads
 *   runState,    indicating whether running, shutting down etc
 *
 * In order to pack them into one int, we limit workerCount to
 * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
 * billion) otherwise representable. If this is ever an issue in
 * the future, the variable can be changed to be an AtomicLong,
 * and the shift/mask constants below adjusted. But until the need
 * arises, this code is a bit faster and simpler using an int.
 *
 * The workerCount is the number of workers that have been
 * permitted to start and not permitted to stop. The value may be
 * transiently different from the actual number of live threads,
 * for example when a ThreadFactory fails to create a thread when
 * asked, and when exiting threads are still performing
 * bookkeeping before terminating. The user-visible pool size is
 * reported as the current size of the workers set.
 *
 * The runState provides the main lifecycle control, taking on values:
 *
 *   RUNNING:  Accept new tasks and process queued tasks
 *   SHUTDOWN: Don't accept new tasks, but process queued tasks
 *   STOP:     Don't accept new tasks, don't process queued tasks,
 *             and interrupt in-progress tasks
 *   TIDYING:  All tasks have terminated, workerCount is zero,
 *             the thread transitioning to state TIDYING
 *             will run the terminated() hook method
 *   TERMINATED: terminated() has completed
 *
 * The numerical order among these values matters, to allow
 * ordered comparisons. The runState monotonically increases over
 * time, but need not hit each state. The transitions are:
 *
 * RUNNING -> SHUTDOWN
 *    On invocation of shutdown(), perhaps implicitly in finalize()
 * (RUNNING or SHUTDOWN) -> STOP
 *    On invocation of shutdownNow()
 * SHUTDOWN -> TIDYING
 *    When both queue and pool are empty
 * STOP -> TIDYING
 *    When pool is empty
 * TIDYING -> TERMINATED
 *    When the terminated() hook method has completed
 *
 * Threads waiting in awaitTermination() will return when the
 * state reaches TERMINATED.
 *
 * Detecting the transition from SHUTDOWN to TIDYING is less
 * straightforward than you'd like because the queue may become
 * empty after non-empty and vice versa during SHUTDOWN state, but
 * we can only terminate if, after seeing that it is empty, we see
 * that workerCount is 0 (which sometimes entails a recheck -- see
 * below).
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
```
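The packing helpers used throughout (ctlOf, runStateOf, workerCountOf, runStateLessThan, runStateAtLeast, isRunning) are private to ThreadPoolExecutor. The sketch below re-implements them in a hypothetical class CtlBits, mirroring the JDK 8 source, so the bit arithmetic can be checked directly.

```java
// Re-implementation of the ctl packing helpers, mirroring JDK 8 (the real ones are private).
class CtlBits {
    static final int COUNT_BITS = Integer.SIZE - 3;          // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;     // low 29 bits: worker count

    // runState lives in the high 3 bits
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }  // keep the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }   // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }        // pack both fields

    // Because the state sits in the high bits, whole-ctl comparisons order by state first
    static boolean runStateLessThan(int c, int s) { return c < s; }
    static boolean runStateAtLeast(int c, int s)  { return c >= s; }
    static boolean isRunning(int c)               { return c < SHUTDOWN; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 0) + 3;                 // three increments, as addWorker's CAS would do
        System.out.println(workerCountOf(c));          // 3
        System.out.println(runStateOf(c) == RUNNING);  // true
        System.out.println(isRunning(c));              // true
        int s = ctlOf(SHUTDOWN, 3);
        System.out.println(isRunning(s) + " " + runStateLessThan(s, STOP)); // false true
    }
}
```

RUNNING is the only negative state, which is why isRunning can be a single comparison with SHUTDOWN (zero).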
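8. awaitTermination: waiting for shutdown to complete
As shutdown above shows, it only sets the SHUTDOWN state and then tries its best to interrupt idle Worker threads, without guaranteeing that they stop. Knowing that the shutdown has actually completed needs another mechanism. awaitTermination lets the caller block until termination has happened (or the timeout expires), so how does it know?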
```java
// ThreadPoolExecutor.awaitTermination()
public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            // Just loop on the ctl state: once it is TERMINATED, shutdown has completed
            // The termination condition is signalled from tryTerminate
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}
```
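In practice, awaitTermination is usually paired with shutdown and a fallback to shutdownNow. The sketch below adapts the shutdownAndAwaitTermination idiom from the ExecutorService Javadoc and also overrides the terminated() hook, which tryTerminate runs while in TIDYING, just before setting TERMINATED and signalling awaitTermination. TerminationDemo, HookPool and the timeout values are illustrative assumptions.

```java
import java.util.concurrent.*;

class TerminationDemo {
    static final class HookPool extends ThreadPoolExecutor {
        volatile boolean hookRan;

        HookPool() {
            super(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }

        @Override
        protected void terminated() {
            hookRan = true;       // runs in TIDYING, before awaitTermination() is signalled
        }
    }

    /** Returns true if the pool terminated in time, escalating to shutdownNow() if needed. */
    static boolean shutdownAndAwaitTermination(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown();                                   // reject new tasks, finish queued ones
        try {
            if (!pool.awaitTermination(timeout, unit)) {
                pool.shutdownNow();                        // interrupt tasks that are still running
                return pool.awaitTermination(timeout, unit);
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();            // preserve the interrupt status
            return false;
        }
    }

    static String run() {
        HookPool pool = new HookPool();
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> { });
        }
        boolean ok = shutdownAndAwaitTermination(pool, 2, TimeUnit.SECONDS);
        return ok + "," + pool.isTerminated() + "," + pool.hookRan;
    }

    public static void main(String[] args) {
        System.out.println(run()); // true,true,true
    }
}
```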
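awaitTermination does nothing special: it just waits on the termination condition. The move to TERMINATED is therefore driven by the Workers themselves.
Where does that happen? When fetching a task, getTask checks the current state: if it is SHUTDOWN and the queue is empty (or the state is STOP or later), getTask returns null, which ends the current Worker. Idle Workers blocked in take() or poll() are woken by the interrupt that shutdown sends, so they reach this check too.
Before it ends, a Worker calls processWorkerExit(), which calls tryTerminate() again. When the last Worker gets there, tryTerminate moves the pool to TIDYING, runs terminated(), sets TERMINATED and calls termination.signalAll(), so awaitTermination() is notified. (Note: processWorkerExit() may call addWorker() when it needs to keep a minimum number of Workers, but once the pool is SHUTDOWN and the queue is empty, addWorker always fails, so this cannot keep the pool alive.)
At this point, can you answer the questions posed at the beginning?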
Source: https://www.cnblogs.com/yougewe/p/12267274.html