[使用场景]
对 RDD 执行 reduceByKey 等聚合类 shuffle 算子或者在 Spark SQL 中使用 group by 语句进行分组聚合时, 经过 sample 或日志, 界面定位, 发生了数据倾斜.
[解决方案]
局部聚合 + 全局聚合, 进行两阶段聚合. 具体为:
将原本相同的 key 通过附加随机前缀的方式, 变成多个不同的 key, 就可以让原本被一个 task 处理的数据分散到多个 task 上去做局部聚合, 进而解决单个 task 处理数据量过多的问题. 接着去除掉随机前缀, 再次进行全局聚合, 就可以得到最终的结果.
第一步: 给 key 倾斜的 dataSkewRDD 中每个 key 都打上一个随机前缀.
例如 10 以内的随机数, 此时原先一样的 key, 包括集中倾斜的 key 就变成不一样的了, 比如 (hello, 1) (hello, 1) (hello, 1) (hello, 1), 就会变成 (5_hello, 1) (3_hello, 1) (3_hello, 1) (5_hello, 1) (8_hello, 1) (5_hello, 1) ...
第二步: 对打上随机前缀的 key 不再倾斜的 randomPrefixRdd 进行局部聚合.
接着对打上随机数后的数据, 执行 reduceByKey 等聚合操作, 进行局部聚合时, 就不会数据倾斜了. 此时, 第一步局部聚合的结果, 变成了 (5_hello, 3) (3_hello, 2) (8_hello, 1)
第三步: 局部聚合后, 去除 localAggRdd 中每个 key 的随机前缀.
此时, 第二步局部聚合的结果, 变成了 (hello, 3) (hello, 2) (hello, 1)
第四步: 对去除了随机前缀的 removeRandomPrefixRdd 进行全局聚合.
得到最终结果 (hello, 6)
[方案优点]
对于聚合类的 shuffle 操作导致的数据倾斜, 效果不错, 通常都可以解决数据倾斜问题, 至少大幅缓解数据倾斜, 将 Spark 作业的性能提升数倍以上.
[代码实现]
我对上述方案做了代码实现, 见我的 GitHub:https://github.com/wwcom614/Spark
Java 版实现
Scala 版实现
来源: https://www.cnblogs.com/wwcom123/p/10582146.html