package com.bw.hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Max {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration config = new Configuration();
config.set("fs.defaultFS", "hdfs://192.168.0.117:9000");
config.set("yarn.resourcemanager.hostname", "192.168.0.117");
Job job = Job.getInstance(config);
//MR
job.setMapperClass(MaxMapper.class);
job.setReducerClass(MaxReducer.class);
//M-O
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(NullWritable.class);
//R-O
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(NullWritable.class);
//I-O
FileInputFormat.setInputPaths(job, new Path("/b"));
FileOutputFormat.setOutputPath(job, new Path("/Out"));
//boolean
boolean B = job.waitForCompletion(true);
if(B){
System.out.println("Success");
}else{
System.out.println("Error");
}
}
public static class MaxMapper extends Mapper
// 定义一个Long类型的最小值作为临时变量
private Long max = Long.MIN_VALUE;
// 定义输出去的value
private LongWritable maxValue = new LongWritable();
@Override
protected void map(LongWritable key, Text value, Mapper
InterruptedException {
// 获取输入的行
String line = value.toString();
// 抛弃无效记录
if (line == null || line.equals("")) {
return;
}
// 把line转换为数值
long temp = Long.parseLong(line);
// 比较大小
if (temp > max) {
// 把val赋值给tempMax
max = temp;
}
}
/**
* cleanUp()是指map函数执行完成之后就会调用,刚好满足我们的要求 因为map()函数执行完成之后我们单个任务的的最大值也就产生了
*/
@Override
protected void cleanup(Mapper
// 把最后的处理结果写出去
maxValue.set(max);
context.write(maxValue, NullWritable.get());
}
}
/**
* 汇总多个任务产生的最大值,再次比较
*/
public static class MaxReducer extends Reducer
// 定义一个参考的临时变量
private Long max = Long.MIN_VALUE;
// 定义输出的key
private LongWritable maxValue = new LongWritable();
protected void reduce(LongWritable key, Iterable
throws IOException, InterruptedException {
if (key.get() > max) {
max = key.get();
}
}
/**
* reduce任务完成后写出去
*/
protected void cleanup(Reducer
// 设置最大值
maxValue.set(max);
context.write(maxValue, NullWritable.get());
}
}
}
posted on 2017-08-12 10:09 李培元 阅读(...) 评论(...) 编辑 收藏
来源: http://www.cnblogs.com/lipeiyuan2017/p/7349087.html