概述
Shuffle, 翻译成中文就是洗牌. 之所以需要 Shuffle, 还是因为具有某种共同特征的一类数据需要最终汇聚 (aggregate) 到一个计算节点上进行计算. 这些数据分布在各个存储节点上并且由不同节点的计算单元处理. 以最简单的 Word?Count 为例, 其中数据保存在 Node1,Node2 和 Node3;
? ?
经过处理后, 这些数据最终会汇聚到 Nodea,Nodeb 处理, 如下图所示.
这个数据重新打乱然后汇聚到不同节点的过程就是 Shuffle. 但是实际上, Shuffle 过程可能会非常复杂:
1)数据量会很大, 比如单位为 TB 或 PB 的数据分散到几百甚至数千, 数万台机器上.
2)为了将这个数据汇聚到正确的节点, 需要将这些数据放入正确的 Partition, 因为数据大小已经大于节点的内存, 因此这个过程中可能会发生多次硬盘续写.
3)为了节省带宽, 这个数据可能需要压缩, 如何在压缩率和压缩解压时间中间
做一个比较好的选择?
4)数据需要通过网络传输, 因此数据的序列化和反序列化也变得相对复杂.
? ?
一般来说, 每个 Task 处理的数据可以完全载入内存(如果不能, 可以减小每个 Partition 的大小), 因此 Task 可以做到在内存中计算. 但是对于 Shuffle 来说, 如果不持久化这个中间结果, 一旦数据丢失, 就需要重新计算依赖的全部 RDD, 因此有必要持久化这个中间结果. 所以这就是为什么 Shuffle 过程会产生文件的原因.
如果 Shuffle 过程不落地,1可能会造成内存溢出 2当某分区丢失时, 会重新计算所有父分区数据
- ? ?
- Shuffle Write
Shuffle?Write, 即数据是如何持久化到文件中, 以使得下游的 Task 可以获取到其需要处理的数据的(即 Shuffle?Read). 在 Spark 0.8 之前, Shuffle Write 是持久化到缓存的, 但后来发现实际应用中, shuffle 过程带来的数据通常是巨量的, 所以经常会发生内存溢出的情况, 所以在 Spark?0.8 以后, Shuffle?Write 会将数据持久化到硬盘, 再之后 Shuffle?Write 不断进行演进优化, 但是数据落地到本地文件系统的实现并没有改变.
- ? ?
- 1)Hash?Based?Shuffle?Write
在 Spark?1.0 以前, Spark 只支持 Hash?Based?Shuffle. 因为在很多运算场景中并不需要排序, 因此多余的排序只能使性能变差, 比如 Hadoop 的 Map?Reduce 就是这么实现的, 也就是 Reducer 拿到的数据都是已经排好序的. 实际上 Spark 的实现很简单: 每个 Shuffle?Map?Task 根据 key 的哈希值, 计算出每个 key 需要写入的 Partition 然后将数据单独写入一个文件, 这个 Partition 实际上就对应了下游的一个 Shuffle?Map?Task 或者 Result?Task. 因此下游的 Task 在计算时会通过网络 (如果该 Task 与上游的 Shuffle?Map?Task 运行在同一个节点上, 那么此时就是一个本地的硬盘读写) 读取这个文件并进行计算.
? ?
? ?
Hash?Based?Shuffle?Write 存在的问题
由于每个 Shuffle?Map?Task 需要为每个下游的 Task 创建一个单独的文件, 因此文件的数量就是:
- number(shuffle_map_task)*number(result_task).
- ? ?
如果 Shuffle?Map?Task 是 1000, 下游的 Task 是 500, 那么理论上会产生 500000 个文件(对于 size 为 0 的文件 Spark 有特殊的处理). 生产环境中 Task 的数量实际上会更多, 因此这个简单的实现会带来以下问题:
1)每个节点可能会同时打开多个文件, 每次打开文件都会占用一定内存. 假设每个 Write?Handler 的默认需要 100KB 的内存, 那么同时打开这些文件需要 50GB 的内存, 对于一个集群来说, 还是有一定的压力的. 尤其是如果 Shuffle?Map?Task 和下游的 Task 同时增大 10 倍, 那么整体的内存就增长到 5TB.
? ?
2)从整体的角度来看, 打开多个文件对于系统来说意味着随机读, 尤其是每个文件比较小但是数量非常多的情况. 而现在机械硬盘在随机读方面的性能特别差, 非常容易成为性能的瓶颈. 如果集群依赖的是固态硬盘, 也许情况会改善很多, 但是随机写的性能肯定不如顺序写的.
- ? ?
- 2)Sort?Based Shuffle?Write
在 Spark?1.2.0 中, Spark?Core 的一个重要的升级就是将默认的 Hash?Based?Shuffle 换成了 Sort?Based?Shuffle, 即 spark.shuffle.manager 从 Hash 换成了 Sort, 对应的实现类分别是 org.apache.spark.shuffle.hash.HashShuffleManager 和 org.apache.spark.shuffle.sort.SortShuffleManager.
那么 Sort?Based?Shuffle"取代"Hash?Based?Shuffle 作为默认选项的原因是什么?
正如前面提到的, Hash?Based?Shuffle 的每个 Mapper 都需要为每个 Reducer 写一个文件, 供 Reducer 读取, 即需要产生 M*R 个数量的文件, 如果 Mapper 和 Reducer 的数量比较大, 产生的文件数会非常多.
而 Sort?Based?Shuffle 的模式是: 每个 Shuffle?Map?Task 不会为每个 Reducer 生成一个单独的文件; 相反, 它会将所有的结果写到一个文件里, 同时会生成一个 Index 文件,
? ?
Reducer 可以通过这个 Index 文件取得它需要处理的数据. 避免产生大量文件的直接收益就是节省了内存的使用和顺序 Disk?IO 带来的低延时. 节省内存的使用可以减少 GC 的风险和频率. 而减少文件的数量可以避免同时写多个文件给系统带来的压力.
? ?
? ?
Sort?Based?Write 实现详解
Shuffle?Map?Task 会按照 key 相对应的 Partition?ID 进行 Sort, 其中属于同一个 Partition 的 key 不会 Sort. 因为对于不需要 Sort 的操作来说, 这个 Sort 是负收益的; 要知道之前 Spark 刚开始使用 Hash?Based 的 Shuffle 而不是 Sort?Based 就是为了避免 Hadoop?Map?Reduce 对于所有计算都会 Sort 的性能损耗. 对于那些需要 Sort 的运算, 比如 sortByKey, 这个 Sort 在 Spark?1.2.0 里还是由 Reducer 完成的.
? ?
? ?
1答出 shuffle 的定义
2spark shuffle 的特点
3spark shuffle 的目的
4spark shuffel 的实现类, 即对应优缺点
.
来源: http://www.bubuko.com/infodetail-3102512.html