摘要: public class ReducerJoin { public static class ValueFlag implements Writable { private String value; private String flag; public String ...
public class ReducerJoin {
- public static class ValueFlag implements Writable {
- private String value;
- private String flag;
- public String getValue() {
- return value;
- }
- public void setValue(String value) {
- this.value = value;
- }
- public String getFlag() {
- return flag;
- }
- public void setFlag(String flag) {
- this.flag = flag;
- }
- public void write(DataOutput out) throws IOException {
- out.writeUTF(value);
- out.writeUTF(flag);
- }
- public void readFields(DataInput in ) throws IOException {
- this.value = in.readUTF();
- this.flag = in.readUTF();
- }
- }
- // map读取两个文件 根据来源把每个kv对打上标签 输出给reduce可以必须是关联字段
- public static class ReducerJoinMap extends Mapper < LongWritable,
- Text,
- Text,
- ValueFlag > {
- private FileSplit fileSplit;
- private String fileName;
- private String[] infos;
- private Text oKey = new Text();
- private ValueFlag oValue = new ValueFlag();
- @Override protected void setup(Mapper < LongWritable, Text, Text, ValueFlag > .Context context) throws IOException,
- InterruptedException {
- fileSplit = (FileSplit) context.getInputSplit();
- if (fileSplit.getPath().toString().contains("user-logs-large.txt")) {
- fileName = "userLogsLarge";
- } else if (fileSplit.getPath().toString().contains("user_info.txt")) {
- fileName = "userInfo";
- }
- }
- @Override protected void map(LongWritable key, Text value, Mapper < LongWritable, Text, Text, ValueFlag > .Context context) throws IOException,
- InterruptedException {
- infos = value.toString().split("\\s");
- oValue.setFlag(fileName);
- if (fileName.equals("userLogsLarge")) {
- // 解析user-logs-large.txt
- oKey.set(infos[0]);
- oValue.setValue(infos[1] + "\t" + infos[2]);
- context.write(oKey, oValue);
- } else if (fileName.equals("userInfo")) {
- // 解析user_infos.txt
- oKey.set(infos[0]);
- oValue.setValue(infos[1] + "\t" + infos[2]);
- context.write(oKey, oValue);
- }
- }
- }
- // 接受map发送过来的kv队 根据value中的flag把同一个key对应的value分组
- // 那么两组中的数据就是分别来自两张表中的数据 对这两组数据做笛卡尔乘机即完成关联
- public static class ReducerJoinReducer extends Reducer < Text,
- ValueFlag,
- AvroKey < UserActionLog > ,
- NullWritable > {
- private List < String > userLogsLargeList;
- private List < String > userInfosList;
- private NullWritable outValue = NullWritable.get();
- private AvroKey < UserActionLog > outKey = new AvroKey < UserActionLog > ();
- private String[] infos;
- @Override protected void reduce(Text key, Iterable < ValueFlag > values, Reducer < Text, ValueFlag, AvroKey < UserActionLog > , NullWritable > .Context context) throws IOException,
- InterruptedException {
- userLogsLargeList = new ArrayList < String > ();
- userInfosList = new ArrayList < String > ();
- for (ValueFlag value: values) {
- if (value.getFlag().equals("userLogsLarge")) {
- userLogsLargeList.add(value.getValue());
- } else if (value.getFlag().equals("userInfo")) {
- userInfosList.add(value.getValue());
- }
- }
- // 对两组中的数据进行笛卡尔乘积
- for (String userlog: userLogsLargeList) {
- for (String userinfo: userInfosList) {
- // 构建一个useractionLog对象
- UserActionLog.Builder build = UserActionLog.newBuilder();
- // 从userlog中提取actiontyoe和ipaddress
- infos = userlog.split("\\s");
- build.setActionType(infos[0]);
- build.setIpAddress(infos[1]);
- // 从userinfo 提取gender 和privince
- infos = userinfo.split("\\s");
- if (infos[0].equals("man")) {
- build.setGender(0);
- } else {
- build.setGender(1);
- }
- build.setProvience(infos[1]);
- build.setUserName(key.toString());
- UserActionLog userActionLog = build.build();
- // 吧userAction封装到Avrokey中
- outKey.datum(userActionLog);
- context.write(outKey, outValue);
- }
- }
- }
- }
- public static void main(String[] args) throws IOException,
- ClassNotFoundException,
- InterruptedException {
- Configuration configuration = new Configuration();
- Job job = Job.getInstance(configuration);
- job.setJarByClass(ReducerJoin.class);
- job.setJobName("reducer联合");
- job.setMapperClass(ReducerJoinMap.class);
- job.setReducerClass(ReducerJoinReducer.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(ValueFlag.class);
- job.setOutputKeyClass(AvroKey.class);
- job.setOutputValueClass(NullWriter.class);
- //设置输出的格式是avrokey
- job.setOutputFormatClass(AvroKeyOutputFormat.class);
- //设置输出key的schema
- AvroJob.setOutputKeySchema(job, UserActionLog.SCHEMA$);
- FileInputFormat.addInputPath(job, new Path("/mapjoin"));
- Path outputPath = new Path("/ReducerJoin");
- outputPath.getFileSystem(configuration).delete(outputPath, true);
- FileOutputFormat.setOutputPath(job, outputPath);
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
}
来源: http://click.aliyun.com/m/32856/