本文旨在剖析 Hadoop 和 Spark 的 Shuffle 过程, 并对比两者 Shuffle 的差异. Shuffle 描述的是数据从 Map 端到 Reduce 端的过程, 大数据学习 kou 群 74 零零加 [41 三八 yi] 大致分为排序(sort), 溢写(spill), 合并(merge), 拉取拷贝(Copy), 合并排序(merge sort) 这几个过程.
收藏
分享
一, 前言
对于基于 MapReduce 编程范式的分布式计算来说, 本质上而言, 就是在计算数据的交, 并, 差, 聚合, 排序等过程. 而分布式计算分而治之的思想, 让每个节点只计算部分数据, 也就是只处理一个分片, 那么要想求得某个 key 对应的全量数据, 那就必须把相同 key 的数据汇集到同一个 Reduce 任务节点来处理, 那么 Mapreduce 范式定义了一个叫做 Shuffle 的过程来实现这个效果.
二, 编写本文的目的
本文旨在剖析 Hadoop 和 Spark 的 Shuffle 过程, 并对比两者 Shuffle 的差异.
三, Hadoop 的 Shuffle 过程
Shuffle 描述的是数据从 Map 端到 Reduce 端的过程, 大数据学习 kou 群 74 零零加 [41 三八 yi] 大致分为排序(sort), 溢写(spill), 合并(merge), 拉取拷贝(Copy), 合并排序(merge sort) 这几个过程, 大体流程如下:
![image](https://yqfile.alicdn.com/e4ccedfb6ccaaa0d3c0ad5b3b7ab83d96dd9fed2.png)
上图的 Map 的输出的文件被分片为红绿蓝三个分片, 这个分片的就是根据 Key 为条件来分片的, 分片算法可以自己实现, 例如 Hash,Range 等, 最终 Reduce 任务只拉取对应颜色的数据来进行处理, 就实现把相同的 Key 拉取到相同的 Reduce 节点处理的功能. 下面分开来说 Shuffle 的的各个过程.
Map 端做了下图所示的操作:
1,Map 端 sort
Map 端的输出数据, 先写环形缓存区 kvbuffer, 当环形缓冲区到达一个阀值(可以通过配置文件设置, 默认 80), 便要开始溢写, 但溢写之前会有一个 sort 操作, 这个 sort 操作先把 Kvbuffer 中的数据按照 partition 值和 key 两个关键字来排序, 移动的只是索引数据, 排序结果是 Kvmeta 中数据按照 partition 为单位聚集在一起, 同一 partition 内的按照 key 有序.
2,spill(溢写)
当排序完成, 便开始把数据刷到磁盘, 刷磁盘的过程以分区为单位, 一个分区写完, 写下一个分区, 分区内数据有序, 最终实际上会多次溢写, 然后生成多个文件
3,merge(合并)
spill 会生成多个小文件, 对于 Reduce 端拉取数据是相当低效的, 那么这时候就有了 merge 的过程, 合并的过程也是同分片的合并成一个片段(segment), 最终所有的 segment 组装成一个最终文件, 那么合并过程就完成了, 如下图所示
至此, Map 的操作就已经完成, Reduce 端操作即将登场
Reduce 操作
总体过程如下图的红框处:
![image](https://yqfile.alicdn.com/71a52ed4799d3dbbde4552028f3aea05bc1c98c0.png)
1, 拉取拷贝(fetch copy)
Reduce 任务通过向各个 Map 任务拉取对应分片. 这个过程都是以 Http 协议完成, 每个 Map 节点都会启动一个常驻的 HTTP server 服务, Reduce 节点会请求这个 Http Server 拉取数据, 这个过程完全通过网络传输, 所以是一个非常重量级的操作.
2, 合并排序
Reduce 端, 拉取到各个 Map 节点对应分片的数据之后, 会进行再次排序, 排序完成, 结果丢给 Reduce 函数进行计算.
四, 总结
至此整个 shuffle 过程完成, 最后总结几点:
shuffle 过程就是为了对 key 进行全局聚合
排序操作伴随着整个 shuffle 过程, 所以 Hadoop 的 shuffle 是 sort-based 的
Spark shuffle 相对来说更简单, 因为不要求全局有序, 所以没有那么多排序合并的操作. Spark shuffle 分为 write 和 read 两个过程. 我们先来看 shuffle write.
一, shuffle write
shuffle write 的处理逻辑会放到该 ShuffleMapStage 的最后(因为 spark 以 shuffle 发生与否来划分 stage, 也就是宽依赖),final RDD 的每一条记录都会写到对应的分区缓存区 bucket, 如下图所示:
说明:
上图有 2 个 CPU, 可以同时运行两个 ShuffleMapTask
每个 task 将写一个 buket 缓冲区, 缓冲区的数量和 reduce 任务的数量相等
每个 buket 缓冲区会生成一个对应 ShuffleBlockFile
ShuffleMapTask 如何决定数据被写到哪个缓冲区呢? 这个就是跟 partition 算法有关系, 这个分区算法可以是 hash 的, 也可以是 range 的
最终产生的 ShuffleBlockFile 会有多少呢? 就是 ShuffleMapTask 数量乘以 reduce 的数量, 这个是非常巨大的
那么有没有办法解决生成文件过多的问题呢? 有, 开启 FileConsolidation 即可, 开启 FileConsolidation 之后的 shuffle 过程如下:
在同一核 CPU 执行先后执行的 ShuffleMapTask 可以共用一个 bucket 缓冲区, 然后写到同一份 ShuffleFile 里去, 上图所示的 ShuffleFile 实际上是用多个 ShuffleBlock 构成, 那么, 那么每个 worker 最终生成的文件数量, 变成了 CPU 核数乘以 reduce 任务的数量, 大大缩减了文件量.
二, Shuffle read
Shuffle write 过程将数据分片写到对应的分片文件, 这时候万事具备, 只差去拉取对应的数据过来计算了.
那么 Shuffle Read 发送的时机是什么? 是要等所有 ShuffleMapTask 执行完, 再去 fetch 数据吗? 理论上, 只要有一个 ShuffleMapTask 执行完, 就可以开始 fetch 数据了, 实际上, spark 必须等到父 stage 执行完, 才能执行子 stage, 所以, 必须等到所有 ShuffleMapTask 执行完毕, 才去 fetch 数据. fetch 过来的数据, 先存入一个 Buffer 缓冲区, 所以这里一次性 fetch 的 FileSegment 不能太大, 当然如果 fetch 过来的数据大于每一个阀值, 也是会 spill 到磁盘的.
fetch 的过程过来一个 buffer 的数据, 就可以开始聚合了, 这里就遇到一个问题, 每次 fetch 部分数据, 怎么能实现全局聚合呢? 以 Word count 的 reduceByKey(《Spark RDD 操作之 ReduceByKey 》)为例, 假设单词 hello 有十个, 但是一次 fetch 只拉取了 2 个, 那么怎么全局聚合呢? Spark 的做法是用 HashMap, 聚合操作实际上是 map.put(key,map.get(key)+1), 将 map 中的聚合过的数据 get 出来相加, 然后 put 回去, 等到所有数据 fetch 完, 也就完成了全局聚合.
三, 总结
Hadoop 的 MapReduce Shuffle 和 Spark Shuffle 差别总结如下:
Hadoop 的有一个 Map 完成, Reduce 便可以去 fetch 数据了, 不必等到所有 Map 任务完成, 而 Spark 的必须等到父 stage 完成, 也就是父 stage 的 map 操作全部完成才能去 fetch 数据.
Hadoop 的 Shuffle 是 sort-base 的, 那么不管是 Map 的输出, 还是 Reduce 的输出, 都是 partion 内有序的, 而 spark 不要求这一点.
Hadoop 的 Reduce 要等到 fetch 完全部数据, 才将数据传入 reduce 函数进行聚合, 而 spark 是一边 fetch 一边聚合.
来源: http://bigdata.51cto.com/art/201904/595364.htm