spark 本来已经是一个分布式的计算平台, 按说不应该手工去处理并行 / 异步的事情. 但是, 最近我实现的一个 spark 任务, 需要一次写出数十个分区的数据, 虽然这些分区的数据之间完全独立, 但坑爹的是, 基础数据平台提供的写数据接口只支持同步的一次写一个分区的数据. 这样造成的结果就是, 用循环来实现时, 虽然我有很多个计算节点, 数据 (RDD) 也分布于各个节点之上, 但是我只能等一个分区写完成后, 再写下一个分区: 因为 "写分区" 这个任务的下发是同步阻塞的.
- partitions
- .map(part => writeToDisk(data.filter(part), part))
引入 Future
这里要感谢 scala 提供的 Future 方案. 它可以方便的将同步的阻塞操作包装成异步操作并行下发.
配合 Await.ready 操作来等待所有 future 完成, 我们可以将上面的代码改写为:
- partitions
- .map(part => Future { writeToDisk(data.filter(data.part == part), part) })
- .map(f => Await.ready(f, Duration.Inf))
避免 data 重复计算
在 spark 中, 我们知道使用 cache/persist 可以避免数据流的重复计算. 在这里也是一样, Future 之前需要将 data 用 cache/persist 打个点.
但是这样还! 不! 够!
在这里我们希望发生的事情是 data 在 future 之前先计算好(只计算一次), 然后异步的分发下去写对应的分区.
但是由于 spark 的惰性计算特性, 使用 Future 之后, 多个 job 并行下发, 每个 job 在执行时 data 都还没有计算出来, 也就没有 cache 的数据. 反应到 spark ui 上的 jobs 页面的情况就是, 看上去多个 job 并行执行了, 但是 cache 操作并没有带来 tasks skipped.
这时, 我们需要在 future 之前, 强制把 data 计算出来并 cache 住. 这里其实只需要调用一些不影响数据的 action 算子即可, 例如 data.count().
最终的结果, 在使用上面的改进措施之后, 我的这个 spark 任务执行时间缩短了约 60%.
来源: https://juejin.im/entry/5adaa5596fb9a07a9b35856e