本篇为 elasticsearch 源码分析系列文章的第十篇, 本篇延续上一篇 ElasticSearch 的 Plugin 引出的内容, 进行各种 Plugin 中线程池的分析
上篇讲到了 ElasticSearch 中插件的基本概念, 以及 Node 实例化中涉及到的 PluginService 初始化编码, 本篇将会继续研究 Node 实例化的过程中 PluginsService 发挥的作用, 也就是通过 PluginsService 中的参数构建线程池框架
线程池在何时初始化
当 Node 完成了 PluginsService 的构造后, 紧接会通过
getExecutorBuilders
方法取得线程池的 Executor 构造器列表, 代码如下:
List <ExecutorBuilder < ?>>executorBuilders = pluginsService.getExecutorBuilders(settings)
此时 PluginsService 对象中已经有了需要加载的所有 plugin 了, 包含 modules 路径和 plugins 路径中的所有组件, 这里统称为 plugin 如下图所示总共是包含了 13 个已加载的 Plugin, 分别是 modules 路径中的默认必须加载的 12 个和 Plugins 路径中的自定义安装的 1 个 (ICU 分词器) 如下图所示
构建线程池框架
初始化 ExecutorBuilder 集合
Node 实例化过程中, 通过代码:
List < ExecutorBuilder < ?>>executorBuilders = pluginsService.getExecutorBuilders(settings);
查找到自定义的线程池 Executor 构建器再获得自定义线程池构建器集合后, 开始构建线程池(ThreadPool)
ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
首先通过代码获得处理器 CPU 的数量,
Runtime.getRuntime().availableProcessors()
当然这个值是可以被 Setting 中设置的变量 processors 来覆盖的这个变量在代码中被标记为 availableProcessors 然后创建变量
halfProcMaxAt5, 这个变量的意思是 availableProcessors 的一半, 但最大不超过 5
halfProcMaxAt10, 这个变量的意思是 availableProcessors 的一半, 但最大不超过 10
这两个变量在后面创建各种线程池构造器中反复用到
在确定了可使用的处理器数量后, 就能确定线程池的最小值(genericThreadPoolMax),ElasticSearch 中是确定为: 可用 CPU 处理器数量的 4 倍, 且固定范围为最小 128, 最大为 512
由此可见如果用一般服务器的话, 线程池上限最终会被确定为 128, 可以说还是比较高的设定了
接下来开始构造执行不同操作时线程池 Executor,ElasticSearch 中把各个操作的 Executor 构造为 Map,
Map<String, ExecutorBuilder>
, 下面是各个 Executor 对象的解释:
普通操作的 Executor: 构建一个可伸缩的 Executor 构建器, value 为 ScalingExecutorBuilder 对象接收参数和对应操作如下:
name: 线程池执行者的名称, 也就是 generic
core: 线程池中线程的最小值, 固定为 4 将
thread_pool.generic.core
的设为这个值
max: 线程池中线程的最大值, 对应上面提到的 genericThreadPoolMax, 在本机跑的结果是 128
keepAlive: 超过 4 个线程后, 线程保持活跃的时间这个值固定为 30 秒这个参数被设定为变量
thread_pool.generic.keep_alive
索引操作的 Executor: 构建一个固定的 Executor 构建器 key 为 index,value 为 FixedExecutorBuilder 对象, 接收参数和对应操作如下:
settings:Node 的配置 settings 设定配置变量
thread_pool.index.size
的值为该参数中 cpu 的数量
name: 线程池执行者的名称, 也就是 idnex
size: 线程的固定大小, 和参数 name 一起构造配置变量
thread_pool.index.size
的值为 size 的值, 本机跑的结果是 4
queueSize: 阻塞队列的大小, 构造配置变量
thread_pool.index.queue_size
的值为 200, 注意这个值固定为 200
批处理操作的 Executor: 构建一个固定的 Executor 构建器 key 为 bulk,value 为 FixedExecutorBuilder 对象, 接收参数和对应操作如下:
settings:Node 的配置 settings 设定配置变量
thread_pool.bulk.size
的值为该参数中 cpu 的数量
name: 线程池执行者的名称, 也就是 bulk
size: 线程的固定大小, 和参数 name 一起构造配置变量
thread_pool.bulk.size
的值为 size 的值, 本机跑的结果是 4
queueSize: 阻塞队列的大小, 构造配置变量
thread_pool.bulk.queue_size
的值为 200, 注意这个值固定为 200
get 操作的 Executor: 构建一个固定的 Executor 构建器 key 为 get,value 为 FixedExecutorBuilder 对象, 接收参数和对应操作如下:
settings:Node 的配置 settings 设定配置变量
thread_pool.get.size
的值为该参数中 cpu 的数量
name: 线程池执行者的名称, 也就是 get
size: 线程的固定大小, 和参数 name 一起构造配置变量
thread_pool.get.size
的值为 size 的值, 本机跑的结果是 4
queueSize: 阻塞队列的大小, 构造配置变量
thread_pool.get.queue_size
的值为 1000, 注意这个值固定为 1000
查询操作的 Executor: 构建一个根据利特尔法则自动扩展长度的 Executor 构建器, 这个构建器的逻辑与其他构建器不同, 也显得比较复杂, 也说明了对于查询操作, ElasticSearch 做了特殊的优化 key 为 search,value 为 AutoQueueAdjustingExecutorBuilder 对象, 接收参数和对应操作如下:
settings:Node 的配置 settings 设定配置变量
thread_pool.search.size
的值为该参数中 cpu 的数量
name: 线程池执行者的名称, 也就是 search
size: 线程的固定大小, 和参数 name 一起构造配置变量
thread_pool.search.size
的值为 size 的值, 本机跑的结果是 7
initialQueueSize: 初始化队列的大小, 固定设置为 1000, 造配置变量
thread_pool.search.queue_size
的值为 200
minQueueSize: 队列的最小长度, 固定设置为 1000 设定配置变量
thread_pool.search.min_queue_size
的值为 1000
maxQueueSize: 队列的最大长度, 固定设置为 1000, 设定配置变量
thread_pool.search.max_queue_size
的值为 1000
frameSize: 队列的步进长度, 固定设置为 2000, 构造配置变量
thread_pool.search.auto_queue_frame_size
的值为 200, 注意这个值固定为 200
thread_pool.search.target_response_time
针对 search 操作的相应被设置为 1S,
管理操作的 Executor: 构建一个可伸缩的 Executor 构建器 key 为 management,value 为 ScalingExecutorBuilder 对象, 接收参数和对应操作如下:
settings:Node 的配置 settings 设定配置变量
thread_pool.management.size
的值为该参数中 cpu 的数量
name: 线程池执行者的名称, 也就是 management,
size: 线程的固定大小, 和参数 name 一起构造配置变量
thread_pool.management.size
的值为 size 的值, 本机跑的结果是 1
queueSize: 阻塞队列的大小, 构造配置变量
thread_pool.management.queue_size
的值为 200, 注意这个值固定为 200
keepAlive: 超过 1 个线程后, 线程保持活跃的时间这个值固定为 5 分钟这个参数被设定为变量
thread_pool.management.keep_alive
监听操作的 Executor: 构建一个固定的 Executor 构建器 key 为 listener,value 为 FixedExecutorBuilder 对象, 接收参数和对应操作如下:
settings:Node 的配置 settings 设定配置变量
thread_pool.listener.size
的值为该参数中 cpu 的数量
name: 线程池执行者的名称, 也就是 listener,
size: 线程的固定大小, 上文提到的 halfProcMaxAt10, 和参数 name 一起构造配置变量
thread_pool.listener.size
的值为 size 的值, 本机跑的结果是 2
queueSize: 阻塞队列的大小, 构造配置变量
thread_pool.listener.queue_size
的值为 **-1**, 意思就没有阻塞队列
flush 操作的 Executor: 构建一个可伸缩的 Executor 构建器 key 为 flush,value 为 ScalingExecutorBuilder 对象, 接收参数和对应操作如下:
settings:Node 的配置 settings 设定配置变量
thread_pool.flush.size
的值为该参数中 cpu 的数量
name: 线程池执行者的名称, 也就是 flush,
size: 线程的固定大小, 上文提到的 halfProcMaxAt5, 和参数 name 一起构造配置变量
thread_pool.flush.size
的值为 size 的值, 本机跑的结果是 4
keepAlive: 超过 1 个线程后, 线程保持活跃的时间这个值固定为 5 分钟这个参数被设定为变量
thread_pool.management.keep_alive
refresh 操作的 Executor: 构建一个可伸缩的 Executor 构建器 key 为 refresh,value 为 ScalingExecutorBuilder 对象, 接收参数和对应操作如下:
settings:Node 的配置 settings 设定配置变量
thread_pool.refresh.size
的值为该参数中 cpu 的数量
name: 线程池执行者的名称, 也就是 refresh,
size: 线程的固定大小, 上文提到的 halfProcMaxAt10, 和参数 name 一起构造配置变量
thread_pool.refresh.size
的值为 size 的值, 本机跑的结果是 4
keepAlive: 超过 1 个线程后, 线程保持活跃的时间这个值固定为 5 分钟这个参数被设定为变量
thread_pool.management.keep_alive
warmer 操作的 Executor: 构建一个可伸缩的 Executor 构建器 key 为 warmer,value 为 ScalingExecutorBuilder 对象, 接收参数和对应操作如下:
settings:Node 的配置 settings 设定配置变量
thread_pool.warmer.size
的值为该参数中 cpu 的数量
name: 线程池执行者的名称, 也就是 warmer,
size: 线程的固定大小, 上文提到的 halfProcMaxAt5, 和参数 name 一起构造配置变量
thread_pool.warmer.size
的值为 size 的值, 本机跑的结果是 4
keepAlive: 超过 1 个线程后, 线程保持活跃的时间这个值固定为 5 分钟这个参数被设定为变量
thread_pool.management.keep_alive
snapshot 操作的 Executor: 构建一个可伸缩的 Executor 构建器 key 为 snapshot,value 为 ScalingExecutorBuilder 对象, 接收参数和对应操作如下:
settings:Node 的配置 settings 设定配置变量
thread_pool.snapshot.size
的值为该参数中 cpu 的数量
name: 线程池执行者的名称, 也就是 snapshot,
size: 线程的固定大小, 上文提到的 halfProcMaxAt5, 和参数 name 一起构造配置变量
thread_pool.snapshot.size
的值为 size 的值, 本机跑的结果是 4
keepAlive: 超过 1 个线程后, 线程保持活跃的时间这个值固定为 5 分钟这个参数被设定为变量
thread_pool.management.keep_alive
碎片处理操作的 Executor: 构建一个可伸缩的 Executor 构建器 key 为
fetch_shard_started
,value 为 ScalingExecutorBuilder 对象, 接收参数和对应操作如下:
settings:Node 的配置 settings 设定配置变量
thread_pool.fetch_shard_started.size
的值为该参数中 cpu 的数量
name: 线程池执行者的名称, 也就是 fetch_shard_started,
size: 线程的固定大小, 和参数 name 一起构造配置变量
thread_pool.fetch_shard_started.size
的值为 size 的值, 本机跑的结果是 4
queueSize: 阻塞队列的大小, 构造配置变量
thread_pool.fetch_shard_started.queue_size
的值为 200, 注意这个值固定为 200
强制 merge 操作的 Executor: 构建一个可伸缩的 Executor 构建器 key 为 force_merge,value 为 ScalingExecutorBuilder 对象, 接收参数和对应操作如下:
settings:Node 的配置 settings 设定配置变量
thread_pool.force_merge.size
的值为该参数中 cpu 的数量
name: 线程池执行者的名称, 也就是 force_merge,
size: 线程的固定大小, 和参数 name 一起构造配置变量
thread_pool.force_merge.size
的值为 size 的值, 本机跑的结果是 4
queueSize: 阻塞队列的大小, 构造配置变量
thread_pool.force_merge.queue_size
的值为 200, 注意这个值固定为 200
获取碎片操作的 Executor: 构建一个可伸缩的 Executor 构建器 key 为 fetch_shard_store,value 为 ScalingExecutorBuilder 对象, 接收参数和对应操作如下:
settings:Node 的配置 settings 设定配置变量
thread_pool.fetch_shard_store.size
的值为该参数中 cpu 的数量
name: 线程池执行者的名称, 也就是 fetch_shard_store,
size: 线程的固定大小, 和参数 name 一起构造配置变量
thread_pool.fetch_shard_store.size
的值为 size 的值, 本机跑的结果是 4
queueSize: 阻塞队列的大小, 构造配置变量
thread_pool.fetch_shard_store.queue_size
的值为 200, 注意这个值固定为 200
至此就完成了 org.elasticsearch.threadpool.ThreadPool 对象的创建
ThreadPool 对象的作用
得到 ThreadPool 的对象后, 通过线程池进行了 NodeClient 的构建
client = new NodeClient(settings, threadPool);
和 ResourceWatcherService 对象的构建,
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
后面还有很多的组件都用到了线程池, 比如:
- IngestService
- ClusterInfoService
- MonitorService
- ActionModule
- IndicesService
- NetworkModule
- TransportService
- DiscoveryModule
- NodeService
可以看出都是 ElasticSearch 的核心组件, 这些组件的功能和原理, 我都会在以后的文章中讲解, 而像 ElasticSearch 这种存储搜索系统来说 IO 操作肯定非常频繁, 而线程池是专门致力于解决系统的 IO 问题, 它在这些服务组件中的作用也显得愈发重要
利特尔法则
查询操作中提到的利特尔法则是一种描述稳定系统中, 三个变量之间关系的法则
其中 L 表示平均请求数量,λ表示请求的频率, W 表示响应请求的平均时间举例来说, 如果每秒请求数为 10 次, 每个请求处理时间为 1 秒, 那么在任何时刻都有 10 个请求正在被处理回到我们的话题, 就是需要使用 10 个线程来进行处理如果单个请求的处理时间翻倍, 那么处理的线程数也要翻倍, 变成 20 个
理解了处理时间对于请求处理效率的影响之后, 我们会发现, 通常理论上限可能不是线程池大小的最佳值线程池上限还需要参考任务处理时间
假设 JVM 可以并行处理 1000 个任务, 如果每个请求处理时间不超过 30 秒, 那么在最坏情况下, 每秒最多只能处理 33.3 个请求然而, 如果每个请求只需要 500 毫秒, 那么应用程序每秒可以处理 2000 个请求
来源: https://juejin.im/post/5a7bf978f265da4e8f0493d7