五一假期大雄看了一本《java 并发编程艺术》, 了解了线程池的基本工作流程, 竟然发现线程池工作原理和互联网公司运作模式十分相似.
线程池处理流程
原理解析
互联网公司与线程池的关系
这里用一个比喻来描述一下线程池, 中间有一些名词你可能不是太清楚, 后边源码解析的部分会讲到.
你可以把线程池看作是一个研发部门, 研发部门有很多程序员 (Worker), 他们在一个大办公室里(HashSet workers). 程序员干不完的需求(Runnable/Callable) 放在需求池 (workQueue) 里排队. 每个研发部都配置有骨干程序员数量 (corePoolSize) 和最大能容纳的程序员数量(maximumPoolSize). 具体要做的任务就是产品的需求.
new 一个线程池相当于创建了一个研发部, 创建研发部时需要指定骨干程序员数量, 最大能容纳的程序员数量, 需求池用哪种 (BlockingQueue), 如果忙不过来的需求怎么给产品回复(拒绝策略) 等等内容. 刚开始这个研发部一个程序员也没有.
当产品给这个研发部提一个需求时(当然肯定不会只提一个, 他们会不断的提需求. 这里以提一个需求为例)
首先会看骨干程序员招聘满了没.
如果没满, 会招聘一个骨干程序员, 招聘进来就让他不停的工作(很残酷啊), 干完刚派过来的任务他会主动在需求池找下一个需求来做(好员工), 如果需求池没有需求了, 他就停止工作了, 然后研发部会把他裁掉, 如果裁掉后发现骨干程序员数量不够了, 就会再招聘一个程序员. 裁掉后, 要是骨干程序员数量还够就不招聘了.
如果骨干程序员数量满了, 就看需求池满没满, 如果需求池没满, 就把需求扔进需求池里; 如果需求池满了, 就看程序员数量有没有达到上限, 如果达到了, 就对产品说, 这个需求我们做不了, 没资源; 如果没达到, 就招聘一个程序员, 招聘进来就让他不停的工作, 干完刚派过来的需求他会主动到需求池找下一个任务来做, 如果需求池没有任务了, 他就停止工作了, 然后研发部会把他裁掉, 如果裁掉后发现骨干程序员数量不够了, 就会再招聘一个程序员. 裁掉后, 要是骨干程序员数量还够就不招聘了.
源码解析
首先是 worker(程序员)
Worker 被装在一个 HashSet(workers)里边, 他是用来执行任务的, 他们的职责就是不断的从 workQueue 里边取任务, 然后执行. 当 workQueue(需求池)里边拿不到任务, 或者线程池达到特定状态, worker 就会从 workers 里边移走(被裁).
下边是 Worker 源码, 移除了非关键的东西
- private final class Worker
- extends AbstractQueuedSynchronizer
- implements Runnable
- {
- // 标识这个任务是在哪个线程运行
- final Thread thread;
- Runnable firstTask;
- // 完成了几个任务
- volatile long completedTasks;
- Worker(Runnable firstTask) {
- // 阻止中断, 知道 runWorker 执行
- setState(-1); // inhibit interrupts until runWorker
- this.firstTask = firstTask;
- // 直接用你提供的线程工厂搞个线程出来
- this.thread = getThreadFactory().newThread(this);
- }
- // 调用 ThreadPoolExecutor 里边的 runWorker 方法
- public void run() {
- runWorker(this);
- }
- // 以下这些是 AQS 相关的东西
- // 0 代表没有加锁
- // 1 代表加锁了
- protected boolean isHeldExclusively() {
- return getState() != 0;
- }
- protected boolean tryAcquire(int unused) {
- if (compareAndSetState(0, 1)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
- protected boolean tryRelease(int unused) {
- setExclusiveOwnerThread(null);
- setState(0);
- return true;
- }
- public void lock() { acquire(1); }
- public boolean tryLock() { return tryAcquire(1); }
- public void unlock() { release(1); }
- public boolean isLocked() { return isHeldExclusively(); }
- void interruptIfStarted() {
- Thread t;
- if (getState()>= 0 && (t = thread) != null && !t.isInterrupted()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- }
- }
- }
- }
Worker 实现了 Runnable 接口, 所以他是个任务, 有 run 方法; 同时有继承了 AQS, 所以他也是一把锁.
下边是提交任务的过程
提交任务有 submit 和 execute, submit 就是首先将 Callable 或者 Runnable 包装成 FutureTask, 然后调用 execute, 所以核心是分析 execute
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- // 这个 c 里边有两个信息, 一个是现在有多少 worker, 另一个是现在线程池的状态是啥
- // workerCountOf 方法就是从里边提取 worker 的数量的
- int c = ctl.get();
- if (workerCountOf(c) <corePoolSize) { // 当前 worker 的数量比需要的核心线程数少
- // 加 worker 去执行, 加成功就完事了, 也就是说只要 worker 比核心线程数少, 就会创建 worker
- // 不管现在核心线程是否在工作, 也不管 workQueue 是不是满的
- // addWorker 的第二个参数表示是不是要加核心线程(或者叫核心 worker)
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- // 当前 worker 达到或超过了核心线程数, 或者加 worker 失败了, 才会走下边的流程
- // worker 已经比核心线程数多了
- // 如果 线程池没有 shutdown 的话
- // 就尝试将任务加到 workQueue 里边, 工作队列入队成功的话再往里边走
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- // 再次检查状态如果线程池要停了, 那么就拒绝任务, 并且把 worker 从工作队列扔掉
- reject(command);
- else if (workerCountOf(recheck) == 0)
- // 如果没有 worker 的话(说明没加进去, 这种场景我没想到是什么情况), 加一个 worker
- addWorker(null, false);
- // 其他情况, 丢到工作队列就不用管了, 等着 worker 去处理
- }
- // 如果队列满了加失败了, 或者线程池状态不满足了, 就尝试加普通 worker(非核心线程)
- else if (!addWorker(command, false))
- // 加失败了就拒绝任务
- // 失败一方面可能是 worker 数量已经达到你的给的 maximumPoolSize
- // 另一方面, 可能是检查到线程池的状态不对了
- reject(command);
- }
可以发现 execute 方法就是完成了上边说的 "线程池处理流程" 这个图里描述的过程. 大雄看到这里还有几个疑问, 一个是 Woker 是如何创建并加入 workers 的, 一个是 worker 是如何启动的, 再就是 worker 是如何运行的
生活还要继续
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // 做一些校验, 线程池的状态要满足一定条件
- // 而且得提交任务过来, 再就是 workQueue 不能是空的
- if (rs>= SHUTDOWN &&
- ! (rs == SHUTDOWN &&
- firstTask == null &&
- ! workQueue.isEmpty()))
- return false;
- for (;;) {
- int wc = workerCountOf(c);
- // 看你是要创建核心 worker 还是普通 worker
- // 核心看超没超过 corePoolSize, 普通看超没超过 maximumPoolSize
- if (wc>= CAPACITY ||
- wc>= (core ? corePoolSize : maximumPoolSize))
- return false;
- if (compareAndIncrementWorkerCount(c))
- // 增加 worker 数量失败就在来
- break retry;
- c = ctl.get(); // Re-read ctl
- if (runStateOf(c) != rs)
- // 中途线程池状态发生变化了
- continue retry;
- }
- }
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- // worker 就是这么创建的
- w = new Worker(firstTask);
- final Thread t = w.thread;
- if (t != null) {
- final ReentrantLock mainLock = this.mainLock;
- // 加 worker 是要加全局锁的
- mainLock.lock();
- try {
- int rs = runStateOf(ctl.get());
- if (rs <SHUTDOWN ||
- (rs == SHUTDOWN && firstTask == null)) {
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- workers.add(w);
- int s = workers.size();
- if (s> largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- mainLock.unlock();
- }
- if (workerAdded) {
- // worker 是在这里启动的
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- if (! workerStarted)
- addWorkerFailed(w);
- }
- return workerStarted;
- }
这段代码解决了 Woker 是如何创建并加入 workers 的以及 worker 是如何启动的的问题.
addWorker 做的核心工作就是, 创建 worker, 启动 worker, 在创建之前还会做一些校验. 调用了 worker 里边线程的 start 后就要等待 CPU 调度执行 worker 的 run 方法了.
- public void run() {
- runWorker(this);
- }
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- w.unlock(); // allow interrupts
- boolean completedAbruptly = true;
- try {
- // task 是创建 worker 带进去的任务, 会先执行他, 然后从 workQueue 里边取
- // 如果没有的话跳出去
- while (task != null || (task = getTask()) != null) {
- w.lock(); // 首先加锁, 如果不加锁, 可能几个线程提交的任务同时进来了, 会导致一些共享状态出问题
- // 做一些状态的校验
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- // 执行任务前调用一下 beforeExecute, 默认是空的
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- // 这个跟我们平时理解的 Runnable 还不一样, 可以体会下, 他这个 run 就是一个普通的方法
- // 他直接调 run 是要执行任务, 线程的 start 只是把 worker 里边的那个 run 跑起来了
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- // 执行完了调一下, 里边可以拿到异常
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- w.completedTasks++;
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- // 从 while 跳出来表明没有任务可以执行了
- processWorkerExit(w, completedAbruptly);
- }
- }
这个也比较容易, 就是不断的从 workQueue 取任务, 执行, 直到没任务了跳出来. 接下来就是 worker 如何被销毁的问题了
- private void processWorkerExit(Worker w, boolean completedAbruptly) {
- if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
- decrementWorkerCount();
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- completedTaskCount += w.completedTasks;
- // 移除掉 worker(裁员)
- workers.remove(w);
- } finally {
- mainLock.unlock();
- }
- tryTerminate();
- int c = ctl.get();
- if (runStateLessThan(c, STOP)) {
- if (!completedAbruptly) {
- int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
- if (min == 0 && ! workQueue.isEmpty())
- min = 1;
- // 比核心线程数多的话, 执行完的 Worker 直接移除就好
- if (workerCountOf(c)>= min)
- return; // replacement not needed
- }
- // 小于核心线程数就会再加个 Worker, 让他继续等待接收任务(招人)
- addWorker(null, false);
- }
- }
直接从 workers 里边移除 worker, 移除后如果 worker 数量比核心线程数还少, 就再加个 worker, 否则不加.
一些体会
看源码一定不要过分纠结细节, 就像这个线程池, 我看网上很多文章去算那几个位运算的十进制数, 感觉是在浪费时间, 没有抓住重点.
当然这也不是绝对的(似乎说的矛盾了), 一些细节的设计还是非常精妙值得学习的. 还是这个位运算, 为什么只用一个 int 表示线程池状态和 worker 的数量呢.
要多多联想, 还是这个位运算, 他是不是和读写锁用一个 int 既表示写状态又表示读状态十分相似. Worker 继承 AQS, 是否能让你想起 AQS 的种种.
总之, 个人觉得第一遍看是一定不能沉溺于细节的, 他会让你迷惘和丧失信心; 第二遍, 第三遍可以关注一下细节, 感受大师级的设计的美妙之处. 当然笔者仅仅粗略看了一遍(逃~)
最后
大雄五一假期阅读了《java 并发编程艺术》这本书, 整理了一本 gitbook 笔记(还没写完), 需要的同学可以扫描文末二维码关注 "大雄和你一起学编程" 公众号, 后台回复我爱 java 领取. 这本 gitbook 还没彻底完成, 所以可能还有些小错误. 未来会大约每两天推送其中的一篇文章.
如下是这本 gitbook 的目录截图
来源: https://www.cnblogs.com/floor/p/12853748.html