众所周知, hadoop 生态圈的多数组件都是使用 java 开发的.
那么使用 Java API 方式实现起来, 显得要比其它语言效率更高, 更原生态.
前面有一个 Hadoop 学习笔记 02_MapReduce 练习 https://www.cnblogs.com/frx9527/p/mapreduce.html 是在 Linux 下直接使用的 python2.7 实现的. 这里我试试 windows 下用 java 来练习实现.
练习一: 单词统计, wordcount
1. IntelliJ IDEA 中 New project, maven,SDK1.8 , Next, 输入 Groupid : examplemr , ArtifactId : examplemr , Version: 1.0 , Next,
Project name : examplemr , Project location: D:\test\examplemr Finish
修改 pom.xml 引入必要的 dependency. 在 IDE 的提示中, 点击 Import Changes 等待自动下载相关的依赖包
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>cn.itcast</groupId>
- <artifactId>example-mr</artifactId>
- <version>1.0</version>
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>2.7.4</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>2.7.4</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>2.7.4</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <version>2.7.4</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <!-- 打 jar 包插件 -->
- <artifactId>maven-jar-plugin</artifactId>
- <version>2.4</version>
- <configuration>
- <archive>
- <manifest>
- <addClasspath>true</addClasspath>
- <classpathPrefix>lib/</classpathPrefix>
- <mainClass>cn.itcast.hadoop.mr.WordCountDriver</mainClass>
- </manifest>
- </archive>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.0</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- <encoding>UTF-8</encoding>
- </configuration>
- </plugin>
- </plugins>
- </build>
- </project>
展开 src,main,java, 右击, new,package, 输入 cn.abc.hadoop.mr 再 New, Java Class, 输入 WordCountMapper
具体代码如下, 注意 import 时的包名正确.
- package cn.abc.hadoop.mr;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- import java.io.IOException;
- /**
- * Created by abc .
- *
- * 这里就是 mapreduce 程序 mapper 阶段业务逻辑实现的类
- *
- * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
- *
- * KEYIN: 表示 mapper 数据输入的时候 key 的数据类型, 在默认的读取数据组件下, 叫 InputFormat, 它的行为是一行一行的读取待处理的数据
- * 读取一行, 返回一行给我们的 mr 程序, 这种情况下 keyin 就表示每一行的起始偏移量 因此数据类型是 Long
- *
- * VALUEIN: 表述 mapper 数据输入的时候 value 的数据类型, 在默认的读取数据组件下 valuein 就表示读取的这一行内容 因此数据类型是 String
- *
- * KEYOUT 表示 mapper 数据输出的时候 key 的数据类型 在本案例当中 输出的 key 是单词 因此数据类型是 String
- *
- * VALUEOUT 表示 mapper 数据输出的时候 value 的数据类型 在本案例当中 输出的 key 是单词的次数 因此数据类型是 Integer
- *
- * 这里所说的数据类型 String Long 都是 jdk 自带的类型 在序列化的时候 效率低下 因此 hadoop 自己封装一套数据类型
- * long---->LongWritable
- * String-->Text
- * Integer--->Intwritable
- * null-->NullWritable
- *
- *
- */
- public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
- /**
- * 这里就是 mapper 阶段具体的业务逻辑实现方法 该方法的调用取决于读取数据的组件有没有给 mr 传入数据
- * 如果有的话 每传入一个k,v对 该方法就会被调用一次
- * @param key
- * @param value
- * @param context
- * @throws IOException
- * @throws InterruptedException
- */
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- // 拿到传入进来的一行内容, 把数据类型转化为 String
- String line = value.toString();
- // 将这一行内容按照分隔符进行一行内容的切割 切割成一个单词数组
- String[] words = line.split(" ");
- // 遍历数组, 每出现一个单词 就标记一个数字 1 < 单词, 1>
- for (String word : words) {
- // 使用 mr 程序的上下文 context 把 mapper 阶段处理的数据发送出去
- // 作为 reduce 节点的输入数据
- context.write(new Text(word),new IntWritable(1));
- //hadoop hadoop spark --> <hadoop,1><hadoop,1><spark,1>
- }
- }
- }
继续 New, Java Class, WordCountReducer 代码如下:
- package cn.abc.hadoop.mr;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
- import java.io.IOException;
- /**
- * Created by abc
- *
- * 这里是 MR 程序 reducer 阶段处理的类
- *
- * KEYIN: 就是 reducer 阶段输入的数据 key 类型, 对应 mapper 的输出 key 类型 在本案例中 就是单词 Text
- *
- * VALUEIN 就是 reducer 阶段输入的数据 value 类型, 对应 mapper 的输出 value 类型 在本案例中 就是单词次数 IntWritable
- * .
- * KEYOUT 就是 reducer 阶段输出的数据 key 类型 在本案例中 就是单词 Text
- *
- * VALUEOUTreducer 阶段输出的数据 value 类型 在本案例中 就是单词的总次数 IntWritable
- */
- public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
- /**
- * 这里是 reduce 阶段具体业务类的实现方法
- * @param key
- * @param values
- * @param context
- * @throws IOException
- * @throws InterruptedException
- *
- * reduce 接收所有来自 map 阶段处理的数据之后, 按照 key 的字典序进行排序
- * <hello,1><hadoop,1><spark,1><hadoop,1>
- * 排序后:
- * <hadoop,1><hadoop,1><hello,1><spark,1>
- *
- * 按照 key 是否相同作为一组去调用 reduce 方法
- * 本方法的 key 就是这一组相同 kv 对的共同 key
- * 把这一组所有的 v 作为一个迭代器传入我们的 reduce 方法
- *
- * <hadoop,[1,1]>
- */
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
- // 定义一个计数器
- int count = 0;
- // 遍历一组迭代器, 把每一个数量 1 累加起来就构成了单词的总次数
- for(IntWritable value:values){
- count +=value.get();
- }
- // 把最终的结果输出
- context.write(key,new IntWritable(count));
- }
- }
最后, WordCountDriver 代码, 写完就可以右击, Run 一下试试.
- package cn.abc.hadoop.mr;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- /**
- * Created by abc .
- *
- * 这个类就是 mr 程序运行时候的主类, 本类中组装了一些程序运行时候所需要的信息
- * 比如: 使用的是那个 Mapper 类 那个 Reducer 类 输入数据在那 输出数据在什么地方
- */
- public class WordCountDriver {
- public static void main(String[] args) throws Exception{
- // 通过 Job 来封装本次 mr 的相关信息
- Configuration conf = new Configuration();
- // 即使没有下面这行, 也可以本地运行 因 \ hadoop-mapreduce-client-core-2.7.4.jar!\mapred-default.xml 中默认的参数就是 local
- //conf.set("mapreduce.framework.name","local");
- Job job = Job.getInstance(conf);
- // 指定本次 mr job jar 包运行主类
- job.setJarByClass(WordCountDriver.class);
- // 指定本次 mr 所用的 mapper reducer 类分别是什么
- job.setMapperClass(WordCountMapper.class);
- job.setReducerClass(WordCountReducer.class);
- // 指定本次 mr mapper 阶段的输出 k v 类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
- // 指定本次 mr 最终输出的 k v 类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- // 指定本次 mr 输入的数据路径 和最终输出结果存放在什么位置
- FileInputFormat.setInputPaths(job,"D:\\wordcount\\input");
- FileOutputFormat.setOutputPath(job,new Path("D:\\wordcount\\output"));
- // 如果出现 0644 错误或找不到 winutils.exe, 则需要设置 windows 环境和相关文件.
- // 上面的路径是本地测试时使用, 如果要打包 jar 到 hdfs 上运行时, 需要使用下面的路径.
- //FileInputFormat.setInputPaths(job,"/wordcount/input");
- //FileOutputFormat.setOutputPath(job,new Path("/wordcount/output"));
- // job.submit(); // 一般不要这个.
- // 提交程序 并且监控打印程序执行情况
- boolean b = job.waitForCompletion(true);
- System.exit(b?0:1);
- }
- }
如果出现 0644 错误或找不到 winutils.exe, 则需要设置 windows 环境和相关文件.
设置环境变量 HADOOP_HOME 指向 hadoop-2.7.5.tar.gz 完整包解压后的路径, 并且在 Path 中添加 %HADOOP_HOME%\bin;
然后下载 win 环境所需文件, 解压, 将其复制到环境变量为 HADOOP_HOME 的真实路径下的 bin 目录中.
如果想要看看日志, 可以在 examplemr\src\main\resources \ 下放入 log4j.properties 文件. 运行后, 将会看到 \ examplemr\mapreduce_test.log 日志.
相关文件: hadoop-win 相关文件 winutils.exe https://files.cnblogs.com/files/frx9527/hadoop-win.rar | log4j.properties https://files.cnblogs.com/files/frx9527/log4j.rar
来源: http://www.bubuko.com/infodetail-2617804.html