@
目录
问题引出
默认 Partitioner 分区
自定义 Partitioner 步骤
Partition 分区案例实操
分区总结
问题引出
要求将统计结果按照条件输出到不同文件中(分区).
比如: 将统计结果按照手机归属地不同省份输出到不同文件中(分区)
默认 Partitioner 分区
- public class HashPartitioner<K,V> extends Partitioner<K,V>{
- public int getPartition(K key,V value, int numReduceTasks){
- return (key.hashCode() & Integer.MAX VALUE) & numReduceTasks;
- }
- }
默认分区是根据 key 的 hashCode 对 ReduceTasks 个数取模得到的.
用户没法控制哪个 key 存储到哪个分区.
自定义 Partitioner 步骤
自定义类继承 Partitioner, 重写 getPartition()方法
- public class CustomPartitioner extends Partitioner<Text,FlowBea>{
- @Override
- public int getPartition(Text key,FlowBean value,int numPartitions){
- // 控制分区代码逻辑
- ......
- return partition;
- }
- }
在 Job 驱动类中, 设置自定义 Partitioner
job.setPartitionerClass(CustomPartitioner.class)
自定义 Partition 后, 要根据自定义 Partitioner 的逻辑设置相应数量的 ReduceTask
job.setNumReduceTask(5);// 假设需要分 5 个区
Partition 分区案例实操
将统计结果按照手机归属地不同省份输出到不同文件中(分区)
输入数据:
期望输出数据:
手机号 136,137,138,139 开头都分别放到一个独立的 4 个文件中, 其他开头的放到一个文件中. 所以总共分为 5 个文件, 也就是五个区.
相比于之前的自定义 flowbean, 这次自定义分区, 只需要多编写一个分区器, 以及在 job 驱动类中设置分区器, mapper 和 reducer 类不改变
MyPartitioner.java
- /*
- * KEY, VALUE: Mapper 输出的 Key-value 类型
- */
- public class MyPartitioner extends Partitioner<Text, FlowBean>{
- // 计算分区 numPartitions 为总的分区数, reduceTask 的数量
- // 分区号必须为 int 型的值, 且必须符合 0<= partitionNum <numPartitions
- @Override
- public int getPartition(Text key, FlowBean value, int numPartitions) {
- String suffix = key.toString().substring(0, 3);// 前开后闭, 取手机号前三位数
- int partitionNum=0;// 分区编号
- switch (suffix) {
- case "136":
- partitionNum=numPartitions-1;// 由于分区编号不能大于分区总数, 所以用这种方法比较好
- break;
- case "137":
- partitionNum=numPartitions-2;
- break;
- case "138":
- partitionNum=numPartitions-3;
- break;
- case "139":
- partitionNum=numPartitions-4;
- break;
- default:
- break;
- }
- return partitionNum;
- }
- }
FlowBeanDriver.java
- public class FlowBeanDriver {
- public static void main(String[] args) throws Exception {
- Path inputPath=new Path("e:/mrinput/flowbean");
- Path outputPath=new Path("e:/mroutput/partitionflowbean");
- // 作为整个 Job 的配置
- Configuration conf = new Configuration();
- // 保证输出目录不存在
- FileSystem fs=FileSystem.get(conf);
- if (fs.exists(outputPath)) {
- fs.delete(outputPath, true);
- }
- // 1创建 Job
- Job job = Job.getInstance(conf);
- // 2设置 Job
- // 设置 Job 运行的 Mapper,Reducer 类型, Mapper,Reducer 输出的 key-value 类型
- job.setMapperClass(FlowBeanMapper.class);
- job.setReducerClass(FlowBeanReducer.class);
- // Job 需要根据 Mapper 和 Reducer 输出的 Key-value 类型准备序列化器, 通过序列化器对输出的 key-value 进行序列化和反序列化
- // 如果 Mapper 和 Reducer 输出的 Key-value 类型一致, 直接设置 Job 最终的输出类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(FlowBean.class);
- // 设置输入目录和输出目录
- FileInputFormat.setInputPaths(job, inputPath);
- FileOutputFormat.setOutputPath(job, outputPath);
- // 设置 ReduceTask 的数量为 5
- job.setNumReduceTasks(5);
- // 设置使用自定义的分区器
- job.setPartitionerClass(MyPartitioner.class);
- // 3运行 Job
- job.waitForCompletion(true);
- }
- }
FlowBeanMapper.java
- /*
- * 1. 统计手机号 (String) 的上行(long,int), 下行(long,int), 总流量(long,int)
- *
- * 手机号为 key,Bean{上行 (long,int), 下行(long,int), 总流量(long,int)} 为 value
- *
- *
- *
- *
- */
- public class FlowBeanMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
- private Text out_key=new Text();
- private FlowBean out_value=new FlowBean();
- // (0,1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200)
- @Override
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
- throws IOException, InterruptedException {
- String[] words = value.toString().split("\t");
- // 封装手机号
- out_key.set(words[1]);
- // 封装上行
- out_value.setUpFlow(Long.parseLong(words[words.length-3]));
- // 封装下行
- out_value.setDownFlow(Long.parseLong(words[words.length-2]));
- context.write(out_key, out_value);
- }
- }
FlowBeanReducer.java
- public class FlowBeanReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
- private FlowBean out_value=new FlowBean();
- @Override
- protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)
- throws IOException, InterruptedException {
- long sumUpFlow=0;
- long sumDownFlow=0;
- for (FlowBean flowBean : values) {
- sumUpFlow+=flowBean.getUpFlow();
- sumDownFlow+=flowBean.getDownFlow();
- }
- out_value.setUpFlow(sumUpFlow);
- out_value.setDownFlow(sumDownFlow);
- out_value.setSumFlow(sumDownFlow+sumUpFlow);
- context.write(key, out_value);
- }
- }
FlowBean.java
- public class FlowBean implements Writable{
- private long upFlow;
- private long downFlow;
- private long sumFlow;
- public FlowBean() {
- }
- public long getUpFlow() {
- return upFlow;
- }
- public void setUpFlow(long upFlow) {
- this.upFlow = upFlow;
- }
- public long getDownFlow() {
- return downFlow;
- }
- public void setDownFlow(long downFlow) {
- this.downFlow = downFlow;
- }
- public long getSumFlow() {
- return sumFlow;
- }
- public void setSumFlow(long sumFlow) {
- this.sumFlow = sumFlow;
- }
- // 序列化 在写出属性时, 如果为引用数据类型, 属性不能为 null
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeLong(upFlow);
- out.writeLong(downFlow);
- out.writeLong(sumFlow);
- }
- // 反序列化 序列化和反序列化的顺序要一致
- @Override
- public void readFields(DataInput in) throws IOException {
- upFlow=in.readLong();
- downFlow=in.readLong();
- sumFlow=in.readLong();
- }
- @Override
- public String toString() {
- return upFlow + "\t" + downFlow + "\t" + sumFlow;
- }
- }
输出结果:
总共五个文件
一号区:
二号区:
三号区:
四号区:
其他号码为第五号区:
分区总结
如果
ReduceTask 的数量> getPartition 的结果数
, 则会多产生几个空的输出文件 part-r-000xx
如果
Reduceask 的数量 < getPartition 的结果数
, 则有一部分分区数据无处安放, 会 Exception
如果
ReduceTask 的数量 = 1
, 则不管 MapTask 端输出多少个分区文件, 最终结果都交给这一个 ReduceTask, 最终也就只会产生一个结果文件 partr-00000
以刚才的案例分析:
例如: 假设自定义分区数为 5, 则
job.setlNlurmReduce Task(1); 会正常运行, 只不过会产生一个输出文件
job.setlNlunReduce Task(2), 会报错
job.setNumReduceTasks(6); 大于 5, 程序会正常运行, 会产生空文件
来源: https://www.cnblogs.com/sunbr/p/13356378.html