Java 是一门多线程的语言, 基本上生产环境的 Java 项目都离不开多线程. 而线程则是其中最重要的系统资源之一, 如果这个资源利用得不好, 很容易导致程序低效率, 甚至是出问题.
有以下场景, 有个电话拨打系统, 有一堆需要拨打的任务要执行, 首先肯定是考虑多线程异步去执行. 假如我每执行一个拨打任务都 new 一个 Thread 去执行, 当同时有 1 万个任务需要执行的时候, 那么就会新建 1 万个线程, 加上线程各种初始销毁等操作, 这个消耗是巨大的. 而其实往往实现这些功能的时候, 并不是完全需要实时马上完成, 只是希望在可控范围内尽量提高执行的并发性能.
因此线程池技术应用而生, Java 中最常用的线程池技术就是 ThreadPoolExecutor. 接下来就整体看看 ThreadPoolExecutor 的实现.
这个类的注解非常多, 很多也是重点, 所以就不从注解开始看起. 先从使用说起, 有个概念先.
基本使用
- // 核心线程
- int corePoolSize = 5;
- // 最大线程
- int maximumPoolSize = 10;
- // 线程空闲回收时间
- int keepAliveTime = 30;
- // 线程空闲回调时间单位
- TimeUnit unit = TimeUnit.SECONDS;
- // 队列大小
- int queueSize = 20;
- // 队列
- BlockingQueue workQueue = new ArrayBlockingQueue<Runnable>(queueSize);
- ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
- executor.execute(() -> {
- // do something 1
- });
- executor.execute(() -> {
- // do something 2
- });
定义好一些必要的参数, 构建一个 ThreadPoolExecutor 对象. 然后调用对象的 execute() 方法即可.
参数说明:
corePoolSize, 线程池保留的最小线程数. 如果线程池中的线程少于此数目, 则在执行 execut() 时创建.
maximumPoolSize, 线程池中允许拥有的最大线程数.
keepAliveTime,unit, 当线程闲置时, 保持线程存活的时间.
workQueue, 工作队列, 存放提交的等待任务, 其中有队列大小的限制.
线程管理机制
非常多人误解了 corePoolSize,maximumPoolSize,workQueue 的相互关系. 不少人认为无论队列选择什么, corePoolSize 和 maximumPoolSize 一定是有用, 定义一定是生效的, 其实并不然啊!
看下线程基本规则注解说明
默认情况下, 线程池在初始的时候, 线程数为 0. 当接收到一个任务时, 如果线程池中存活的线程数小于 corePoolSize 核心线程, 则新建一个线程.
如果所有运行的核心线程都都在忙, 超出核心线程处理的任务, 执行器更多地选择把任务放进队列, 而不是新建一个线程.
如果一个任务提交不了到队列, 在不超出最大线程数量情况下, 会新建线程. 超出了就会报错.
另外, 如果想在线程初始化时候就有核心线程, 可以调用 prestartCoreThread() 或 prestartAllCoreThread(), 前者是初始一个, 后者是初始全部.
再看看排队策略
直接提交, 用 SynchronousQueue. 特点是不保存, 直接提交给线程, 如果没没线程, 则新建一个.
无限提交, 用类似 LinkedBlockingQueue 无界队列. 特点是保存所以核心线程处理不了的任务, 队列无上限, 最大线程也没用.
有限提交, 用类似 ArrayBlockingQueue 有界队列. 特点是可以保存超过核心线程的任务, 并且队列也是有上限的. 超过上限, 新建线程 (满了抛错). 更好地保护资源, 防止崩溃, 也是最常用的排队策略.
从以上规则可以看出来, 核心线程数和最大线程数, 还有队列结构是相互影响的, 如何排队, 队列多大, 最大线程是多少都是不一定的.
再看看保持存活机制
当超过核心线程数的线程, 线程池会让该线程保持存活 keepAliveTime 时间, 超过该时间则会销毁该线程.
另外默认对非核心线程有效, 若想核心线程也适用于这个机制, 可以调用 allowCoreThreadTimeOut() 方法. 这样的话就没有核心线程这一说了.
综合以上, 线程池在多次执行任务后, 会一直维持部分线程存活, 即使它是闲置的. 这样的目的是为了减少线程销毁创建的开销, 下次有个任务需要执行, 直接从池子里拿线程就能用了. 但核心线程不能维护太多, 因为也需要一定开销. 最大的线程数保护了整个系统的稳定性, 避免并发量大的时候, 把线程挤满. 工作队列则是保证了任务顺序和暂存, 系统的可靠性. 线程存活规则的目的和维护核心线程的目的类似, 但降低了它的存活的时间.
另外还有拒绝机制, 它提供了一些异常情况下的解决方案.
ctl 线程状态控制
这个 ctl 变量是整个线程池的核心控制状态.
这个 ctl 代表了两个变量
workerCount, 生效的线程数. 基本可以理解为存活的线程, 但某个时候有暂时性的差异.
runState, 线程池的运行状态.
其中, ctl(int32 位) 的低 29 位代表 workerCount, 所以最大线程数为 (2^29)-1. 另外 3 位表示 runState.
runState 有以下几种状态:
RUNNING: 接收新任务, 处理队列任务.
SHUTDOWN: 不接收新任务, 但处理队列任务.
STOP: 不接收新任务, 也不处理队列任务, 并且中断所有处理中的任务.
TIDYING: 所有任务都被终结, 有效线程为 0. 会触发 terminated() 方法.
TERMINATED: 当 terminated() 方法执行结束.
当调用了 shutdown(), 状态会从 RUNNING 变成 SHUTDOWN, 不再接收新任务, 此时会处理完队列里面的任务.
如果调用的是 shutdownNow(), 状态会直接变成 STOP.
当线程或者队列都是空的时候, 状态就会变成 TIDYING.
当 terminated() 执行完的时候, 就会变成 TERMINATED.
execute()
带着对上面的规则与机制的认识, 现在从就这这个入口开始看看源码, 到底整个流程是怎么实现的.
如果少于核心线程在跑, 用这个任务尝试创建一个新线程.
如果一个任务成功入队, 再次检查下线程池状态看是否需要入队, 因为可能在入队过程中, 状态发送了变化. 如果确认入队且没有存活线程, 则新建一个空线程.
如果进不了队, 则尝试新建一个线程, 如果都失败了. 拒绝这个 task
对于第二点最后为什么新建一个线程? 很容易猜想到, 会有一个轮询的机制让下个 task 出队, 直接利用这个空闲线程.
注释基本解释了所有代码, 代码也没什么特别的. 其中最主要的还是 addWoker() 这个方法, 下面来看看.
addWoker()
先了解下这个方法的整体思路
从描述可知, addwoker 失败, 会在线程池状态不对, 线程满了或者线程工厂创建线程池失败时候发生.
这个方法比较长, 分两段看. 先看第一段.
retry: 这种写法, 如果比较少看源码的, 应该是前所未见的了. 这是个循环的位置标记, 是 java 的语法之一. 看回代码, 这里面 for 循环还嵌套里一个 for 循环, 而 retry: 是标记第一个 for 循环的, 后面 break 和 continue 语句都指向到了 retry. 说明 break 和 continue 是都是操作外层的 for 循环. retry 可以是任何变量命名合法的字符.
然后看看外出 for 循环的 if 语句
这个 if 判断想要执行到 return false;, 队列为空是一个必要条件. 因为 addWork() 不单只接收新任务会调用到, 处理队列中的任务也会调用到. 而前面提到 SHUTDOWN 状态下还会处理队列中的任务的, 所以队列不为空是会让它继续执行下去的.
对于内层的 for 循环
会先判断 worker 的数据是否符合 corePoolSize 和 maximumPoolSize 的定义, 不满足则返回失败.
然后尝试 CAS 让 workerCount 自增, 如果 CAS 失败还是继续自旋去自增, 直到成功. 除非线程池状态发生了变化, 发退回到外层 for 循环重新执行, 判断线程池的状态.
第一段的代码, 就是让 workerCount 在符合条件下自增
第二段代码
这段比较好理解, 先创建一个 Worker 对象, 这个 Worker 里面包含一个由线程工厂创建的线程, 和一个需要执行的任务 (可以为空). 如果线程创建成功了, 那么就加一个重入锁去把这个新建的 Worker 对象放到 workers 成员变量中, 在加入之前需要重新判断下线程池的状态和新建线程的状态. 如果 worker 添加到 workers 成员变量中, 就启动这个新建的线程. 最后如果添加失败, 则执行 addWorkFailed(w).
如果失败了, 加锁操作回滚下 wokers,workerCount, 然后判断下状态看看是否需要终结线程池.
addWorker() 大概的流程就这样.
总结
对于其他方法, 没有什么特别的, 在此不再过多的叙述, 有兴趣的可以翻翻源码阅读下.
回顾总结下上面的核心要点
当核心线程满且忙碌时, 线程池倾向于把提交的任务放进队列, 而不是新建线程.
根据选择队列的不同, maximumPoolSize 不一定有用的. 具体有三种不同的策略.
ctl 是线程池的核心控制状态, 包含的 runState 线程池运行状态和 workCount 有效线程数.
retry: 是一种标记循环的语法, retry 可以是任何变量命名合法字符.
更多技术文章, 精彩干货, 请关注
来源: https://www.cnblogs.com/zackku/p/9995284.html