Shuffle 简介
Shuffle 的本意是洗牌, 混洗的意思, 把一组有规则的数据尽量打乱成无规则的数据. 而在 MapReduce 中, Shuffle 更像是洗牌的逆过程, 指的是将 map 端的无规则输出按指定的规则 "打乱" 成具有一定规则的数据, 以便 reduce 端接收处理. 其在 MapReduce 中所处的工作阶段是 map 输出后到 reduce 接收前, 具体可以分为 map 端和 reduce 端前后两个部分.
在 shuffle 之前, 也就是在 map 阶段, MapReduce 会对要处理的数据进行分片 (split) 操作, 为每一个分片分配一个 MapTask 任务. 接下来 map 会对每一个分片中的每一行数据进行处理得到键值对 (key,value) 此时得到的键值对又叫做 "中间结果". 此后便进入 reduce 阶段, 由此可以看出 Shuffle 阶段的作用是处理 "中间结果".
由于 Shuffle 涉及到了磁盘的读写和网络的传输, 因此 Shuffle 性能的高低直接影响到了整个程序的运行效率.
MapReduce Shuffle
Hadoop 的核心思想是 MapReduce, 但 shuffle 又是 MapReduce 的核心. shuffle 的主要工作是从 Map 结束到 Reduce 开始之间的过程. shuffle 阶段又可以分为 Map 端的 shuffle 和 Reduce 端的 shuffle.
Map 端的 shuffle
下图是 MapReduce Shuffle 的官方流程:
因为频繁的磁盘 I/O 操作会严重的降低效率, 因此 "中间结果" 不会立马写入磁盘, 而是优先存储到 map 节点的 "环形内存缓冲区", 在写入的过程中进行分区(partition), 也就是对于每个键值对来说, 都增加了一个 partition 属性值, 然后连同键值对一起序列化成字节数组写入到缓冲区(缓冲区采用的就是字节数组, 默认大小为 100M).
当写入的数据量达到预先设置的阙值后便会启动溢写出线程将缓冲区中的那部分数据溢出写 (spill) 到磁盘的临时文件中, 并在写入前根据 key 进行排序 (sort) 和合并(combine, 可选操作).
溢出写过程按轮询方式将缓冲区中的内容写到 mapreduce.cluster.local.dir 属性指定的本地目录中. 当整个 map 任务完成溢出写后, 会对磁盘中这个 map 任务产生的所有临时文件 (spill 文件) 进行归并 (merge) 操作生成最终的正式输出文件, 此时的归并是将所有 spill 文件中的相同 partition 合并到一起, 并对各个 partition 中的数据再进行一次排序 (sort), 生成 key 和对应的 value-list, 文件归并时, 如果溢写文件数量超过参数 min.num.spills.for.combine 的值(默认为 3) 时, 可以再次进行合并.
至此 map 端的工作已经全部结束, 最终生成的文件也会存储在 TaskTracker 能够访问的位置. 每个 reduce task 不间断的通过 RPC 从 JobTracker 那里获取 map task 是否完成的信息, 如果得到的信息是 map task 已经完成, 那么 Shuffle 的后半段开始启动.
Reduce 端的 shuffle
当 mapreduce 任务提交后, reduce task 就不断通过 RPC 从 JobTracker 那里获取 map task 是否完成的信息, 如果获知某台 TaskTracker 上的 map task 执行完成, Shuffle 的后半段过程就开始启动. Reduce 端的 shuffle 主要包括三个阶段, copy,merge 和 reduce.
每个 reduce task 负责处理一个分区的文件, 以下是 reduce task 的处理流程:
reduce task 从每个 map task 的结果文件中拉取对应分区的数据. 因为数据在 map 阶段已经是分好区了, 并且会有一个额外的索引文件记录每个分区的起始偏移量. 所以 reduce task 取数的时候直接根据偏移量去拉取数据就 ok.
reduce task 从每个 map task 拉取分区数据的时候会进行再次合并, 排序, 按照自定义的 reducer 的逻辑代码去处理.
最后就是 Reduce 过程了, 在这个过程中产生了最终的输出结果, 并将其写到 HDFS 上.
为什么要排序
key 存在 combine 操作, 排序之后相同的 key 放到一块显然方便做合并操作.
reduce task 是按 key 去处理数据的. 如果没有排序那必须从所有数据中把当前相同 key 的所有 value 数据拿出来, 然后进行 reduce 逻辑处理. 显然每个 key 到这个逻辑都需要做一次全量数据扫描, 影响性能, 有了排序很方便的得到一个 key 对于的 value 集合.
reduce task 按 key 去处理数据时, 如果 key 按顺序排序, 那么 reduce task 就按 key 顺序去读取, 显然当读到的 key 是文件末尾的 key 那么就标志数据处理完毕. 如果没有排序那还得有其他逻辑来记录哪些 key 处理完了, 哪些 key 没有处理完.
虽有千万种理由需要这么做, 但是很耗资源, 并且像排序其实我们有些业务并不需要排序.
为什么要文件合并
因为内存放不下就会溢写文件, 就会发生多次溢写, 形成很多小文件, 如果不合并, 显然会小文件泛滥, 集群需要资源开销去管理这些小文件数据.
任务去读取文件的数增多, 打开的文件句柄数也会增多.
mapreduce 是全局有序. 单个文件有序, 不代表全局有序, 只有把小文件合并一起排序才会全局有序.
Spark 的 Shuffle
Spark 的 Shuffle 是在 MapReduce Shuffle 基础上进行的调优. 其实就是对排序, 合并逻辑做了一些优化. 在 Spark 中 Shuffle write 相当于 MapReduce 的 map,Shuffle read 相当于 MapReduce 的 reduce.
Spark 丰富了任务类型, 有些任务之间数据流转不需要通过 Shuffle, 但是有些任务之间还是需要通过 Shuffle 来传递数据, 比如宽依赖的 group by key 以及各种 by key 算子. 宽依赖之间会划分 stage, 而 Stage 之间就是 Shuffle, 如下图中的 stage0,stage1 和 stage3 之间就会产生 Shuffle.
在 Spark 的中, 负责 shuffle 过程的执行, 计算和处理的组件主要就是 ShuffleManager, 也即 shuffle 管理器. ShuffleManager 随着 Spark 的发展有两种实现的方式, 分别为 HashShuffleManager 和 SortShuffleManager, 因此 spark 的 Shuffle 有 Hash Shuffle 和 Sort Shuffle 两种.
Spark Shuffle 发展史
Spark 0.8 及以前 Hash Based Shuffle
Spark 0.8.1 为 Hash Based Shuffle 引入 File Consolidation 机制
Spark 0.9 引入 ExternalAppendOnlyMap
Spark 1.1 引入 Sort Based Shuffle, 但默认仍为 Hash Based Shuffle
Spark 1.2 默认的 Shuffle 方式改为 Sort Based Shuffle
Spark 1.4 引入 Tungsten-Sort Based Shuffle
Spark 1.6 Tungsten-sort 并入 Sort Based Shuffle
Spark 2.0 Hash Based Shuffle 退出历史舞台
在 Spark 的版本的发展, ShuffleManager 在不断迭代, 变得越来越先进.
在 Spark 1.2 以前, 默认的 shuffle 计算引擎是 HashShuffleManager. 该 ShuffleManager 而 HashShuffleManager 有着一个非常严重的弊端, 就是会产生大量的中间磁盘文件, 进而由大量的磁盘 IO 操作影响了性能. 因此在 Spark 1.2 以后的版本中, 默认的 ShuffleManager 改成了 SortShuffleManager.
SortShuffleManager 相较于 HashShuffleManager 来说, 有了一定的改进. 主要就在于, 每个 Task 在进行 shuffle 操作时, 虽然也会产生较多的临时磁盘文件, 但是最后会将所有的临时文件合并 (merge) 成一个磁盘文件, 因此每个 Task 就只有一个磁盘文件. 在下一个 stage 的 shuffle read task 拉取自己的数据时, 只要根据索引读取每个磁盘文件中的部分数据即可.
Hash Shuffle
HashShuffleManager 的运行机制主要分成两种, 一种是普通运行机制, 另一种是合并的运行机制. 合并机制主要是通过复用 buffer 来优化 Shuffle 过程中产生的小文件的数量. Hash shuffle 是不具有排序的 Shuffle.
普通机制的 Hash Shuffle
最开始使用的 Hash Based Shuffle, 每个 Mapper 会根据 Reducer 的数量创建对应的 bucket,bucket 的数量是 M * R,M 是 map 的数量, R 是 Reduce 的数量.
如下图所示: 2 个 core 4 个 map task 3 个 reduce task, 会产生 4*3=12 个小文件.
优化后的 Hash Shuffle
普通机制 Hash Shuffle 会产生大量的小文件(M * R), 对文件系统的压力也很大, 也不利于 IO 的吞吐量, 后来做了优化(设置 spark.shuffle.consolidateFiles=true 开启, 默认 false), 把在同一个 core 上的多个 Mapper 输出到同一个文件, 这样文件数就变成 core * R 个了.
如下图所示: 2 个 core 4 个 map task 3 个 reduce task, 会产生 2*3=6 个小文件.
Hash shuffle 合并机制的问题:
如果 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大, 也会产生很多小文件. 进而引出了更优化的 sort shuffle.
在 Spark 1.2 以后的版本中, 默认的 ShuffleManager 改成了 SortShuffleManager.
Sort Shuffle
SortShuffleManager 的运行机制主要分成两种, 一种是普通运行机制, 另一种是 bypass 运行机制. 当 shuffle read task 的数量小于等于 spark.shuffle.sort.bypassMergeThreshold 参数的值时(默认为 200), 就会启用 bypass 机制.
普通机制的 Sort Shuffle
这种机制和 mapreduce 差不多, 在该模式下, 数据会先写入一个内存数据结构中, 此时根据不同的 shuffle 算子, 可能选用不同的数据结构. 如果是 reduceByKey 这种聚合类的 shuffle 算子, 那么会选用 Map 数据结构, 一边通过 Map 进行聚合, 一边写入内存; 如果是 join 这种普通的 shuffle 算子, 那么会选用 Array 数据结构, 直接写入内存. 接着, 每写一条数据进入内存数据结构之后, 就会判断一下, 是否达到了某个临界阈值. 如果达到临界阈值的话, 那么就会尝试将内存数据结构中的数据溢写到磁盘, 然后清空内存数据结构.
在溢写到磁盘文件之前, 会先根据 key 对内存数据结构中已有的数据进行排序. 排序过后, 会分批将数据写入磁盘文件. 默认的 batch 数量是 10000 条, 也就是说, 排序好的数据, 会以每批 1 万条数据的形式分批写入磁盘文件.
一个 task 将所有数据写入内存数据结构的过程中, 会发生多次磁盘溢写操作, 也会产生多个临时文件. 最后会将之前所有的临时磁盘文件都进行合并, 由于一个 task 就只对应一个磁盘文件因此还会单独写一份索引文件, 其中标识了下游各个 task 的数据在文件中的 start offset 与 end offset.
SortShuffleManager 由于有一个磁盘文件 merge 的过程, 因此大大减少了文件数量, 由于每个 task 最终只有一个磁盘文件所以文件个数等于上游 shuffle write 个数.
bypass 机制的 Sort Shuffle
bypass 运行机制的触发条件如下:
1)shuffle map task 数量小于 spark.shuffle.sort.bypassMergeThreshold 参数的值, 默认值 200.
2)不是聚合类的 shuffle 算子(比如 reduceByKey).
此时 task 会为每个 reduce 端的 task 都创建一个临时磁盘文件, 并将数据按 key 进行 hash 然后根据 key 的 hash 值, 将 key 写入对应的磁盘文件之中. 当然, 写入磁盘文件时也是先写入内存缓冲, 缓冲写满之后再溢写到磁盘文件的. 最后, 同样会将所有临时磁盘文件都合并成一个磁盘文件, 并创建一个单独的索引文件.
该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的, 因为都要创建数量惊人的磁盘文件, 只是在最后会做一个磁盘文件的合并而已. 因此少量的最终磁盘文件, 也让该机制相对未经优化的 HashShuffleManager 来说, shuffle read 的性能会更好.
而该机制与普通 SortShuffleManager 运行机制的不同在于:
第一, 磁盘写机制不同;
第二, 不会进行排序. 也就是说, 启用该机制的最大好处在于, shuffle write 过程中, 不需要进行数据的排序操作, 也就节省掉了这部分的性能开销.
Spark Shuffle 总结
Shuffle 过程本质上都是将 Map 端获得的数据使用分区器进行划分, 并将数据发送给对应的 Reducer 的过程.
Shuffle 作为处理连接 map 端和 reduce 端的枢纽, 其 shuffle 的性能高低直接影响了整个程序的性能和吞吐量. map 端的 shuffle 一般为 shuffle 的 Write 阶段, reduce 端的 shuffle 一般为 shuffle 的 read 阶段. Hadoop 和 spark 的 shuffle 在实现上面存在很大的不同, spark 的 shuffle 分为两种实现, 分别为 HashShuffle 和 SortShuffle.
HashShuffle 又分为普通机制和合并机制, 普通机制因为其会产生 MR 个数的巨量磁盘小文件而产生大量性能低下的 Io 操作, 从而性能较低, 因为其巨量的磁盘小文件还可能导致 OOM,HashShuffle 的合并机制通过重复利用 buffer 从而将磁盘小文件的数量降低到 CoreR 个, 但是当 Reducer 端的并行任务或者是数据分片过多的时候, 依然会产生大量的磁盘小文件.
SortShuffle 也分为普通机制和 bypass 机制, 普通机制在内存数据结构 (默认为 5M) 完成排序, 会产生 2M 个磁盘小文件. 而当 shuffle map task 数量小于 spark.shuffle.sort.bypassMergeThreshold 参数的值. 或者算子不是聚合类的 shuffle 算子 (比如 reduceByKey) 的时候会触发 SortShuffle 的 bypass 机制, SortShuffle 的 bypass 机制不会进行排序, 极大的提高了其性能.
在 Spark 1.2 以前, 默认的 shuffle 计算引擎是 HashShuffleManager, 因为 HashShuffleManager 会产生大量的磁盘小文件而性能低下, 在 Spark 1.2 以后的版本中, 默认的 ShuffleManager 改成了 SortShuffleManager.
SortShuffleManager 相较于 HashShuffleManager 来说, 有了一定的改进. 主要就在于, 每个 Task 在进行 shuffle 操作时, 虽然也会产生较多的临时磁盘文件, 但是最后会将所有的临时文件合并 (merge) 成一个磁盘文件, 因此每个 Task 就只有一个磁盘文件. 在下一个 stage 的 shuffle read task 拉取自己的数据时, 只要根据索引读取每个磁盘文件中的部分数据即可.
Spark 与 MapReduce Shuffle 的异同
从整体功能上看, 两者并没有大的差别. 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition, 不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask, 也可能是 ResultTask).Reducer 以内存作缓冲区, 边 shuffle 边 aggregate 数据, 等到数据 aggregate 好以后进行 reduce(Spark 里可能是后续的一系列操作).
从流程的上看, 两者差别不小. Hadoop MapReduce 是 sort-based, 进入 combine 和 reduce 的 records 必须先 sort. 这样的好处在于 combine/reduce 可以处理大规模的数据, 因为其输入数据可以通过外排得到(mapper 对每段数据先做排序, reducer 的 shuffle 对排好序的每段数据做归并). 以前 Spark 默认选择的是 hash-based, 通常使用 HashMap 来对 shuffle 来的数据进行合并, 不会对数据进行提前排序. 如果用户需要经过排序的数据, 那么需要自己调用类似 sortByKey 的操作. 在 Spark 1.2 之后, sort-based 变为默认的 Shuffle 实现.
从流程实现角度来看, 两者也有不少差别. Hadoop MapReduce 将处理流程划分出明显的几个阶段: map, spill, merge, shuffle, sort, reduce 等. 每个阶段各司其职, 可以按照过程式的编程思想来逐一实现每个阶段的功能. 在 Spark 中, 没有这样功能明确的阶段, 只有不同的 stage 和一系列的 transformation, 所以 spill, merge, aggregate 等操作需要蕴含在 transformation 中.
来源: https://www.cnblogs.com/xiaodf/p/10650921.html