- import java.io.IOException;
- import java.util.Iterator;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.GenericOptionsParser;
- /**
- * @version v0.0.1
- * @author xuhang
- *
- */
- /*
- * 1.导出为jar包并拷贝到当前用户能访问的地方,比如:/usr/local/hadoopmapreducedemo.jar
- * 2.执行运算:hadoop jar [jar路径] [输入文件路径] [输出文件路径]
- * 输入文件如果直接写文件夹名的话,调用的是hdfs下当前用户目录下的该文件,比如直接输input,提示/user/hadoop/input不存在
- * 所以保险期间可以输入绝对路径 hdfs://ceoihong:9000/usr/local/hadoop/input
- * 输出文件需要保证对应的路径不存,否则报FileAlreadyExistsException异常。
- * hadoop jar /usr/local/hadoopmapreducedemo.jar hdfs://ceoihong:9000/usr/local/hadoop/input hdfs://ceoihong:9000/usr/local/hadoop/output
- * 最终结果保存在/usr/local/hadoop/output/part-r-00000中
- */
- public class HadoopMapReduceDemo {
- public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
- protected void map(LongWritable key, Text value,
- Mapper<LongWritable, Text, Text, LongWritable>.Context context)
- throws IOException, InterruptedException {
- //默认是用当前行的偏移量和当前行作为一对键值对
- String line = value.toString();
- String[] words = line.split("\\\\s");
- for(String word : words){
- context.write(new Text(word), new LongWritable(1));
- }
- }
- }
- public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
- protected void reduce(Text arg0, Iterable<LongWritable> arg1,
- Reducer<Text, LongWritable, Text, LongWritable>.Context arg2)
- throws IOException, InterruptedException {
- //已按照键将对应的所有的值封装成Iterable对象<key,<value1,value2,value3...>>
- long sum = 0;
- Iterator<LongWritable> it = arg1.iterator();
- while(it.hasNext()){
- LongWritable i = it.next();
- sum += i.get();
- }
- arg2.write(arg0, new LongWritable(sum));
- }
- }
- public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
- Configuration conf = new Configuration();
- String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
- if (otherArgs.length != 2) {
- System.err.println("Usage: HadoopMapReduceDemo <in> <out>");
- System.exit(2);
- }
- Job job = new Job(conf);
- job.setJarByClass(HadoopMapReduceDemo.class);
- job.setJobName("hadoopmapreducedemo");//设置任务名称
- job.setOutputKeyClass(Text.class);//
- job.setOutputValueClass(LongWritable.class); //输出键值对类型
- job.setMapperClass(MyMapper.class);//
- job.setCombinerClass(MyReducer.class);//这个是干嘛的??
- job.setReducerClass(MyReducer.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));//设置输入文件路径
- FileOutputFormat.setOutputPath(job, new Path(args[1]));//设置输出文件路径
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
// 该片段来自于 http://www.codesnippet.cn/detail/2610201513901.html
// 来源: http://www.codesnippet.cn/detail/2610201513901.html