背景
在某地市开展项目的时候, 发现数据采集, 数据探索, 预处理, 数据统计, 训练预测都需要很多资源, 现场资源不够用.
目前该项目的资源 3 台旧的服务器, 每台的资源 内存为 128G,cores 为 24 (core 可暂时忽略, 以下仅考虑内存即可) .
案例分析
我们先对任务分别分析, 然后分类.
数据采集基于 DC, 接的是 Kafka 的源, 属于流式, 常驻任务. kafka 来新数据时才需要资源, 空闲时可释放. 目前占用的资源情况为: 28( topic 数)*2(线程数)*1G = 56G, 且该值会随着带采集增量表数量的增加而增加.
数据探索主要是算法人员使用命令行或是使用智能融合平台的相关功能进行数据探索, 属于临时任务, 但会同时有多个并发, 且使用的资源跟具体的要处理的数据量和业务有关, 一般算法人员会将此值设置得很大. 目前平台有两个 Thrift server 服务, 都占用 17G 内存, 共计 34G.
预处理单个需要的资源为 4G*4+4G=20G, 可并发执行, 耗用的资源等于 13G * 并发数.
数据统计需要的资源为 5G*2+3G=13G, 一般十分钟左右.
算法训练需要的资源为 12G*3=36G, 此为默认值, 现场一般都会调得比较高.
算法预测需要的资源相对较少, 此处先忽略.
可以看到任务大体分为以下几类:
常驻任务. 此类任务一般在空闲时不需要资源, 这是典型的动态资源使用场景. 如: 流式采集, Thrift Server, 算法训练平台的预提交任务等.
临时任务. 此类任务又分为两种:
单一任务, 且与数据量基本固定. 此时需要的资源可以固化下来. 另外对优先级极高的应用也可归为此类.
单一任务, 但与数据量相关. 如每次面对的数据量不同, 典型的应用是统计任务, 数据量在不断增多, 且每天的增量不固定, 此时可以使用动态资源
多个任务. 此类任务一般是面临的场景完全未知, 比如说预处理任务, 训练任务. 我们不清楚任务的内部详情, 完全无法准确预估资源, 只能设置最大值或是每次提交任务都单独设置. 这其实要求用户有任务调优经验, 对用户的要求较高.
定时任务. 此类任务与临时任务类似, 只是加上了简单调度功能. 如数据统计.
通过分析可以知道, 很多 Spark 应用都是需要动态资源分配的, 很多用户通过 UI 经常触发的任务也可使用动态资源规划在不损失更多资源的情况下变成常驻任务来提高响应速度.
动态资源分配机制
Spark 提供了基于应用工作负载来动态分配资源的机制, 这意味着应用可以根据需要想资源管理器 (比如说 YARN) 释放资源和再请求资源. 如果多个应用共享资源的话, 这个特性是非常有用的.
需要首先说明的是, 这套机制的基本单元是 Executor, 类似于其它产品中的 Slot, 这里的单个 Executor 的资源可通过 spark.executor.memory 和 spark.executor.cores 分配配置其占用的内存及核数.
由于无法确切地知道什么时候需要请求 Executor 和移除 Executor,Spark 制定了一套请求和移除的机制.
请求机制. 如果查看队列中有挂起的任务, 且挂起的时间超过 spark.dynamicAllocation.schedulerBacklogTimeout 秒, 则请求 Executor, 按轮次请求, 每轮按指数增加, 如: 1, 2, 4, 8 ......
移除机制. 如果一个 Executor 空闲时间超过 spark.dynamicAllocation.executorIdleTimeout 秒, 则移除. 需要注意的是, 在大多数场景下, 这个与请求机制是互斥的, 也就是说, 如果还有挂起的任务, 那就不应该释放资源.
满足移除机制, 还有两个细节需要处理才能移除 Executor.
给其他 Executor 提供 shuffle 数据服务. Spark 系统在运行含 shuffle 过程的应用时, Executor 进程除了运行 task, 还要负责写 shuffle 数据, 给其他 Executor 提供 shuffle 数据. 当 Executor 进程任务过重, 导致 GC 而不能为其他 Executor 提供 shuffle 数据时, 会影响任务运行. External shuffle Service 是长期存在于 NodeManager 进程中的一个辅助服务. 通过该服务来抓取 shuffle 数据, 减少了 Executor 的压力, 在 Executor GC 的时候也不会影响其他 Executor 的任务运行. 我们可以在 Executor 完成后就移除它, 由 External shuffle Service 给其他 Executor 继续提供 shuffle 数据服务.
缓存数据. 写 shuffle 文件的时候, Executor 也会缓存数据到磁盘或内存中, 一旦 Executor 移除, 这部分数据也会不可访问, 因此只要有缓存数据, Executor 就不会被移除. 设置 spark.dynamicAllocation.cachedExecutorIdleTimeout 可在即使有缓存数据的情况下也能在超时的时候移除 Executor, 该值默认为无线大. 后续这个可能会被优化, 类似于使用 External shuffle Service.
动态资源分配配置
配置 External shuffle Service
修改 hadoop-env.sh , 将添加到 classpath
HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/usr/lib/LOCALCLUSTER/spark/yarn/spark-2.1.0-yarn-shuffle.jar
其中
/usr/lib/LOCALCLUSTER/spark/
为 Spark home 目录
修改 yarn-site.xml:
- <property>
- <name>yarn.nodemanager.aux-services</name>
- <value>mapreduce_shuffle**,spark_shuffle**</value>
- </property>
- <property>
- <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
- <value>org.apache.spark.network.yarn.YarnShuffleService</value>
- </property>
修改 yarn-env.sh 中的 YARN_HEAPSIZE 变量, 默认值为 1000(Mb). 提升这个变量的值可以避免 shuffle 时的 GC 问题.
重启所有节点的 nodemanager 服务
重启相关需要动态资源分配的服务
配置 Spark 程序
此处仅列出最常用的参数, 具体见附录一
属性 | 默认值 | 说明 |
---|---|---|
spark.executor.memory | 1G | 单个 executor 的内存,推荐 1G,由于动态资源分配是基于 Executor 的,单个 Executor 的内存不宜过大。 |
spark.yarn.executor.memoryOverhead 或 spark.executor.memoryOverhead | executorMemory * 0.10, with minimum of 384 | 分配给单个 executor 的堆外内存 ,一个 Executor 可用的内存为 spark.executor.memory + spark.executor.memoryOverhead |
spark.executor.cores | 1 | 单个 executor 可用核数,与可并行执行的任务数相关,多个任务共享 spark.executor.memory,增大可提供并行度,也会加大 OOM 的风险 |
spark.dynamicAllocation.enabled | false | 启用动态资源分配,必须设置为 true |
spark.shuffle.service.enabled | false | 启用外部 shuffle 服务,必须设置为 true |
spark.dynamicAllocation.minExecutors | 0 | 最小可用 cores,建议设置成 1 |
spark.dynamicAllocation.maxExecutors | infinity | 最大可用 cores,必须设置 |
一个实际的应用如下:
- /usr/lib/LOCALCLUSTER/SERVICE-SPARK-retro/sbin/start-thriftserver.sh \
- --name "Awaken Insight Thrift Server" \
- --master yarn-client --queue applications-retro \
- --conf spark.driver.memory=10g
- --conf spark.yarn.executor.memoryOverhead=2048
- --conf spark.eventLog.enabled=false \
- --conf spark.dynamicAllocation.enabled=true \
- --conf spark.shuffle.service.enabled=true \
- --conf spark.dynamicAllocation.minExecutors=1 \
- --conf spark.dynamicAllocation.maxExecutors=12 \
- --conf spark.executor.memory=1g \
- --conf spark.executor.cores=1
可在 spark-default.conf 下配置类似参数, 可对所有应用生效 .(不推荐)
更好地使用动态资源分配
由于动态资源分配思想其实是建议将单个 Executor 的资源设置一个比较小的值, 如 1G. 而实际上此值一般设置得比较大, 主要是为了防止 OOM. 那为了更好地使用动态资源分配, 必须解决此问题.
Executor OOM 类错误 (错误代码 137,143 等)一般是由于堆内存 已达上限, Task 需要更多的内存, 而又得不到足够的内存而导致. 因此, 解决方案要从增加每个 Task 的内存使用量, 满足任务需求 或 降低单个 Task 的内存消耗量, 从而使现有内存可以满足任务运行需求两个角度出发. 因此:
增加单个 task 的内存使用量
增加
spark.executor.memory
, 使每个 Task 可使用内存增加.
降低 Executor 的可用 Core 的数量
spark.executor.cores
, 使 Executor 中同时运行的任务数减少, 在总资源不变的情况下, 使每个 Task 获得的内存相对增加.
降低单个 Task 的内存消耗量
降低单个 Task 的内存消耗量可从配制方式和调整应用逻辑两个层面进行优化:
配置方式
减少每个 Task 处理的数据量, 可降低 Task 的内存开销, 在 Spark 中, 每个 partition 对应一个处理任务 Task, 因此, 在数据总量一定的前提下, 可以通过增加 partition 数量的方式来减少每个 Task 处理的数据量, 从而降低 Task 的内存开销. 针对不同的 Spark 应用类型, 存在不同的 partition 调整参数如下:
P = spark.default.parallism (非 SQL 应用) 有父 RDD 的, 以他们的 partition 数为主, 没有的 (如 parallelize) 取决于所有 numExcutors*executorCore, 最小为 2.
P = spark.sql.shuffle.partition (SQL 应用) 默认值 200
通过增加 P 的值, 可在一定程度上使 Task 现有内存满足任务运行
注: 当调整一个参数不能解决问题时, 上述方案应进行协同调整
调整应用逻辑
Executor OOM 一般发生 Shuffle 阶段, 该阶段需求计算内存较大, 且应用逻辑对内存需求有较大影响, 下面举例就行说明:
选择合适的算子. 如: groupByKey 转换为 reduceByKey
一般情况下, groupByKey 能实现的功能使用 reduceByKey 均可实现, 而 ReduceByKey 存在 Map 端的合并, 可以有效减少传输带宽占用及 Reduce 端内存消耗.
数据倾斜预处理
数据倾斜是指任务间处理的数据量存大较大的差异.
如左图所示, key 为 010 的数据较多, 当发生 shuffle 时, 010 所在分区存在大量数据, 不仅拖慢 Job 执行 (Job 的执行时间由最后完成的任务决定). 而且导致 010 对应 Task 内存消耗过多, 可能导致 OOM. 而右图, 经过预处理(加盐, 此处仅为举例说明问题, 解决方法不限于此) 可以有效减少数据倾斜导致 的问题
注: 上述举例仅为说明调整应用逻辑可以在一定程序上解决 OOM 问题, 解决方法不限于上述举例
动态资源分配效果
本文主要针对 1051847284 条过车记录 (约 10 亿) 进行如下操作, 分别记录时间.
类型 | SQL |
---|---|
count | select count(1) from sparta_pass_di |
全局排序 | select * from sparta_pass_di order by passTime desc limit 10 |
聚合排序 | select plateNo, count(1) as cnt from sparta_pass_di group by plateNo order by cnt desc limit 10; |
过滤查询 | select * from sparta_pass_di where plateNo = '粤 GU54MX' limit 10; |
Thrift server V.S. spark sql
以融合平台的 Thrift Server 为例, 先简单对比 Thrift server 与 spark sql 之间的性能差异, 如下图结果可知, 总资源一致的情况下基本没有太大差异:
sql / 命令 / 时间 | spark-sql --master yarn-client --driver-memory 10G --driver-cores 1 --executor-memory 6G --executor-cores 1 --num-executors 2 --conf spark.sql.shuffle.partition=500 | start-thriftserver.sh --master yarn-client --driver-memory 10G --num-executors 2 --conf spark.driver.memory=10g --executor-memory 6g --conf spark.sql.shuffle.partition=500 |
---|---|---|
select count(1) from sparta_pass_di | 6 s | 7 s |
select * from sparta_pass_di order by passTime desc limit 10 | 21 min | 20 min |
select plateNo, count(1) as cnt from sparta_pass_di group by plateNo order by cnt desc limit 10; | 2.0 min | 1.4 min |
select * from sparta_pass_di where plateNo = '粤 GU54MX' limit 10; | 0.4 s | 0.3 s |
静态资源分配 V.S. 动态资源分配
在上述前提下, 对比静态资源分配和动态资源分配之间的差异, 可以看到在明显耗时的全局排序耗时明显更短, 其余性能差距不大, 但是空闲资源会被释放.
sql / 命令 / 时间 | spark-sql --driver-memory 10G --driver-cores 1 --executor-memory 1G --executor-cores 1 --num-executors 12 --conf spark.sql.shuffle.partition=500 | start-thriftserver.sh --driver-memory 10G --driver-cores 1 --conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true --conf spark.dynamicAllocation.minExecutors=1 --conf spark.dynamicAllocation.maxExecutors=12 --conf spark.executor.memory=1g --conf spark.executor.cores=1 --conf spark.sql.shuffle.partition=500 |
---|---|---|
select count(1) from sparta_pass_di | 8 s | 10 s |
select * from sparta_pass_di order by passTime desc limit 10 | 11 min | 4.1 min |
select plateNo, count(1) as cnt from sparta_pass_di group by plateNo order by cnt desc limit 10; | 57 s | 51 s |
select * from sparta_pass_di where plateNo = '粤 GU54MX' limit 10; | 0.5 s | 0.5 s |
shuffle.partition 多 V.S. 少
spark.sql.shuffle.partition 的默认值为 200, 增加 spark.sql.shuffle.partition 到 500, 没有看到明显的性能提升
sql / 命令 / 时间 | spark-sql --master yarn-client --driver-memory 10G --driver-cores 1 --executor-memory 6G --executor-cores 1 --num-executors 2 \ | spark-sql --master yarn-client --driver-memory 10G --driver-cores 1 --executor-memory 6G --executor-cores 1 --num-executors 2 --conf spark.sql.shuffle.partition=500 |
---|---|---|
select count(1) from sparta_pass_di | 9 s | 6 s |
select * from sparta_pass_di order by passTime desc limit 10 | 23 min | 21 min |
select plateNo, count(1) as cnt from sparta_pass_di group by plateNo order by cnt desc limit 10; | 1.6 min | 2.0 min |
select * from sparta_pass_di where plateNo = '粤 GU54MX' limit 10; | 0.3 s | 0.4 s |
阶段性总结一下, 动态资源分配在简单任务性能与静态资源分配差不多, 在复杂任务中性能提升较多, 可能是由于常驻的外部 shuffle 服务带来的性能提升, 需要进一步测试验证. 而由于实际应用中 executor 内存都分配得较大, 总资源一定的情况下, 使得任务的并行度较小, 任务执行更慢(21 min V.S. 4.1 min). 通过增加 spark.sql.shuffle.partition 来提升任务并行度, 没有看到明显的性能提升.
可能约束
流式采集使用 DC, 是否可用动态分配. 当前 DC 并未使用 Spark, 动态资源分配可能需要额外的开发.
算法引擎的资源管控可能会失效. 由于资源未知, 只设定了范围([最小值, 最大值]), 那资源管控到底以哪一个为主?
总结
本案例针对现场出现的资源不足问题做了分析, 对任务进行了分类, 然后引入动态分配机制, 对融合平台的 Thrift Server 做了几组测试, 可以看到动态资源分配优势较大, 建议推广.
参考文献
https://www.jianshu.com/p/10e91ace3378
附录一, 动态资源分配参数说明
Dynamic Allocation ()
Property Name | Default | Meaning |
---|---|---|
spark.dynamicAllocation.enabled | false | Whether to use dynamic resource allocation, which scales the number of executors registered with this application up and down based on the workload. For more detail, see the description here . This requires spark.shuffle.service.enabled to be set. The following configurations are also relevant: spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors, and spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.executorAllocationRatio |
spark.dynamicAllocation.executorIdleTimeout | 60s | If dynamic allocation is enabled and an executor has been idle for more than this duration, the executor will be removed. For more detail, see this description . |
spark.dynamicAllocation.cachedExecutorIdleTimeout | infinity | If dynamic allocation is enabled and an executor which has cached data blocks has been idle for more than this duration, the executor will be removed. For more details, see this description . |
spark.dynamicAllocation.initialExecutors | spark.dynamicAllocation.minExecutors | Initial number of executors to run if dynamic allocation is enabled. If --num-executors (or spark.executor.instances) is set and larger than this value, it will be used as the initial number of executors. |
spark.dynamicAllocation.maxExecutors | infinity | Upper bound for the number of executors if dynamic allocation is enabled. |
spark.dynamicAllocation.minExecutors | 0 | Lower bound for the number of executors if dynamic allocation is enabled. |
spark.dynamicAllocation.executorAllocationRatio | 1 | By default, the dynamic allocation will request enough executors to maximize the parallelism according to the number of tasks to process. While this minimizes the latency of the job, with small tasks this setting can waste a lot of resources due to executor allocation overhead, as some executor might not even do any work. This setting allows to set a ratio that will be used to reduce the number of executors w.r.t. full parallelism. Defaults to 1.0 to give maximum parallelism. 0.5 will divide the target number of executors by 2 The target number of executors computed by the dynamicAllocation can still be overridden by the spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors settings |
spark.dynamicAllocation.schedulerBacklogTimeout | 1s | If dynamic allocation is enabled and there have been pending tasks backlogged for more than this duration, new executors will be requested. For more detail, see this description . |
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout | schedulerBacklogTimeout | Same as spark.dynamicAllocation.schedulerBacklogTimeout, but used only for subsequent executor requests. For more detail, see this description . |
来源: https://www.cnblogs.com/starqiu/p/12409219.html