一, 数据接收并行度调优
(1)
通过网络接收数据时 (比如 Kafka,Flume), 会将数据反序列化, 并存储在 Spark 的内存中. 如果数据接收称为系统的瓶颈, 那么可以考虑并行化数据接收.
每一个输入 DStream 都会在某个 Worker 的 Executor 上启动一个 Receiver, 该 Receiver 接收一个数据流. 因此可以通过创建多个输入 DStream, 并且配置它们接收
数据源不同的分区数据, 达到接收多个数据流的效果. 比如说, 一个接收两个 Kafka Topic 的输入 DStream, 可以被拆分为两个输入 DStream, 每个分别接收一个
topic 的数据. 这样就会创建两个 Receiver, 从而并行地接收数据, 进而提升吞吐量. 多个 DStream 可以使用 union 算子进行聚合, 从而形成一个 DStream. 然后
后续的 transformation 算子操作都针对该一个聚合后的 DStream 即可.
- int numStreams = 5;
- List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);
- for (int i = 0; i <numStreams; i++) {
- kafkaStreams.add(KafkaUtils.createStream(...));
- }
- JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
- unifiedStream.print();
- (2)
数据接收并行度调优, 除了创建更多输入 DStream 和 Receiver 以外, 还可以考虑调节 block interval. 通过参数, spark.streaming.blockInterval, 可以设置 block interval,
默认是 200ms. 对于大多数 Receiver 来说, 在将接收到的数据保存到 Spark 的 BlockManager 之前, 都会将数据切分为一个一个的 block. 而每个 batch 中的 block 数量, 则决定了该
batch 对应的 RDD 的 partition 的数量, 以及针对该 RDD 执行 transformation 操作时, 创建的 task 的数量. 每个 batch 对应的 task 数量是大约估计的, 即 batch interval / block interval.
例如说, batch interval 为 2s,block interval 为 200ms, 会创建 10 个 task. 如果你认为每个 batch 的 task 数量太少, 即低于每台机器的 CPU core 数量, 那么就说明 batch 的 task 数量
是不够的, 因为所有的 CPU 资源无法完全被利用起来. 要为 batch 增加 block 的数量, 那么就减小 block interval. 然而, 推荐的 block interval 最小值是 50ms, 如果低于这个数值,
那么大量 task 的启动时间, 可能会变成一个性能开销点.
(3)
除了上述说的两个提升数据接收并行度的方式, 还有一种方法, 就是显式地对输入数据流进行重分区. 使用 inputStream.repartition(<number of partitions>) 即可.
这样就可以将接收到的 batch, 分布到指定数量的机器上, 然后再进行进一步的操作.
二, 任务启动调优
如果每秒钟启动的 task 过于多, 比如每秒钟启动 50 个, 那么发送这些 task 去 Worker 节点上的 Executor 的性能开销, 会比较大, 而且此时基本就很难达到毫秒级的延迟了.
使用下述操作可以减少这方面的性能开销:
1,Task 序列化: 使用 Kryo 序列化机制来序列化 task, 可以减小 task 的大小, 从而减少发送这些 task 到各个 Worker 节点上的 Executor 的时间.
2, 执行模式: 在 Standalone 模式下运行 Spark, 可以达到更少的 task 启动时间.
上述方式, 也许可以将每个 batch 的处理时间减少 100 毫秒. 从而从秒级降到毫秒级.
三, 数据处理并行度调优
如果在计算的任何 stage 中使用的并行 task 的数量没有足够多, 那么集群资源是无法被充分利用的. 举例来说, 对于分布式的 reduce 操作,
比如 reduceByKey 和 reduceByKeyAndWindow, 默认的并行 task 的数量是由 spark.default.parallelism 参数决定的. 你可以在 reduceByKey 等操作中,
传入第二个参数, 手动指定该操作的并行度, 也可以调节全局的 spark.default.parallelism 参数.
四, 数据序列化调优
(1)
数据序列化造成的系统开销可以由序列化格式的优化来减小. 在流式计算的场景下, 有两种类型的数据需要序列化.
1, 输入数据: 默认情况下, 接收到的输入数据, 是存储在 Executor 的内存中的, 使用的持久化级别是 StorageLevel.MEMORY_AND_DISK_SER_2. 这意味着,
数据被序列化为字节从而减小 GC 开销, 并且会复制以进行 executor 失败的容错. 因此, 数据首先会存储在内存中, 然后在内存不足时会溢写到磁盘上, 从而
为流式计算来保存所有需要的数据. 这里的序列化有明显的性能开销 --Receiver 必须反序列化从网络接收到的数据, 然后再使用 Spark 的序列化格式序列化数据.
2, 流式计算操作生成的持久化 RDD: 流式计算操作生成的持久化 RDD, 可能会持久化到内存中. 例如, 窗口操作默认就会将数据持久化在内存中, 因为这些数据
后面可能会在多个窗口中被使用, 并被处理多次. 然而, 不像 Spark Core 的默认持久化级别, StorageLevel.MEMORY_ONLY, 流式计算操作生成的 RDD 的默认持久化
级别是 StorageLevel.MEMORY_ONLY_SER , 默认就会减小 GC 开销.
(2)
在上述的场景中, 使用 Kryo 序列化类库可以减小 CPU 和内存的性能开销. 使用 Kryo 时, 一定要考虑注册自定义的类,
并且禁用对应引用的 tracking(spark.kryo.referenceTracking).
在一些特殊的场景中, 比如需要为流式应用保持的数据总量并不是很多, 也许可以将数据以非序列化的方式进行持久化, 从而减少序列化和反序列化的 CPU 开销,
而且又不会有太昂贵的 GC 开销. 举例来说, 如果你数秒的 batch interval, 并且没有使用 Windows 操作, 那么你可以考虑通过显式地设置持久化级别, 来禁止持久化时
对数据进行序列化. 这样就可以减少用于序列化和反序列化的 CPU 性能开销, 并且不用承担太多的 GC 开销.
五, batch interval 调优 (重要)
如果想让一个运行在集群上的 Spark Streaming 应用程序可以稳定, 它就必须尽可能快地处理接收到的数据. 换句话说, batch 应该在生成之后, 就尽可能快地处理掉.
对于一个应用来说, 这个是不是一个问题, 可以通过观察 Spark UI 上的 batch 处理时间来定. batch 处理时间必须小于 batch interval 时间.
基于流式计算的本质, batch interval 对于, 在固定集群资源条件下, 应用能保持的数据接收速率, 会有巨大的影响. 例如, 在 WordCount 例子中, 对于一个特定的
数据接收速率, 应用业务可以保证每 2 秒打印一次单词计数, 而不是每 500ms. 因此 batch interval 需要被设置得, 让预期的数据接收速率可以在生产环境中保持住.
为你的应用计算正确的 batch 大小的比较好的方法, 是在一个很保守的 batch interval, 比如 5~10s, 以很慢的数据接收速率进行测试. 要检查应用是否跟得上这个数据速率,
可以检查每个 batch 的处理时间的延迟, 如果处理时间与 batch interval 基本吻合, 那么应用就是稳定的. 否则, 如果 batch 调度的延迟持续增长, 那么就意味应用无法跟得上
这个速率, 也就是不稳定的. 因此你要想有一个稳定的配置, 可以尝试提升数据处理的速度, 或者增加 batch interval. 记住, 由于临时性的数据增长导致的暂时的延迟增长,
可以合理的, 只要延迟情况可以在短时间内恢复即可.
六, 内存调优
(1)
优化 Spark 应用的内存使用和 GC 行为, 在 Spark Core 的调优中, 已经讲过了. 这里讲一下与 Spark Streaming 应用相关的调优参数.
Spark Streaming 应用需要的集群内存资源, 是由使用的 transformation 操作类型决定的. 举例来说, 如果想要使用一个窗口长度为 10 分钟的 Windows 操作,
那么集群就必须有足够的内存来保存 10 分钟内的数据. 如果想要使用 updateStateByKey 来维护许多 key 的 state, 那么你的内存资源就必须足够大. 反过来说,
如果想要做一个简单的 map-filter-store 操作, 那么需要使用的内存就很少.
通常来说, 通过 Receiver 接收到的数据, 会使用 StorageLevel.MEMORY_AND_DISK_SER_2 持久化级别来进行存储, 因此无法保存在内存中的数据会溢写到磁盘上.
而溢写到磁盘上, 是会降低应用的性能的. 因此, 通常是建议为应用提供它需要的足够的内存资源. 建议在一个小规模的场景下测试内存的使用量, 并进行评估.
(2)
内存调优的另外一个方面是垃圾回收. 对于流式应用来说, 如果要获得低延迟, 肯定不想要有因为 JVM 垃圾回收导致的长时间延迟. 有很多参数可以帮助降低内存使用和 GC 开销:
1,DStream 的持久化: 正如在 "数据序列化调优" 一节中提到的, 输入数据和某些操作生产的中间 RDD, 默认持久化时都会序列化为字节. 与非序列化的方式相比, 这会降低内存和 GC 开销.
使用 Kryo 序列化机制可以进一步减少内存使用和 GC 开销. 进一步降低内存使用率, 可以对数据进行压缩, 由 spark.rdd.compress 参数控制 (默认 false).
2, 清理旧数据: 默认情况下, 所有输入数据和通过 DStream transformation 操作生成的持久化 RDD, 会自动被清理. Spark Streaming 会决定何时清理这些数据,
取决于 transformation 操作类型. 例如, 你在使用窗口长度为 10 分钟内的 Windows 操作, Spark 会保持 10 分钟以内的数据, 时间过了以后就会清理旧数据. 但是在某些特殊场景下,
比如 Spark SQL 和 Spark Streaming 整合使用时, 在异步开启的线程中, 使用 Spark SQL 针对 batch RDD 进行执行查询. 那么就需要让 Spark 保存更长时间的数据, 直到 Spark SQL 查询结束.
可以使用 streamingContext.remember() 方法来实现.
3,CMS 垃圾回收器: 使用并行的 mark-sweep 垃圾回收机制, 被推荐使用, 用来保持 GC 低开销. 虽然并行的 GC 会降低吞吐量, 但是还是建议使用它,
来减少 batch 的处理时间 (降低处理过程中的 gc 开销). 如果要使用, 那么要在 driver 端和 executor 端都开启. 在 spark-submit 中使用 --driver-java-options 设置;
使用 spark.executor.extraJavaOptions 参数设置.-XX:+UseConcMarkSweepGC.
来源: http://www.bubuko.com/infodetail-3164343.html