*** 数据去重 ***
目标: 原始数据中出现次数超过一次的数据在输出文件中只出现一次.
算法思想: 根据 reduce 的过程特性, 会自动根据 key 来计算输入的 value 集合, 把数据作为 key 输出给 reduce, 无论这个数据出现多少次, reduce 最终结果中 key 只能输出一次.
1. 实例中每个数据代表输入文件中的一行内容, map 阶段采用 Hadoop 默认的作业输入方式. 将 value 设置为 key, 并直接输出. map 输出数据的 key 为数据, 将 value 设置成空值
2. 在 MapReduce 流程中, map 的输出经过 shuffle 过程聚集成后会交给 reduce
3.reduce 阶段不管每个 key 有多少个 value, 它直接将输入的 key 复制为输出的 key, 并输出(输出中的 value 被设置成空).
代码实现:
public class testquchong {
static String INPUT_PATH="hdfs://master:9000/quchong"; // 将文件 file1 和 file2 放在该目录下
static String OUTPUT_PATH="hdfs://master:9000/quchong/qc";
static class MyMapper extends Mapper{ // 将输入输出作为 string 类型, 对应 Text 类型
private static Text line=new Text(); // 每一行作为一个数据
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException{
line=value;
context.write(line,new Text(",")); //key 是唯一的, 作为数据, 即实现去重
}
}
static class MyReduce extends Reducer{
protected void reduce(Text key,Iterable values,Context context) throws IOException,InterruptedException{
context.write(key,new Text(" ")); //map 传给 reduce 的数据已经做完数据去重, 输出即可
}
}
public static void main(String[] args) throws Exception{
Path outputpath=new Path(OUTPUT_PATH);
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, INPUT_PATH);
FileOutputFormat.setOutputPath(job,outputpath);
job.waitForCompletion(true);
}
}
*** 数据排序 ***
目标: 实现多个文件中的数据进行从小到大的排序并输出
算法思想: MapReduce 过程中就有排序, 它的默认排序规则按照 key 值进行排序的, 如果 key 为封装 int 的 IntWritable 类型, 那么 MapReduce 按照数字大小对 key 排序, 如果 key 为封装为 String 的 Text 类型, 那么 MapReduce 按照字典顺序对字符串排序.
使用封装 int 的 IntWritable 型数据结构. 也就是在 map 中将读入的数据转化成 IntWritable 型, 然后作为 key 值输出 (value 任意).reduce 拿到之后, 将输入的 key 作为 value 输出, 并根据 value-list 中元素的个数决定输出的次数. 输出的 key(即代码中的 linenum) 是一个全局变量, 它统计当前 key 的位次.
代码实现:
public class paixu {
static String INPUT_PATH = "hdfs://master:9000/test";
static String OUTPUT_PATH = "hdfs://master:9000/output/sort";
static class MyMapper extends Mapper { // 选择为 Int 类型, value 值任意
IntWritable output_key = new IntWritable();
NullWritable output_value = NullWritable.get();
protected void map(Object key, Object value, Context context) throws IOException,
InterruptedException {
int val = Integer.parseUnsignedInt(value.toString().trim()); // 进行数据类型转换
output_key.set(val); context.write(output_key, output_value); //key 值确定
}
}
static class MyReduce extends Reducer { // 输入是 map 的输出, 输出行号和数据为 int
IntWritable output_key = new IntWritable();
int num = 1;
protected void reduce(IntWritable key, Iterable values, Context context) throws IOException,
InterruptedException {
output_key.set(num++); // 循环赋值作为行号
context.write(output_key, key); //key 为 map 传入的数据
}
}
public static void main(String[] args) throws Exception {
Path outputpath = new Path(OUTPUT_PATH);
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, INPUT_PATH);
FileOutputFormat.setOutputPath(job, outputpath); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReduce.class); job.setMapOutputKeyClass(IntWritable.class); // 因为 map 和 reduce 的输出类型不一样
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class); job.waitForCompletion(true);
}
}
来源: http://www.bubuko.com/infodetail-2479769.html