MapReduce 工作流程
1. 准备待处理文件
2.job 提交前生成一个处理规划
3. 将切片信息 job.split, 配置信息 job.xml 和我们自己写的 jar 包交给 yarn
4.yarn 根据切片规划计算出 MapTask 的数量
(以一个 MapTask 为例)
5.Maptask 调用 inputFormat 生成 RecordReader, 将自己处理的切片文件内容打散成 K,V 值
6.MapTask 将打散好的 K,V 值交给 Mapper,Mapper 经过一系列的处理将 KV 值写出
7. 写出的 KV 值被 outputCollector 收集起来, 写入一个在内存的环形缓冲区
8,9. 当环形缓冲区被写入的空间等于 80% 时, 会触发溢写. 此时数据是在内存中, 所以在溢写之前, 会对数据进行排序, 是一个二次排序的快排 (先根据分区排序再根据 key 排序). 然后将数据有序的写入到磁盘上.
缓冲区为什么是环形的? 这样做是为了可以在缓冲区的任何地方进行数据的写入.
当第一次溢写时, 数据会从余下的 20% 空间中的中间位置, 再分左右继续写入, 也就是从第一次是从上往下写数据变成了从下往上写数据
10,11. 当多次溢写产生多个有序的文件后, 会触发归并排序, 将多个有序的文件合并成一个有序的大文件. 当文件数 >=10 个时, 会触发归并排序, 取文件的一小部分放入内存的缓冲区, 再生成一个小文件部分大小 x 文件数的缓冲区, 逐个比较放入大文件缓冲区, 依次比较下去, 再将大文件缓冲写入到磁盘, 归并结束后将大文件放在文件列表的末尾, 继续重复此动作, 直到合并成一个大文件. 此次归并排序的时间复杂度要求较低.
12. 当所有的 MapTask 执行完任务后, 启动相应数量的 ReduceTask, 并告知每一个 ReduceTask 应该处理的数据分区
13.ReduceTask 将指定分区的文件下载到本地, 如有多个分区文件的话, ReduceTask 上将会有多个大文件, 再一次归并排序, 形成一个大文件.
14.15, 如果有分组要求的话, ReduceTask 会将数据一组一组的交给 Reduce, 处理完后准备将数据写出
16.Reduce 调用 output 生成 RecordWrite 将数据写入到指定路径中
shuffle 机制
上图中, 数据从 Mapper 写出来之后到数据进入到 Reduce 之前, 这一阶段就叫做 Shuffle
Shuffle 时, 会有三次排序, 第一次是数据从环形缓冲区写入到磁盘时, 会有一次快排, 第二次是在 MapTask 中, 将多个分区且内部有序的小文件归并成一个分区且内部有序的大文件, 第三次是在 ReduceTask 中, 从多个 MapTask 中获取指定分区的大文件, 再进行一个归并排序, 合并成一个大文件.
以 WordCount 为例, 试想一下, 在第一次从环形缓冲区写入到磁盘时, 排好序的数据为 (w1,1),(w1,1),(w1,1),(w2,1),(w2,1),(w3,1), 这样的数据会增加网络传输量, 所以在这里可以使用 Combiner 进行数据合并. 最后形成的数据是 (w1,3),(w2,2),(w3,1), 后续会详细讲解~
partition 分区
将 Mapper 想象成一个水池, 数据是池里的水. 默认分一个去, 只有一根水管. 如果只有一个 ReduceTask, 则水会全部顺着唯一的水管流入到 ReduceTask 中. 如果此时有 3 根水管, 则水会被分成三股水流流入到 3 个 ReduceTask 中, 而且哪些水进哪个水管, 并不受我们主管控制, 也就是数据处理速度加快了~~Partition 分区就决定了分几根水管. 试想一下, 如果有 4 根水管, 末端只有 3 个 ReduceTask, 那么有一股水流会丢失. 也就是造成数据丢失, 程序会报错. 如果只有 2 根水管, 那么则有一个 ReduceTask 无事可做, 最后生成的是一个空文件, 浪费资源
所以, 一般来说, 有几个 ReduceTask 就要分几个区, 至于 partition 和 ReduceTask 设置为几, 要看集群性能, 数据集, 业务, 经验等等~
对应流程图上, 也就是从环形缓冲区写入到磁盘时, 会分区
collector 出现了, 除了将 key,value 收集到缓冲区中之外, 还收集了 partition 分区
key.hashCode() & Integer.MAX_VALUE, 保证取余前的数为正数
比如, numReduceTasks = 3, 一个数 n 对 3 取余, 结果会有 0,1,2 三种可能, 也就是分三个区, 再一次印证了要 reduceTask number = partition number
默认分区是根据 key 的 hashcode 和 reduceTasks 的个数取模得到的, 用户无法控制哪个 key 存储到哪个分区上
案例演练
以 12 小章的统计流量案例为例, 大数据 - Hadoop 生态 (12)-Hadoop 序列化和源码追踪
将手机号 136,137,138,139 开头都分别放到一个独立的 4 个文件中, 其他开头的放到一个文件中
自定义 Partition 类
- package com.atguigu.partitioner;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Partitioner;
- public class MyPartitioner extends Partitioner<Text, FlowBean> {
- public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
- //1. 截取手机前三位
- String start = text.toString().substring(0, 3);
- //2. 按照手机号前三位返回分区号
- switch (start) {
- case "136":
- return 0;
- case "137":
- return 1;
- case "138":
- return 2;
- case "139":
- return 3;
- default:
- return 4;
- }
- }
- }
Driver 类的 main() 中增加以下代码
- job.setPartitionerClass(MyPartitioner.class);
- job.setNumReduceTasks(5);
输出结果, 5 个文件
如果 job.setNumReduceTasks(10), 会生成 10 个文件, 其中 5 个是空文件
如果 job.setNumReduceTasks(2), 程序会直接执行失败报异常
如果 job.setNumReduceTasks(1), 程序会运行成功, 因为如果 numReduceTasks=1 时, 根本就不会执行分区的过程
如果是以下情况, 也会执行失败. MapReduce 会认为你分了 41 个区, 所以分区号必须从 0 开始, 逐一累加.
- job.setNumReduceTasks(5)
- switch (start) {
- case "136":
- return 0;
- case "137":
- return 1;
- case "138":
- return 2;
- case "139":
- return 3;
- default:
- return 40;
- }
来源: https://www.cnblogs.com/duoduotouhenying/p/10104552.html