项目文件: GitHub
MapReduce 流程:
- package test.wordcount;
- import java.io.IOException;
- import java.util.Iterator;
- import java.util.StringTokenizer;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.GenericOptionsParser;
- /*
- * @author:Kouch
- *
- * "词频统计" 思路:
- * 1 input: 从输入文件读取数据;
- * 2 split : 一行 为一个 < key,value > 对 - value: 行内容;
- * 3 map : 将一行切分成一个一个单词, 每个单词输出为一个 <Word,1> 的键值对;
- * 4 shuffle: 将相同的 key(同一个单词) 累计 - <key,value-list> :<Word , n> ;
- * combiner 会在每个 map 任务的输出上先做一次本地汇总, 得到该 map 内的单词频率;
- * 5 reduce: 汇总时统计整体的同一个单词的频率;
- * 6 output: 输出;
- *
- */
- public class WordCount {
- //map
- public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
- //hadoop 的数据类型: 有一个单词, 就记录一个 '1';
- private static final IntWritable one = new IntWritable(1);
- //(在循环 map 时) 保存每一个单词;
- private Text Word = new Text();
- public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
- // 强调: 具体的切分依据数据的保存格式;
- // 文档数据格式: 单词以空格分割;
- // 一行内容: Text value 转成 StringTokenizer;
- StringTokenizer itr = new StringTokenizer(value.toString());
- // 将每个单词输出到 context
- while(itr.hasMoreTokens()) {
- this.Word.set(itr.nextToken());
- context.write(this.Word, one);
- }
- }
- }
- //reduce
- public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
- //
- private IntWritable result = new IntWritable();
- public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
- int sum = 0;
- // 迭代累计频率;
- IntWritable val;
- for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
- val = (IntWritable)i$.next();
- }
- this.result.set(sum);
- context.write(key, this.result);
- }
- }
- //main
- public static void main(String[] args) throws Exception {
- // 配置类
- Configuration conf = new Configuration();
- // 传参设置
- //String[] ioArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs(); // 可用于打包 jar
- String[] ioArgs =new String[] {"input","output"};
- if(ioArgs.length <2) {
- System.err.println("Usage: wordcount <in> [<in>...] <out>");
- System.exit(2);
- }
- //job 相关配置
- Job job = Job.getInstance(conf, "word count");
- job.setJarByClass(WordCount.class);
- job.setMapperClass(WordCount.TokenizerMapper.class);
- job.setCombinerClass(WordCount.IntSumReducer.class);
- job.setReducerClass(WordCount.IntSumReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- for(int i = 0; i < ioArgs.length - 1; ++i) {
- FileInputFormat.addInputPath(job, new Path(ioArgs[i]));
- }
- FileOutputFormat.setOutputPath(job, new Path(ioArgs[ioArgs.length - 1]));
- // 等待 job 完成之后再返回结果并退出程序
- System.exit(job.waitForCompletion(true)?0:1);
- }
- }
来源: http://www.bubuko.com/infodetail-3182139.html