承接 Spring 源码情操陶冶 - 自定义节点的解析线程池是 jdk 的一个很重要的概念, 在很多的场景都会应用到, 多用于处理多任务的并发处理, 此处借由 spring 整合 jdk 的 cocurrent 包的方式来进行深入的分析
spring 配置文件样例
配置简单线程池
- <task-executor keep-alive="60" queue-capacity="20" pool-size="5" rejection-policy="DISCARD">
- </task-executor>
以上属性是 spring 基于 task 命令空间提供的对外属性, 一般是线程池的基础属性, 我们可以剖析下相应的解析类具体了解下 spring 是如何整合线程池的
ExecutorBeanDefinitionParser - 解析 task-executor 节点
我们可以直接看下解析的具体方法 doParse(), 代码如下
- @Override
- protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) {
- String keepAliveSeconds = element.getAttribute("keep-alive");
- if (StringUtils.hasText(keepAliveSeconds)) {
- builder.addPropertyValue("keepAliveSeconds", keepAliveSeconds);
- }
- String queueCapacity = element.getAttribute("queue-capacity");
- if (StringUtils.hasText(queueCapacity)) {
- builder.addPropertyValue("queueCapacity", queueCapacity);
- }
- configureRejectionPolicy(element, builder);
- String poolSize = element.getAttribute("pool-size");
- if (StringUtils.hasText(poolSize)) {
- builder.addPropertyValue("poolSize", poolSize);
- }
- }
很简单就是读取我们第一部分所罗列的属性值, 并塞入至 BeanDefinitionBuilder 对象的属性集合中借此
对任务的拒绝策略属性解析方法
configureRejectionPolicy()
我们也可以简单的看下
- private void configureRejectionPolicy(Element element, BeanDefinitionBuilder builder) {
- String rejectionPolicy = element.getAttribute("rejection-policy");
- if (!StringUtils.hasText(rejectionPolicy)) {
- return;
- }
- String prefix = "java.util.concurrent.ThreadPoolExecutor.";
- if (builder.getRawBeanDefinition().getBeanClassName().contains("backport")) {
- prefix = "edu.emory.mathcs.backport." + prefix;
- }
- String policyClassName;
- if (rejectionPolicy.equals("ABORT")) {
- policyClassName = prefix + "AbortPolicy";
- }
- else if (rejectionPolicy.equals("CALLER_RUNS")) {
- policyClassName = prefix + "CallerRunsPolicy";
- }
- else if (rejectionPolicy.equals("DISCARD")) {
- policyClassName = prefix + "DiscardPolicy";
- }
- else if (rejectionPolicy.equals("DISCARD_OLDEST")) {
- policyClassName = prefix + "DiscardOldestPolicy";
- }
- else {
- policyClassName = rejectionPolicy;
- }
- builder.addPropertyValue("rejectedExecutionHandler", new RootBeanDefinition(policyClassName));
- }
由此可看出拒绝策略采取的基本上是
java.util.concurrent.ThreadPoolExecutor
工具包下的静态内部类, 同时也支持用户自定义的拒绝策略对拒绝策略此处作下小总结
rejection-policy(Spring) | jdk 对应类 | 含义 |
---|---|---|
ABORT | java.util.concurrent.ThreadPoolExecutor.AbortPolicy | 抛出拒绝的异常信息 |
CALLER_RUNS | java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy | 直接执行对应的任务 (线程池不关闭) |
DISCARD | java.util.concurrent.ThreadPoolExecutor.DiscardPolicy | 直接丢弃任务 |
DISCARD_OLDEST | java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy | 直接丢弃队列最老的任务 (最靠头部),塞入此任务到队列尾部 |
bean 实体类
那么我们肯定要知道是 spring 的哪个 bean 来实例化我们的线程池配置呢? 答案就在 getBeanClassName() 方法
- @Override
- protected String getBeanClassName(Element element) {
- return "org.springframework.scheduling.config.TaskExecutorFactoryBean";
- }
直接通过
TaskExecutorFactoryBean
bean 工厂来实例化线程池, 很有意思, 我们也别忘了其获取实例化对象其实是调用了其内部的 getObject() 方法我们继续跟踪把
TaskExecutorFactoryBean - 线程池 bean 工厂类
看继承结构, 我们细心的发现其实现了 InitializingBean 接口, 那我们直接关注
afterPropertiesSet()
方法
- @Override
- public void afterPropertiesSet() throws Exception {
- // 实例化的 bean 类型为 ThreadPoolTaskExecutor
- BeanWrapper bw = new BeanWrapperImpl(ThreadPoolTaskExecutor.class);
- determinePoolSizeRange(bw);
- // 基本属性保存
- if (this.queueCapacity != null) {
- bw.setPropertyValue("queueCapacity", this.queueCapacity);
- }
- if (this.keepAliveSeconds != null) {
- bw.setPropertyValue("keepAliveSeconds", this.keepAliveSeconds);
- }
- if (this.rejectedExecutionHandler != null) {
- bw.setPropertyValue("rejectedExecutionHandler", this.rejectedExecutionHandler);
- }
- if (this.beanName != null) {
- bw.setPropertyValue("threadNamePrefix", this.beanName + "-");
- }
- // 实例化 ThreadPoolTaskExecutor 对象
- this.target = (TaskExecutor) bw.getWrappedInstance();
- // 并执行相应的 afterPropertiesSet 方法
- if (this.target instanceof InitializingBean) {
- ((InitializingBean) this.target).afterPropertiesSet();
- }
- }
真正实例化的对象为
ThreadPoolTaskExecutor.class
task-executor 指定的 pool-size 支持 - 作为分隔符, 比如 2-4, 表示线程池核心线程数为 2 个, 最大线程数为 4; 如果没有分隔符, 则最大线程数等同于核心线程数
task-executor 如果指定 pool-size 为 0-4 且 queue-capacity 为 null, 则会指定线程池的
allowCoreThreadTimeout
为 true, 表明支持核心线程超时释放, 默认不支持
ThreadPoolTaskExecutor.class
也是 InitializingBean 的实现类, 也会被调用
afterPropertiesSet()
方法
ThreadPoolTaskExecutor#afterPropertiesSet()- 实例化
直接查看
initializeExecutor()
初始化 jdk 对应的线程池
- // 默认的 rejectedExecutionHandler 为 AbortPolicy 策略
- @Override
- protected ExecutorService initializeExecutor(
- ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
- // 先创建阻塞的队列
- BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
- // 创建线程池
- ThreadPoolExecutor executor = new ThreadPoolExecutor(
- this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
- queue, threadFactory, rejectedExecutionHandler);
- // 设置是否允许核心线程超时被收回默认不允许
- if (this.allowCoreThreadTimeOut) {
- executor.allowCoreThreadTimeOut(true);
- }
- this.threadPoolExecutor = executor;
- return executor;
- }
我们按照上述的注释, 按照两步走进行解析
阻塞队列的创建
- protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
- if (queueCapacity> 0) {
- return new LinkedBlockingQueue<Runnable>(queueCapacity);
- }
- else {
- return new SynchronousQueue<Runnable>();
- }
- }
默认情况下是创建
LinkedBlockingQueue
链式队列, 因为默认的 queueCapacity 大小为 Integer.MAX_VALUE 而 queueCapacity 为 0 的情况下则采取 SynchronousQueue 同步队列, 其约定塞入一个元素必须等待另外的线程消费其内部的一个元素, 其内部最多指定一个元素用于被消费
线程池创建
用到最基本的构造方法
- ThreadPoolExecutor()
- /**
- * Creates a new {@code ThreadPoolExecutor} with the given initial
- * parameters.
- *
- * @param corePoolSize the number of threads to keep in the pool, even
- * if they are idle, unless {@code allowCoreThreadTimeOut} is set
- * @param maximumPoolSize the maximum number of threads to allow in the
- * pool
- * @param keepAliveTime when the number of threads is greater than
- * the core, this is the maximum time that excess idle threads
- * will wait for new tasks before terminating.
- * @param unit the time unit for the {@code keepAliveTime} argument
- * @param workQueue the queue to use for holding tasks before they are
- * executed. This queue will hold only the {@code Runnable}
- * tasks submitted by the {@code execute} method.
- * @param threadFactory the factory to use when the executor
- * creates a new thread
- * @param handler the handler to use when execution is blocked
- * because the thread bounds and queue capacities are reached
- * @throws IllegalArgumentException if one of the following holds:<br>
- * {@code corePoolSize <0}<br>
- * {@code keepAliveTime <0}<br>
- * {@code maximumPoolSize <= 0}<br>
- * {@code maximumPoolSize <corePoolSize}
- * @throws NullPointerException if {@code workQueue}
- * or {@code threadFactory} or {@code handler} is null
- */
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- if (corePoolSize < 0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
篇幅限于过长, 具体就不解释了直接看注释就明白了
- TaskExecutorFactoryBean#getObject()- 获取实体类
- public TaskExecutor getObject() {
- return this.target;
- }
由上述可知 this.target 对应的 class 类为
ThreadPoolTaskExecutor
, 其内部已实例化了 ThreadPoolExecutor 线程池对象
小结
Spring 创建线程池本质上是通过 ThreadPoolExecutor 的构造方法来进行创建的由此可知 jdk 的 concurrent 工具包很有参考价值
Spring 默认指定的线程池队列为
LinkedBlockingQueue
链式队列, 默认支持无限的任务添加, 用户也可以指定 queue-capacity 来指定队列接受的最多任务数; 并采用 AborPolicy 策略来拒绝多余的任务
Spring 指定的 pool-size 支持 - 分隔符, 具体解释见上文
Spring 的 task-executor 多与 task-scheduler 和
task-scheduled-tasks
搭配使用, 具体见后文分析
来源: https://www.cnblogs.com/question-sky/p/8709929.html