一, MapReduce 编程思想
学些 MapRedcue 主要是学习它的编程思想, 在 MR 的编程模型中, 主要思想是把对数据的运算流程分成 map 和 reduce 两个阶段:
Map 阶段: 读取原始数据, 形成 key-value 数据 (map 方法)
Reduce 阶段: 把 map 阶段的 key-value 数据按照相同的 key 进行分组聚合 (reduce 方法)
它其实是一种数据逻辑运算模型, 对于这样的运算模型, 有一些成熟的具体软件实现, 比如 hadoop 中的 mapreduce 框架, spark 等, 例如在 hadoop 的 mr 框架中, 对 map 阶段的具体实现是 map task, 对 reduce 阶段的实现是 reduce task. 这些框架已经为我们提供了一些通用功能的实现, 让我们专注于数据处理的逻辑, 而不考虑分布式的具体实现, 比如读取文件, 写文件, 数据分发等. 我们要做的工作就是在这些编程框架下, 来实现我们的具体需求.
下面我们先介绍一些 map task 和 reduce task 中的一些具体实现:
二, MapTask 和 ReduceTask
2.1 Map Task
读数据: 利用 InputFormat 组件完成数据的读取.
InputFormat-->TextInputFormat 读取文本文件的具体实现
-->SequenceFileInputFormat 读取 Sequence 文件
-->DBInputFormat 读数据库
处理数据: 这一阶段将读取到的数据按照规则进行处理, 生成 key-value 形式的结果. maptask 通过调用用 Mapper 类的 map 方法实现对数据的处理.
分区: 这一阶段主要是把 map 阶段产生的 key-value 数据进行分区, 以分发给不同的 reduce task 来处理, 使用的是 Partitioner 类. maptask 通过调用 Partitioner 类的 getPartition() 方法来决定如何划分数据给不同的 reduce task.
排序: 这一阶段, 对 key-value 数据做排序. maptask 会按照 key 对数据进行排序, 排序时调用 key.compareTo() 方法来实现对 key-value 数据排序.
2.2 Reduce Task
读数据: 这一阶段通过 http 方式从 maptask 产生的数据文件中下载属于自己的 "区" 的数据. 由于一个区的数据可能来自多个 maptask, 所以 reduce 还要把这些分散的数据进行合并 (归并排序)
处理数据: 一个 reduce task 中, 处理刚才下载到自己本地的数据. 通过调用 GroupingComparator 的 compare() 方法来判断文件中的哪些 key-value 属于同一组. 然后将这一组数传给 Reducer 类的 reduce() 方法聚合一次.
输出结果: 调用 OutputFormat 组件将结果 key-value 数据写出去.
Outputformat --> TextOutputFormat 写文本文件 (会把一个 key-value 对写一行, 分隔符为制表符 \ t
--> SequenceFileOutputFormat 写 Sequence 文件 (直接将 key-value 对象序列化到文件中)
--> DBOutputFormat
下面介绍下利用 MapReduce 框架下的一般编程过程. 我们要做的 工作就是把我们对数据的处理逻辑加入到框架的业务逻辑中. 我们编写的 MapReduce 的 job 客户端主要包括三个部分, Mapper , Reducer 和 JobSubmitter, 三个部分分别完成 MR 程序的 map 逻辑, reduce 逻辑以及将我们编写的 job 程序提交给集群. 下面分别介绍这三个部分如何实现.
三, Hadoop 中 MapReduce 框架下的一般编程步骤
Mapper: 创建类, 该类要实现 Mapper 父类, 复写 read() 方法, 在方法内实现当前工程中的 map 逻辑.
Reducer: 创建类, 继承 Reducer 父类, 复写 reduce() 方法, 方法内实现当前工程中的 reduce 逻辑.
jobSubmitter: 这是 job 在集群上实际运行的类, 主要是通过 main 方法, 封装 job 相关参数, 并把 job 提交. jobsubmitter 内一般包括以下操作
step1: 创建 Configuration 对象, 并通过创建的对象对集群进行配置, 同时支持用户自定义一些变量并配置. 这一步有些像我们集群搭建的时候对 $haoop_home/etc/hadoop/* 下的一些文件进行的配置.
step2: 获得 job 对象, 并通过 job 对象对我们 job 运行进行一些配置. 例如, 设置集群运行的 jar 文件, 设置实际执行 map 和 reduce 的类等, 下面列出一些必要设置和可选设置.
- Configuration conf = new Configuration(); // 创建集群配置对象.
- Job job = Job.getInstance(conf);// 根据配置对象获取一个 job 客户端实例.
- job.setJarByClass(JobSubmitter.class);// 设置集群上 job 执行的类
- job.setMapperClass(FlowCountMapper.class);// 设置 job 执行时使用的 Mapper 类
- job.setReducerClass(FlowCountReducer.class);// 设置 job 执行时使用的 Reducer 类
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(FlowBean.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(FlowBean.class);
- FileInputFormat.setInputPaths(job, new Path("I:\\hadooptest\\input"));
- FileOutputFormat.setOutputPath(job, new Path("I:\\hadooptest\\output_pri"));
- // 设置 maptask 做数据分发时使用的分发逻辑类, 如果不指定, 默认使用 hashpar
- job.setPartitionerClass(ProvincePartitioner.class);
- job.setNumReduceTasks(4);// 自定义的分发逻辑下, 可能产生 n 个分区, 所以 reducetask 的数量需要是 n
- boolean res = job.waitForCompletion(true);
- System.exit(res ? 0:-1);
一般实践中, 可以定义一个类, 其中添加 main 方法对 job 进行提交, 并在其中定义静态内部类 maper 和 reduce 类.
四, MapReduce 框架中的可自定义项
遇到一些复杂的需求, 需要我们自定义实现一些组件
2.1 自定义排序规则
2.2 自定义序列化数据类型
五, MR 程序的调试, 执行方式
5.1 提交到 Linux 运行
5.2 Win 本地执行
来源: http://www.bubuko.com/infodetail-3064167.html