前言
在大数据计算领域, Spark 已经成为了越来越流行越来越受欢迎的计算平台之一 Spark 的功能涵盖了大数据领域的离线批处理 SQL 类处理流式 / 实时计算机器学习图计算等各种不同类型的计算操作, 应用范围与前景非常广泛在美团大众点评, 已经有很多同学在各种项目中尝试使用 Spark 大多数同学(包括笔者在内), 最初开始尝试使用 Spark 的原因很简单, 主要就是为了让大数据计算作业的执行速度更快性能更高
然而, 通过 Spark 开发出高性能的大数据计算作业, 并不是那么简单的如果没有对 Spark 作业进行合理的调优, Spark 作业的执行速度可能会很慢, 这样就完全体现不出 Spark 作为一种快速大数据计算引擎的优势来因此, 想要用好 Spark, 就必须对其进行合理的性能优化
Spark 的性能调优实际上是由很多部分组成的, 不是调节几个参数就可以立竿见影提升作业性能的我们需要根据不同的业务场景以及数据情况, 对 Spark 作业进行综合性的分析, 然后进行多个方面的调节和优化, 才能获得最佳性能
笔者根据之前的 Spark 作业开发经验以及实践积累, 总结出了一套 Spark 作业的性能优化方案整套方案主要分为开发调优资源调优数据倾斜调优 shuffle 调优几个部分开发调优和资源调优是所有 Spark 作业都需要注意和遵循的一些基本原则, 是高性能 Spark 作业的基础; 数据倾斜调优, 主要讲解了一套完整的用来解决 Spark 作业数据倾斜的解决方案; shuffle 调优, 面向的是对 Spark 的原理有较深层次掌握和研究的同学, 主要讲解了如何对 Spark 作业的 shuffle 运行过程以及细节进行调优
本文作为 Spark 性能优化指南的基础篇, 主要讲解开发调优以及资源调优
开发调优
调优概述
Spark 性能优化的第一步, 就是要在开发 Spark 作业的过程中注意和应用一些性能优化的基本原则开发调优, 就是要让大家了解以下一些 Spark 基本开发原则, 包括: RDD lineage 设计算子的合理使用特殊操作的优化等在开发过程中, 时时刻刻都应该注意以上原则, 并将这些原则根据具体的业务以及实际的应用场景, 灵活地运用到自己的 Spark 作业中
原则一: 避免创建重复的 RDD
通常来说, 我们在开发一个 Spark 作业时, 首先是基于某个数据源 (比如 Hive 表或 HDFS 文件) 创建一个初始的 RDD; 接着对这个 RDD 执行某个算子操作, 然后得到下一个 RDD; 以此类推, 循环往复, 直到计算出最终我们需要的结果在这个过程中, 多个 RDD 会通过不同的算子操作 (比如 mapreduce 等) 串起来, 这个 RDD 串, 就是 RDD lineage, 也就是 RDD 的血缘关系链
我们在开发过程中要注意: 对于同一份数据, 只应该创建一个 RDD, 不能创建多个 RDD 来代表同一份数据
一些 Spark 初学者在刚开始开发 Spark 作业时, 或者是有经验的工程师在开发 RDD lineage 极其冗长的 Spark 作业时, 可能会忘了自己之前对于某一份数据已经创建过一个 RDD 了, 从而导致对于同一份数据, 创建了多个 RDD 这就意味着, 我们的 Spark 作业会进行多次重复计算来创建多个代表相同数据的 RDD, 进而增加了作业的性能开销
一个简单的例子
- // 需要对名为 hello.txt 的 HDFS 文件进行一次 map 操作, 再进行一次 reduce 操作也就是说, 需要对一份数据执行两次算子操作
- // 错误的做法: 对于同一份数据执行多次算子操作时, 创建多个 RDD
- // 这里执行了两次 textFile 方法, 针对同一个 HDFS 文件, 创建了两个 RDD 出来, 然后分别对每个 RDD 都执行了一个算子操作
- // 这种情况下, Spark 需要从 HDFS 上两次加载 hello.txt 文件的内容, 并创建两个单独的 RDD; 第二次加载 HDFS 文件以及创建 RDD 的性能开销, 很明显是白白浪费掉的
- val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
- rdd1.map(...)
- val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
- rdd2.reduce(...)
- // 正确的用法: 对于一份数据执行多次算子操作时, 只使用一个 RDD
- // 这种写法很明显比上一种写法要好多了, 因为我们对于同一份数据只创建了一个 RDD, 然后对这一个 RDD 执行了多次算子操作
- // 但是要注意到这里为止优化还没有结束, 由于 rdd1 被执行了两次算子操作, 第二次执行 reduce 操作的时候, 还会再次从源头处重新计算一次 rdd1 的数据, 因此还是会有重复计算的性能开销
- // 要彻底解决这个问题, 必须结合原则三: 对多次使用的 RDD 进行持久化, 才能保证一个 RDD 被多次使用时只被计算一次
- val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
- rdd1.map(...)
- rdd1.reduce(...)
原则二: 尽可能复用同一个 RDD
除了要避免在开发过程中对一份完全相同的数据创建多个 RDD 之外, 在对不同的数据执行算子操作时还要尽可能地复用一个 RDD 比如说, 有一个 RDD 的数据格式是 key-value 类型的, 另一个是单 value 类型的, 这两个 RDD 的 value 数据是完全一样的那么此时我们可以只使用 key-value 类型的那个 RDD, 因为其中已经包含了另一个的数据对于类似这种多个 RDD 的数据有重叠或者包含的情况, 我们应该尽量复用一个 RDD, 这样可以尽可能地减少 RDD 的数量, 从而尽可能减少算子执行的次数
一个简单的例子
- // 错误的做法
- // 有一个 < Long, String > 格式的 RDD, 即 rdd1
- // 接着由于业务需要, 对 rdd1 执行了一个 map 操作, 创建了一个 rdd2, 而 rdd2 中的数据仅仅是 rdd1 中的 value 值而已, 也就是说, rdd2 是 rdd1 的子集
- JavaPairRDD<Long, String> rdd1 = ...
- JavaRDD<String> rdd2 = rdd1.map(...)
- // 分别对 rdd1 和 rdd2 执行了不同的算子操作
- rdd1.reduceByKey(...)
- rdd2.map(...)
- // 正确的做法
- // 上面这个 case 中, 其实 rdd1 和 rdd2 的区别无非就是数据格式不同而已, rdd2 的数据完全就是 rdd1 的子集而已, 却创建了两个 rdd, 并对两个 rdd 都执行了一次算子操作
- // 此时会因为对 rdd1 执行 map 算子来创建 rdd2, 而多执行一次算子操作, 进而增加性能开销
- // 其实在这种情况下完全可以复用同一个 RDD
- // 我们可以使用 rdd1, 既做 reduceByKey 操作, 也做 map 操作
- // 在进行第二个 map 操作时, 只使用每个数据的 tuple._2, 也就是 rdd1 中的 value 值, 即可
- JavaPairRDD<Long, String> rdd1 = ...
- rdd1.reduceByKey(...)
- rdd1.map(tuple._2...)
- // 第二种方式相较于第一种方式而言, 很明显减少了一次 rdd2 的计算开销
- // 但是到这里为止, 优化还没有结束, 对 rdd1 我们还是执行了两次算子操作, rdd1 实际上还是会被计算两次
- // 因此还需要配合原则三: 对多次使用的 RDD 进行持久化进行使用, 才能保证一个 RDD 被多次使用时只被计算一次
原则三: 对多次使用的 RDD 进行持久化
当你在 Spark 代码中多次对一个 RDD 做了算子操作后, 恭喜, 你已经实现 Spark 作业第一步的优化了, 也就是尽可能复用 RDD 此时就该在这个基础之上, 进行第二步优化了, 也就是要保证对一个 RDD 执行多次算子操作时, 这个 RDD 本身仅仅被计算一次
Spark 中对于一个 RDD 执行多次算子的默认原理是这样的: 每次你对一个 RDD 执行一个算子操作时, 都会重新从源头处计算一遍, 计算出那个 RDD 来, 然后再对这个 RDD 执行你的算子操作这种方式的性能是很差的
因此对于这种情况, 我们的建议是: 对多次使用的 RDD 进行持久化此时 Spark 就会根据你的持久化策略, 将 RDD 中的数据保存到内存或者磁盘中以后每次对这个 RDD 进行算子操作时, 都会直接从内存或磁盘中提取持久化的 RDD 数据, 然后执行算子, 而不会从源头处重新计算一遍这个 RDD, 再执行算子操作
对多次使用的 RDD 进行持久化的代码示例
- // 如果要对一个 RDD 进行持久化, 只要对这个 RDD 调用 cache()和 persist()即可
- // 正确的做法
- // cache()方法表示: 使用非序列化的方式将 RDD 中的数据全部尝试持久化到内存中
- // 此时再对 rdd1 执行两次算子操作时, 只有在第一次执行 map 算子时, 才会将这个 rdd1 从源头处计算一次
- // 第二次执行 reduce 算子时, 就会直接从内存中提取数据进行计算, 不会重复计算一个 rdd
- val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
- rdd1.map(...)
- rdd1.reduce(...)
- // persist()方法表示: 手动选择持久化级别, 并使用指定的方式进行持久化
- // 比如说, StorageLevel.MEMORY_AND_DISK_SER 表示, 内存充足时优先持久化到内存中, 内存不充足时持久化到磁盘文件中
- // 而且其中的_SER 后缀表示, 使用序列化的方式来保存 RDD 数据, 此时 RDD 中的每个 partition 都会序列化成一个大的字节数组, 然后再持久化到内存或磁盘中
- // 序列化的方式可以减少持久化的数据对内存 / 磁盘的占用量, 进而避免内存被持久化数据占用过多, 从而发生频繁 GC
- val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
- rdd1.map(...)
- rdd1.reduce(...)
对于 persist()方法而言, 我们可以根据不同的业务场景选择不同的持久化级别
Spark 的持久化级别
持久化级别 | 含义解释 |
---|---|
MEMORY_ONLY | 使用未序列化的 Java 对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个 RDD 执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用 cache()方法时,实际就是使用的这种持久化策略。 |
MEMORY_AND_DISK | 使用未序列化的 Java 对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个 RDD 执行算子时,持久化在磁盘文件中的数据会被读取出来使用。 |
MEMORY_ONLY_SER | 基本含义同 MEMORY_ONLY。唯一的区别是,会将 RDD 中的数据进行序列化,RDD 的每个 partition 会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁 GC。 |
MEMORY_AND_DISK_SER | 基本含义同 MEMORY_AND_DISK。唯一的区别是,会将 RDD 中的数据进行序列化,RDD 的每个 partition 会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁 GC。 |
DISK_ONLY | 使用未序列化的 Java 对象格式,将数据全部写入磁盘文件中。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等. | 对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对 RDD 计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。 |
如何选择一种最合适的持久化策略
默认情况下, 性能最高的当然是 MEMORY_ONLY, 但前提是你的内存必须足够足够大, 可以绰绰有余地存放下整个 RDD 的所有数据因为不进行序列化与反序列化操作, 就避免了这部分的性能开销; 对这个 RDD 的后续算子操作, 都是基于纯内存中的数据的操作, 不需要从磁盘文件中读取数据, 性能也很高; 而且不需要复制一份数据副本, 并远程传送到其他节点上但是这里必须要注意的是, 在实际的生产环境中, 恐怕能够直接用这种策略的场景还是有限的, 如果 RDD 中数据比较多时(比如几十亿), 直接用这种持久化级别, 会导致 JVM 的 OOM 内存溢出异常
如果使用 MEMORY_ONLY 级别时发生了内存溢出, 那么建议尝试使用 MEMORY_ONLY_SER 级别该级别会将 RDD 数据序列化后再保存在内存中, 此时每个 partition 仅仅是一个字节数组而已, 大大减少了对象数量, 并降低了内存占用这种级别比 MEMORY_ONLY 多出来的性能开销, 主要就是序列化与反序列化的开销但是后续算子可以基于纯内存进行操作, 因此性能总体还是比较高的此外, 可能发生的问题同上, 如果 RDD 中的数据量过多的话, 还是可能会导致 OOM 内存溢出的异常
如果纯内存的级别都无法使用, 那么建议使用 MEMORY_AND_DISK_SER 策略, 而不是 MEMORY_AND_DISK 策略因为既然到了这一步, 就说明 RDD 的数据量很大, 内存无法完全放下序列化后的数据比较少, 可以节省内存和磁盘的空间开销同时该策略会优先尽量尝试将数据缓存在内存中, 内存缓存不下才会写入磁盘
通常不建议使用 DISK_ONLY 和后缀为_2 的级别: 因为完全基于磁盘文件进行数据的读写, 会导致性能急剧降低, 有时还不如重新计算一次所有 RDD 后缀为_2 的级别, 必须将所有数据都复制一份副本, 并发送到其他节点上, 数据复制以及网络传输会导致较大的性能开销, 除非是要求作业的高可用性, 否则不建议使用
原则四: 尽量避免使用 shuffle 类算子
如果有可能的话, 要尽量避免使用 shuffle 类算子因为 Spark 作业运行过程中, 最消耗性能的地方就是 shuffle 过程 shuffle 过程, 简单来说, 就是将分布在集群中多个节点上的同一个 key, 拉取到同一个节点上, 进行聚合或 join 等操作比如 reduceByKeyjoin 等算子, 都会触发 shuffle 操作
shuffle 过程中, 各个节点上的相同 key 都会先写入本地磁盘文件中, 然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同 key 而且相同 key 都拉取到同一个节点进行聚合操作时, 还有可能会因为一个节点上处理的 key 过多, 导致内存不够存放, 进而溢写到磁盘文件中因此在 shuffle 过程中, 可能会发生大量的磁盘文件读写的 IO 操作, 以及数据的网络传输操作磁盘 IO 和网络数据传输也是 shuffle 性能较差的主要原因
因此在我们的开发过程中, 能避免则尽可能避免使用 reduceByKeyjoindistinctrepartition 等会进行 shuffle 的算子, 尽量使用 map 类的非 shuffle 算子这样的话, 没有 shuffle 操作或者仅有较少 shuffle 操作的 Spark 作业, 可以大大减少性能开销
Broadcast 与 map 进行 join 代码示例
- // 传统的 join 操作会导致 shuffle 操作
- // 因为两个 RDD 中, 相同的 key 都需要通过网络拉取到一个节点上, 由一个 task 进行 join 操作
- val rdd3 = rdd1.join(rdd2)
- // Broadcast+map 的 join 操作, 不会导致 shuffle 操作
- // 使用 Broadcast 将一个数据量较小的 RDD 作为广播变量
- val rdd2Data = rdd2.collect()
- val rdd2DataBroadcast = sc.broadcast(rdd2Data)
- // 在 rdd1.map 算子中, 可以从 rdd2DataBroadcast 中, 获取 rdd2 的所有数据
- // 然后进行遍历, 如果发现 rdd2 中某条数据的 key 与 rdd1 的当前数据的 key 是相同的, 那么就判定可以进行 join
- // 此时就可以根据自己需要的方式, 将 rdd1 当前数据与 rdd2 中可以连接的数据, 拼接在一起(String 或 Tuple)
- val rdd3 = rdd1.map(rdd2DataBroadcast...)
- // 注意, 以上操作, 建议仅仅在 rdd2 的数据量比较少 (比如几百 M, 或者一两 G) 的情况下使用
- // 因为每个 Executor 的内存中, 都会驻留一份 rdd2 的全量数据
原则五: 使用 map-side 预聚合的 shuffle 操作
如果因为业务需要, 一定要使用 shuffle 操作, 无法用 map 类的算子来替代, 那么尽量使用可以 map-side 预聚合的算子
所谓的 map-side 预聚合, 说的是在每个节点本地对相同的 key 进行一次聚合操作, 类似于 MapReduce 中的本地 combinermap-side 预聚合之后, 每个节点本地就只会有一条相同的 key, 因为多条相同的 key 都被聚合起来了其他节点在拉取所有节点上的相同 key 时, 就会大大减少需要拉取的数据数量, 从而也就减少了磁盘 IO 以及网络传输开销通常来说, 在可能的情况下, 建议使用 reduceByKey 或者 aggregateByKey 算子来替代掉 groupByKey 算子因为 reduceByKey 和 aggregateByKey 算子都会使用用户自定义的函数对每个节点本地的相同 key 进行预聚合而 groupByKey 算子是不会进行预聚合的, 全量的数据会在集群的各个节点之间分发和传输, 性能相对来说比较差
比如如下两幅图, 就是典型的例子, 分别基于 reduceByKey 和 groupByKey 进行单词计数其中第一张图是 groupByKey 的原理图, 可以看到, 没有进行任何本地聚合时, 所有数据都会在集群节点之间传输; 第二张图是 reduceByKey 的原理图, 可以看到, 每个节点本地的相同 key 数据, 都进行了预聚合, 然后才传输到其他节点上进行全局聚合
原则六: 使用高性能的算子
除了 shuffle 相关的算子有优化原则之外, 其他的算子也都有着相应的优化原则
使用 reduceByKey/aggregateByKey 替代 groupByKey
详情见原则五: 使用 map-side 预聚合的 shuffle 操作
使用 mapPartitions 替代普通 map
mapPartitions 类的算子, 一次函数调用会处理一个 partition 所有的数据, 而不是一次函数调用处理一条, 性能相对来说会高一些但是有的时候, 使用 mapPartitions 会出现 OOM(内存溢出)的问题因为单次函数调用就要处理掉一个 partition 所有的数据, 如果内存不够, 垃圾回收时是无法回收掉太多对象的, 很可能出现 OOM 异常所以使用这类操作时要慎重!
使用 foreachPartitions 替代 foreach
原理类似于使用 mapPartitions 替代 map, 也是一次函数调用处理一个 partition 的所有数据, 而不是一次函数调用处理一条数据在实践中发现, foreachPartitions 类的算子, 对性能的提升还是很有帮助的比如在 foreach 函数中, 将 RDD 中所有数据写 MySQL, 那么如果是普通的 foreach 算子, 就会一条数据一条数据地写, 每次函数调用可能就会创建一个数据库连接, 此时就势必会频繁地创建和销毁数据库连接, 性能是非常低下; 但是如果用 foreachPartitions 算子一次性处理一个 partition 的数据, 那么对于每个 partition, 只要创建一个数据库连接即可, 然后执行批量插入操作, 此时性能是比较高的实践中发现, 对于 1 万条左右的数据量写 MySQL, 性能可以提升 30% 以上
使用 filter 之后进行 coalesce 操作
通常对一个 RDD 执行 filter 算子过滤掉 RDD 中较多数据后(比如 30% 以上的数据), 建议使用 coalesce 算子, 手动减少 RDD 的 partition 数量, 将 RDD 中的数据压缩到更少的 partition 中去因为 filter 之后, RDD 的每个 partition 中都会有很多数据被过滤掉, 此时如果照常进行后续的计算, 其实每个 task 处理的 partition 中的数据量并不是很多, 有一点资源浪费, 而且此时处理的 task 越多, 可能速度反而越慢因此用 coalesce 减少 partition 数量, 将 RDD 中的数据压缩到更少的 partition 之后, 只要使用更少的 task 即可处理完所有的 partition 在某些场景下, 对于性能的提升会有一定的帮助
使用 repartitionAndSortWithinPartitions 替代 repartition 与 sort 类操作
repartitionAndSortWithinPartitions 是 Spark 官网推荐的一个算子, 官方建议, 如果需要在 repartition 重分区之后, 还要进行排序, 建议直接使用 repartitionAndSortWithinPartitions 算子因为该算子可以一边进行重分区的 shuffle 操作, 一边进行排序 shuffle 与 sort 两个操作同时进行, 比先 shuffle 再 sort 来说, 性能可能是要高的
原则七: 广播大变量
有时在开发过程中, 会遇到需要在算子函数中使用外部变量的场景 (尤其是大变量, 比如 100M 以上的大集合), 那么此时就应该使用 Spark 的广播(Broadcast) 功能来提升性能
在算子函数中使用到外部变量时, 默认情况下, Spark 会将该变量复制多个副本, 通过网络传输到 task 中, 此时每个 task 都有一个变量副本如果变量本身比较大的话(比如 100M, 甚至 1G), 那么大量的变量副本在网络中传输的性能开销, 以及在各个节点的 Executor 中占用过多内存导致的频繁 GC, 都会极大地影响性能
因此对于上述情况, 如果使用的外部变量比较大, 建议使用 Spark 的广播功能, 对该变量进行广播广播后的变量, 会保证每个 Executor 的内存中, 只驻留一份变量副本, 而 Executor 中的 task 执行时共享该 Executor 中的那份变量副本这样的话, 可以大大减少变量副本的数量, 从而减少网络传输的性能开销, 并减少对 Executor 内存的占用开销, 降低 GC 的频率
广播大变量的代码示例
- // 以下代码在算子函数中, 使用了外部的变量
- // 此时没有做任何特殊操作, 每个 task 都会有一份 list1 的副本
- val list1 = ...
- rdd1.map(list1...)
- // 以下代码将 list1 封装成了 Broadcast 类型的广播变量
- // 在算子函数中, 使用广播变量时, 首先会判断当前 task 所在 Executor 内存中, 是否有变量副本
- // 如果有则直接使用; 如果没有则从 Driver 或者其他 Executor 节点上远程拉取一份放到本地 Executor 内存中
- // 每个 Executor 内存中, 就只会驻留一份广播变量副本
- val list1 = ...
- val list1Broadcast = sc.broadcast(list1)
- rdd1.map(list1Broadcast...)
原则八: 使用 Kryo 优化序列化性能
在 Spark 中, 主要有三个地方涉及到了序列化:
在算子函数中使用到外部变量时, 该变量会被序列化后进行网络传输(见原则七: 广播大变量中的讲解)
将自定义的类型作为 RDD 的泛型类型时(比如 JavaRDD,Student 是自定义类型), 所有自定义类型对象, 都会进行序列化因此这种情况下, 也要求自定义的类必须实现 Serializable 接口
使用可序列化的持久化策略时(比如 MEMORY_ONLY_SER),Spark 会将 RDD 中的每个 partition 都序列化成一个大的字节数组
对于这三种出现序列化的地方, 我们都可以通过使用 Kryo 序列化类库, 来优化序列化和反序列化的性能 Spark 默认使用的是 Java 的序列化机制, 也就是 ObjectOutputStream/ObjectInputStream API 来进行序列化和反序列化但是 Spark 同时支持使用 Kryo 序列化库, Kryo 序列化类库的性能比 Java 序列化类库的性能要高很多官方介绍, Kryo 序列化机制比 Java 序列化机制, 性能高 10 倍左右 Spark 之所以默认没有使用 Kryo 作为序列化类库, 是因为 Kryo 要求最好要注册所有需要进行序列化的自定义类型, 因此对于开发者来说, 这种方式比较麻烦
以下是使用 Kryo 的代码示例, 我们只要设置序列化类, 再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型作为 RDD 泛型类型的自定义类型等):
- // 创建 SparkConf 对象
- val conf = new SparkConf().setMaster(...).setAppName(...)
- // 设置序列化器为 KryoSerializer
- conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- // 注册要序列化的自定义类型
- conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
原则九: 优化数据结构
Java 中, 有三种类型比较耗费内存:
对象, 每个 Java 对象都有对象头引用等额外的信息, 因此比较占用内存空间
字符串, 每个字符串内部都有一个字符数组以及长度等额外信息
集合类型, 比如 HashMapLinkedList 等, 因为集合类型内部通常会使用一些内部类来封装集合元素, 比如 Map.Entry
因此 Spark 官方建议, 在 Spark 编码实现中, 特别是对于算子函数中的代码, 尽量不要使用上述三种数据结构, 尽量使用字符串替代对象, 使用原始类型 (比如 IntLong) 替代字符串, 使用数组替代集合类型, 这样尽可能地减少内存占用, 从而降低 GC 频率, 提升性能
但是在笔者的编码实践中发现, 要做到该原则其实并不容易因为我们同时要考虑到代码的可维护性, 如果一个代码中, 完全没有任何对象抽象, 全部是字符串拼接的方式, 那么对于后续的代码维护和修改, 无疑是一场巨大的灾难同理, 如果所有操作都基于数组实现, 而不使用 HashMapLinkedList 等集合类型, 那么对于我们的编码难度以及代码可维护性, 也是一个极大的挑战因此笔者建议, 在可能以及合适的情况下, 使用占用内存较少的数据结构, 但是前提是要保证代码的可维护性
资源调优
调优概述
在开发完 Spark 作业之后, 就该为作业配置合适的资源了 Spark 的资源参数, 基本都可以在 spark-submit 命令中作为参数设置很多 Spark 初学者, 通常不知道该设置哪些必要的参数, 以及如何设置这些参数, 最后就只能胡乱设置, 甚至压根儿不设置资源参数设置的不合理, 可能会导致没有充分利用集群资源, 作业运行会极其缓慢; 或者设置的资源过大, 队列没有足够的资源来提供, 进而导致各种异常总之, 无论是哪种情况, 都会导致 Spark 作业的运行效率低下, 甚至根本无法运行因此我们必须对 Spark 作业的资源使用原理有一个清晰的认识, 并知道在 Spark 作业运行过程中, 有哪些资源参数是可以设置的, 以及如何设置合适的参数值
Spark 作业基本运行原理
详细原理见上图我们使用 spark-submit 提交一个 Spark 作业之后, 这个作业就会启动一个对应的 Driver 进程根据你使用的部署模式 (deploy-mode) 不同, Driver 进程可能在本地启动, 也可能在集群中某个工作节点上启动 Driver 进程本身会根据我们设置的参数, 占有一定数量的内存和 CPU core 而 Driver 进程要做的第一件事情, 就是向集群管理器 (可以是 Spark Standalone 集群, 也可以是其他的资源管理集群, 美团大众点评使用的是 YARN 作为资源管理集群) 申请运行 Spark 作业需要使用的资源, 这里的资源指的就是 Executor 进程 YARN 集群管理器会根据我们为 Spark 作业设置的资源参数, 在各个工作节点上, 启动一定数量的 Executor 进程, 每个 Executor 进程都占有一定数量的内存和 CPU core
在申请到了作业执行所需的资源之后, Driver 进程就会开始调度和执行我们编写的作业代码了 Driver 进程会将我们编写的 Spark 作业代码分拆为多个 stage, 每个 stage 执行一部分代码片段, 并为每个 stage 创建一批 task, 然后将这些 task 分配到各个 Executor 进程中执行 task 是最小的计算单元, 负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段), 只是每个 task 处理的数据不同而已一个 stage 的所有 task 都执行完毕之后, 会在各个节点本地的磁盘文件中写入计算中间结果, 然后 Driver 就会调度运行下一个 stage 下一个 stage 的 task 的输入数据就是上一个 stage 输出的中间结果如此循环往复, 直到将我们自己编写的代码逻辑全部执行完, 并且计算完所有的数据, 得到我们想要的结果为止
Spark 是根据 shuffle 类算子来进行 stage 的划分如果我们的代码中执行了某个 shuffle 类算子 (比如 reduceByKeyjoin 等), 那么就会在该算子处, 划分出一个 stage 界限来可以大致理解为, shuffle 算子执行之前的代码会被划分为一个 stage,shuffle 算子执行以及之后的代码会被划分为下一个 stage 因此一个 stage 刚开始执行的时候, 它的每个 task 可能都会从上一个 stage 的 task 所在的节点, 去通过网络传输拉取需要自己处理的所有 key, 然后对拉取到的所有相同的 key 使用我们自己编写的算子函数执行聚合操作(比如 reduceByKey() 算子接收的函数)这个过程就是 shuffle
当我们在代码中执行了 cache/persist 等持久化操作时, 根据我们选择的持久化级别的不同, 每个 task 计算出来的数据也会保存到 Executor 进程的内存或者所在节点的磁盘文件中
因此 Executor 的内存主要分为三块: 第一块是让 task 执行我们自己编写的代码时使用, 默认是占 Executor 总内存的 20%; 第二块是让 task 通过 shuffle 过程拉取了上一个 stage 的 task 的输出后, 进行聚合等操作时使用, 默认也是占 Executor 总内存的 20%; 第三块是让 RDD 持久化时使用, 默认占 Executor 总内存的 60%
task 的执行速度是跟每个 Executor 进程的 CPU core 数量有直接关系的一个 CPU core 同一时间只能执行一个线程而每个 Executor 进程上分配到的多个 task, 都是以每个 task 一条线程的方式, 多线程并发运行的如果 CPU core 数量比较充足, 而且分配到的 task 数量比较合理, 那么通常来说, 可以比较快速和高效地执行完这些 task 线程
以上就是 Spark 作业的基本运行原理的说明, 大家可以结合上图来理解理解作业基本原理, 是我们进行资源参数调优的基本前提
资源参数调优
了解完了 Spark 作业运行的基本原理之后, 对资源相关的参数就容易理解了所谓的 Spark 资源参数调优, 其实主要就是对 Spark 运行过程中各个使用资源的地方, 通过调节各种参数, 来优化资源使用的效率, 从而提升 Spark 作业的执行性能以下参数就是 Spark 中主要的资源参数, 每个参数都对应着作业运行原理中的某个部分, 我们同时也给出了一个调优的参考值
num-executors
参数说明: 该参数用于设置 Spark 作业总共要用多少个 Executor 进程来执行 Driver 在向 YARN 集群管理器申请资源时, YARN 集群管理器会尽可能按照你的设置来在集群的各个工作节点上, 启动相应数量的 Executor 进程这个参数非常之重要, 如果不设置的话, 默认只会给你启动少量的 Executor 进程, 此时你的 Spark 作业的运行速度是非常慢的
参数调优建议: 每个 Spark 作业的运行一般设置 50~100 个左右的 Executor 进程比较合适, 设置太少或太多的 Executor 进程都不好设置的太少, 无法充分利用集群资源; 设置的太多的话, 大部分队列可能无法给予充分的资源
executor-memory
参数说明: 该参数用于设置每个 Executor 进程的内存 Executor 内存的大小, 很多时候直接决定了 Spark 作业的性能, 而且跟常见的 JVM OOM 异常, 也有直接的关联
参数调优建议: 每个 Executor 进程的内存设置 4G~8G 较为合适但是这只是一个参考值, 具体的设置还是得根据不同部门的资源队列来定可以看看自己团队的资源队列的最大内存限制是多少, num-executors 乘以 executor-memory, 是不能超过队列的最大内存量的此外, 如果你是跟团队里其他人共享这个资源队列, 那么申请的内存量最好不要超过资源队列最大总内存的 1/3~1/2, 避免你自己的 Spark 作业占用了队列所有的资源, 导致别的同学的作业无法运行
executor-cores
参数说明: 该参数用于设置每个 Executor 进程的 CPU core 数量这个参数决定了每个 Executor 进程并行执行 task 线程的能力因为每个 CPU core 同一时间只能执行一个 task 线程, 因此每个 Executor 进程的 CPU core 数量越多, 越能够快速地执行完分配给自己的所有 task 线程
参数调优建议: Executor 的 CPU core 数量设置为 2~4 个较为合适同样得根据不同部门的资源队列来定, 可以看看自己的资源队列的最大 CPU core 限制是多少, 再依据设置的 Executor 数量, 来决定每个 Executor 进程可以分配到几个 CPU core 同样建议, 如果是跟他人共享这个队列, 那么 num-executors * executor-cores 不要超过队列总 CPU core 的 1/3~1/2 左右比较合适, 也是避免影响其他同学的作业运行
driver-memory
参数说明: 该参数用于设置 Driver 进程的内存
参数调优建议: Driver 的内存通常来说不设置, 或者设置 1G 左右应该就够了唯一需要注意的一点是, 如果需要使用 collect 算子将 RDD 的数据全部拉取到 Driver 上进行处理, 那么必须确保 Driver 的内存足够大, 否则会出现 OOM 内存溢出的问题
spark.default.parallelism
参数说明: 该参数用于设置每个 stage 的默认 task 数量这个参数极为重要, 如果不设置可能会直接影响你的 Spark 作业性能
参数调优建议: Spark 作业的默认 task 数量为 500~1000 个较为合适很多同学常犯的一个错误就是不去设置这个参数, 那么此时就会导致 Spark 自己根据底层 HDFS 的 block 数量来设置 task 的数量, 默认是一个 HDFS block 对应一个 task 通常来说, Spark 默认设置的数量是偏少的(比如就几十个 task), 如果 task 数量偏少的话, 就会导致你前面设置好的 Executor 的参数都前功尽弃试想一下, 无论你的 Executor 进程有多少个, 内存和 CPU 有多大, 但是 task 只有 1 个或者 10 个, 那么 90% 的 Executor 进程可能根本就没有 task 执行, 也就是白白浪费了资源! 因此 Spark 官网建议的设置原则是, 设置该参数为 num-executors * executor-cores 的 2~3 倍较为合适, 比如 Executor 的总 CPU core 数量为 300 个, 那么设置 1000 个 task 是可以的, 此时可以充分地利用 Spark 集群的资源
spark.storage.memoryFraction
参数说明: 该参数用于设置 RDD 持久化数据在 Executor 内存中能占的比例, 默认是 0.6 也就是说, 默认 Executor 60% 的内存, 可以用来保存持久化的 RDD 数据根据你选择的不同的持久化策略, 如果内存不够时, 可能数据就不会持久化, 或者数据会写入磁盘
参数调优建议: 如果 Spark 作业中, 有较多的 RDD 持久化操作, 该参数的值可以适当提高一些, 保证持久化的数据能够容纳在内存中避免内存不够缓存所有的数据, 导致数据只能写入磁盘中, 降低了性能但是如果 Spark 作业中的 shuffle 类操作比较多, 而持久化操作比较少, 那么这个参数的值适当降低一些比较合适此外, 如果发现作业由于频繁的 gc 导致运行缓慢(通过 spark web ui 可以观察到作业的 gc 耗时), 意味着 task 执行用户代码的内存不够用, 那么同样建议调低这个参数的值
spark.shuffle.memoryFraction
参数说明: 该参数用于设置 shuffle 过程中一个 task 拉取到上个 stage 的 task 的输出后, 进行聚合操作时能够使用的 Executor 内存的比例, 默认是 0.2 也就是说, Executor 默认只有 20% 的内存用来进行该操作 shuffle 操作在进行聚合时, 如果发现使用的内存超出了这个 20% 的限制, 那么多余的数据就会溢写到磁盘文件中去, 此时就会极大地降低性能
参数调优建议: 如果 Spark 作业中的 RDD 持久化操作较少, shuffle 操作较多时, 建议降低持久化操作的内存占比, 提高 shuffle 操作的内存占比比例, 避免 shuffle 过程中数据过多时内存不够用, 必须溢写到磁盘上, 降低了性能此外, 如果发现作业由于频繁的 gc 导致运行缓慢, 意味着 task 执行用户代码的内存不够用, 那么同样建议调低这个参数的值
资源参数的调优, 没有一个固定的值, 需要同学们根据自己的实际情况(包括 Spark 作业中的 shuffle 操作数量 RDD 持久化操作数量以及 spark web ui 中显示的作业 gc 情况), 同时参考本篇文章中给出的原理以及调优建议, 合理地设置上述参数
资源参数参考示例
以下是一份 spark-submit 命令的示例, 大家可以参考一下, 并根据自己的实际情况进行调节:
1 ./bin/spark-submit 2 --master yarn-cluster 3 --num-executors 100 4 --executor-memory 6G 5 --executor-cores 4 6 --driver-memory 1G 7 --conf spark.default.parallelism=1000 8 --conf spark.storage.memoryFraction=0.5 9 --conf spark.shuffle.memoryFraction=0.3 \
写在最后的话
根据实践经验来看, 大部分 Spark 作业经过本次基础篇所讲解的开发调优与资源调优之后, 一般都能以较高的性能运行了, 足以满足我们的需求但是在不同的生产环境和项目背景下, 可能会遇到其他更加棘手的问题(比如各种数据倾斜), 也可能会遇到更高的性能要求为了应对这些挑战, 需要使用更高级的技巧来处理这类问题在后续的 Spark 性能优化指南高级篇中, 我们会详细讲解数据倾斜调优以及 Shuffle 调优
来源: http://www.bubuko.com/infodetail-2503677.html