最近在公司接到一个任务,是关于数据采集方面的。
需求主要有 3 个:
正好最近都有在这方面做知识储备。正所谓养兵千日,用兵一时啊。学习到的东西只有应用到真实的环境中才有意义不是么。
这里只做模拟环境,而不是真实的线上环境,所以也很简单,如果要使用的话还需要优化优化。
说明一下,这个系统 OS 最好使用 Linux 的,然后 Hadoop 也推荐使用 CDH 发行版的,因为在兼容性、安全性、稳定性都要好于开源的版本。比如说 CDH 的易于升级维护,已解决好 Hadoop 生态其他产品的版本兼容问题,补丁更新比开源要及时(毕竟商业公司支持)等等
还有之所以使用 SpringBoot 是因为快捷,方便,不用做一大堆的配置,不管是作为演示还是生产开发都挺好的。
这里只是做一个很简单的演示,就是在 Web 页面提供一个上传按钮,使用户可以将本地文件上传至 Hadoop 集群平台。
首先看下 pom 文件的依赖:
- <?xml version="1.0" encoding="UTF-8" ?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>
- 4.0.0
- </modelVersion>
- <groupId>
- com.infosys.hadoop
- </groupId>
- <artifactId>
- upload
- </artifactId>
- <version>
- 1.0-SNAPSHOT
- </version>
- <name>
- upload
- </name>
- <packaging>
- jar
- </packaging>
- <parent>
- <groupId>
- org.springframework.boot
- </groupId>
- <artifactId>
- spring-boot-starter-parent
- </artifactId>
- <version>
- 1.5.1.RELEASE
- </version>
- <relativePath/>
- <!-- lookup parent from repository -->
- </parent>
- <properties>
- <project.build.sourceEncoding>
- UTF-8
- </project.build.sourceEncoding>
- <project.reporting.outputEncoding>
- UTF-8
- </project.reporting.outputEncoding>
- <hadoop.version>
- 2.6.5
- </hadoop.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>
- org.springframework.boot
- </groupId>
- <artifactId>
- spring-boot-starter-web
- </artifactId>
- </dependency>
- <dependency>
- <groupId>
- javax.servlet
- </groupId>
- <artifactId>
- javax.servlet-api
- </artifactId>
- <version>
- 3.1.0
- </version>
- </dependency>
- <dependency>
- <groupId>
- org.apache.hadoop
- </groupId>
- <artifactId>
- hadoop-client
- </artifactId>
- <version>
- ${hadoop.version}
- </version>
- <exclusions>
- <exclusion>
- <groupId>
- org.slf4j
- </groupId>
- <artifactId>
- slf4j-log4j12
- </artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <!-- Test -->
- <dependency>
- <groupId>
- junit
- </groupId>
- <artifactId>
- junit
- </artifactId>
- <version>
- 4.12
- </version>
- <scope>
- test
- </scope>
- </dependency>
- <dependency>
- <groupId>
- org.apache.mrunit
- </groupId>
- <artifactId>
- mrunit
- </artifactId>
- <version>
- 1.1.0
- </version>
- <classifier>
- hadoop2
- </classifier>
- <scope>
- test
- </scope>
- </dependency>
- <dependency>
- <groupId>
- org.apache.hadoop
- </groupId>
- <artifactId>
- hadoop-minicluster
- </artifactId>
- <version>
- ${hadoop.version}
- </version>
- <scope>
- test
- </scope>
- </dependency>
- </dependencies>
- <build>
- <finalName>
- ${project.artifactId}
- </finalName>
- <plugins>
- <plugin>
- <groupId>
- org.apache.maven.plugins
- </groupId>
- <artifactId>
- maven-archetype-plugin
- </artifactId>
- <version>
- 2.2
- </version>
- </plugin>
- <plugin>
- <groupId>
- org.apache.maven.plugins
- </groupId>
- <artifactId>
- maven-resources-plugin
- </artifactId>
- <configuration>
- <encoding>
- UTF-8
- </encoding>
- </configuration>
- </plugin>
- <plugin>
- <groupId>
- org.apache.maven.plugins
- </groupId>
- <artifactId>
- maven-compiler-plugin
- </artifactId>
- <version>
- 3.1
- </version>
- <configuration>
- <source>
- 1.8
- </source>
- <target>
- 1.8
- </target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>
- org.apache.maven.plugins
- </groupId>
- <artifactId>
- maven-jar-plugin
- </artifactId>
- <version>
- 2.5
- </version>
- <configuration>
- <outputDirectory>
- ${basedir}
- </outputDirectory>
- </configuration>
- </plugin>
- <plugin>
- <groupId>
- org.springframework.boot
- </groupId>
- <artifactId>
- spring-boot-maven-plugin
- </artifactId>
- </plugin>
- </plugins>
- </build>
- </project>
我们就是添加了一个 SpringBoot 和 Hadoop Client 的依赖。其他的是一些测试相关的。关于这个 Hadoop Client 它提供了一些开发 Hadoop 应用所需的所有依赖,可以参考之前的一篇博客:
首页界面就只是提供一个上传表单按钮:
index.html
- <!doctype html>
- <html lang="en">
- <head>
- <meta charset="UTF-8">
- <meta name="viewport" content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
- <meta http-equiv="X-UA-Compatible" content="ie=edge">
- <title>
- Upload
- </title>
- </head>
- <body>
- <form action="/upload" method="post" enctype="multipart/form-data">
- <p>
- 文件:
- <input type="file" name="file">
- </p>
- <p>
- <input type="submit" value="上传">
- </p>
- </form>
- </body>
- </html>
然后在 Controller 提供一个接口进行访问首页:
HomeController.java
- @Controller
- @RequestMapping(value = "/")
- public class HomeController {
- public ModelAndView home() {
- return new ModelAndView("index");
- }
- }
上传的逻辑也很简单,就是使用
上传文件的形式先将文件接收到后台,然后调用
- SpringBoot
提供的接口 API 执行上传。
- Hadoop
- @Controller
- public class UploadController {
- @PostMapping("/upload")
- @ResponseBody
- public String handleFileUpload(@RequestParam("file") MultipartFile file) {
- if (!file.isEmpty()) {
- try {
- String originalFilename = file.getOriginalFilename();
- BufferedOutputStream out = new BufferedOutputStream(
- new FileOutputStream(
- new File(originalFilename)
- )
- );
- out.write(file.getBytes());
- out.flush();
- out.close();
- String destFileName = "/user/hadoop/" + originalFilename;
- Upload.main(new String[]{originalFilename, destFileName});
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- return "上传失败," + e.getMessage();
- } catch (IOException e) {
- e.printStackTrace();
- return "上传失败, " + e.getMessage();
- }
- return "上传成功";
- } else {
- return "上传失败,文件为空。";
- }
- }
- }
最后我们在提供一个类来操作 Hadoop 接口。
Upload.java
- public class Upload {
- public static final String FS_DEFAULT_FS = "fs.defaultFS";
- public static final String HDFS_HOST = "hdfs://192.168.1.2:9000";
- public static void main(String[] args) throws IOException {
- Configuration conf = new Configuration();
- conf.setBoolean(CROSS_PLATFORM, true);
- conf.set(FS_DEFAULT_FS, HDFS_HOST);
- GenericOptionsParser optionsParser = new GenericOptionsParser(conf, args);
- String[] remainingArgs = optionsParser.getRemainingArgs();
- if (remainingArgs.length < 2) {
- System.err.println("Usage: upload <source> <dest>");
- System.exit(2);
- }
- Path source = new Path(args[0]);
- Path dest = new Path(args[1]);
- FileSystem fs = FileSystem.get(conf);
- fs.copyFromLocalFile(true, false, source, dest);
- }
- }
其中的 fs.defaultFS 属性需要与集群 Master NameNode 节点中配置的一直。该属性配置一般在
文件中进行定义。 可以看到我们实际的操作很简单,就只是调用 Hadoop 的 FileSystem 接口中的
- etc/hadoop/core-site.xml
方法,该方法参数说明:
- copyFromLocalFile
当然上传的方式肯定不止这一种,比如:通过 Hadoop 的 rest 接口调用 PUT 也可以上传,还有 Python 等其他语言也有相应的 API 接口等等
如果是要做成平台的话,这样肯定是远远不够的,每个用户都可以上传就需要做好隔离措施,我们可以采用 HDFS 目录隔离的方式,不过我觉得这样不够好,最好采用 CDH 支持的 kerberos 进行授权认证的方式比较好。开源的 Hadoop 默认只支持 Simple 的形式,也就是与操作系统一致的用户验证。
来源: http://www.bubuko.com/infodetail-1978682.html