- }
- }
- }
- System.out.println("---------------------结束reduce()函数---------------------");
- System.out.println("---------------------{[" + key.getFirstKey()+"," + key.getSecondKey() + "],[" +score +"]}");
- System.out.println("---------------------进入reduce()函数---------------------");
- context.write(key.getFirstKey(), score);
- //将联合Key的第一个元素作为新的key,将score作为value写出去
- score.set(sb.toString());
- //设置写出去的value
- }
- sb.deleteCharAt(sb.length() - 1);
- if (sb.length() > 0){
- //取出最后一个逗号
- }
- sb.append(val.get() + ",");
- for (IntWritable val : values){
- sb.delete(0, sb.length());
- //先清除上一个组的数据
- throws IOException, InterruptedException {
- protected void reduce(CombinationKey key, Iterable values, Reducer.Context context)
- */
- * 这个分组会调用一次reduce函数
- * 分组后的结果就是
- * 例如:
- * 所谓的分组就是将相同的key对应的value放在一个集合中
- * reduce每次处理一个分组的时候会调用一次reduce函数。
- * 这里要注意一下reduce的调用时机和次数:
- /**
- Text score = new Text();
- StringBuffer sb = new StringBuffer();
- public static class SecondSortReducer extends Reducer{
- }
- }
- System.out.println("---------------------结束map()函数---------------------");
- context.write(combinationKey, score);
- //通过context把map处理后的结果输出
- combinationKey.setSecondKey(score);
- combinationKey.setFirstKey(sortName);
- //设置联合key
- score.set(Integer.parseInt(value.toString()));
- sortName.set(key.toString());
- //构造相关属性
- }
- return;
- if (key == null || value == null || key.toString().equals("")){
- //过滤非法记录(这里用计数器比较好)
- System.out.println("---------------------进入map()函数---------------------");
- protected void map(Text key, Text value, Mapper.Context context) throws IOException, InterruptedException {
- String[] splits = null;
- IntWritable score = new IntWritable();
- Text sortName = new Text();
- private CombinationKey combinationKey = new CombinationKey();
- */
- * 导致栈内存被浪费掉,我们将其写在map函数外面,顶多就只有4个对象句柄
- * 非常的多(极端情况下将产生4*1亿个句柄,当然java也是有自动的GC机制的,一定不会达到这么多)
- * 条输入记录,如果将这些变量都定义在map函数里面则会导致这4个变量的对象句柄
- * 每一行的原始记录的处理都要调用一次map()函数,假设,这个map()函数要处理1一亿
- * 对于分布式的程序,我们一定要注意到内存的使用情况,对于MapReduce框架
- * 这里要特殊说明一下,为什么要将这些变量写在map函数外边
- /**
- public static class SecondSortMapper extends Mapper{
- }
- }
- e.printStackTrace();
- } catch (Exception e) {
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- // 提交作业 退出
- job.setOutputFormatClass(TextOutputFormat.class);
- FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
- //2.3 指定输出的路径和设置输出的格式化类
- job.setOutputValueClass(Text.class);
- job.setOutputKeyClass(Text.class);
- job.setReducerClass(SecondSortReducer.class);
- //2.2 指定Reducer类和输出key和value的类型
- //2.1 Shuffle把数据从Map端拷贝到Reduce端。
- //1.5 归约
- //1.4 排序
- job.setSortComparatorClass(DefinedComparator.class);
- //设置自定义比较策略(因为我的CombineKey重写了compareTo方法,所以这个可以省略)
- job.setGroupingComparatorClass(DefinedGroupSort.class);
- //设置自定义分组策略
- job.setNumReduceTasks(1);
- job.setPartitionerClass(DefinedPartition.class);
- //1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
- job.setMapOutputValueClass(IntWritable.class);
- job.setMapOutputKeyClass(CombinationKey.class);
- job.setMapperClass(SecondSortMapper.class);
- //1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
- job.setInputFormatClass(KeyValueTextInputFormat.class);
- FileInputFormat.setInputPaths(job, INPUT_PATH);
- //1.1 设置输入目录和设置输入数据格式化的类
- Job job = new Job(conf, SecondSortMapReduce.class.getName());
- // 创建任务
- }
- fileSystem.delete(new Path(OUT_PATH), true);
- if (fileSystem.exists(new Path(OUT_PATH))) {
- // 如果输出目录存在,我们就删除
- FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
- // 创建文件系统
- conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");
- Configuration conf = new Configuration();
- // 创建配置信息
- try {
- public static void main(String[] args) {
- private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
- // 定义输出路径
- private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/sort_data";
- // 定义输入路径
- public class SecondSortMapReduce {
来源: http://blog.csdn.net/sdgihshdv/article/details/74570797