- [toc]
- Hadoop HDFS Java API
主要是 Java 操作 HDFS 的一些常用代码, 下面直接给出代码:
- package com.uplooking.bigdata.hdfs;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.*;
- import org.apache.hadoop.fs.permission.FsPermission;
- import org.apache.hadoop.io.IOUtils;
- import org.junit.After;
- import org.junit.Before;
- import org.junit.Test;
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.net.URI;
- import java.text.DateFormat;
- import java.text.SimpleDateFormat;
- import java.util.Date;
- /**
- * 列出目录的内容: listStatus
- * 读取文件: open
- * 创建目录: mkdirs
- * 创建文件: create
- * 删除文件或目录: delete
- * 显示文件存储位置: getFileBlockLocations
- */
- public class HDFSTest {
- private FileSystem fs;
- private DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm");
- /**
- * Obtains a {@link FileSystem} handle for {@code hdfs://uplooking01:9000}
- * and stores it in {@code fs} before each test runs.
- *
- * @throws Exception if the URI is malformed or the file system cannot be obtained
- */
- @Before
- public void setUp() throws Exception {
- // A default Configuration is enough here; the target cluster is selected by the URI.
- Configuration conf = new Configuration();
- fs = FileSystem.get(new URI("hdfs://uplooking01:9000"), conf);
- }
- /**
- * Lists the entries directly under the HDFS root, printing one line per
- * entry in a layout modelled on {@code hdfs dfs -ls /}:
- * <pre>
- * -rw-r--r-- 1 uplooking supergroup 28 2018-02-28 12:29 /hello
- * drwxr-xr-x - uplooking supergroup 0 2018-02-28 12:31 /output
- * drwx------ - uplooking supergroup 0 2018-02-28 12:31 /tmp
- * </pre>
- * The printed columns are tab-separated: type and permissions, replication,
- * owner, group, length, modification time and path.
- *
- * @throws IOException if the listing cannot be retrieved
- */
- @Test
- public void testList() throws IOException {
- for (FileStatus fileStatus : fs.listStatus(new Path("/"))) {
- // Anything that is not a regular file is shown as a directory ("d"),
- // and only regular files get a replication factor; others show "-".
- boolean isFile = fileStatus.isFile();
- String prefix = isFile ? "-" : "d";
- String replication = isFile ? String.valueOf(fileStatus.getReplication()) : "-";
- // Permission string: user, group and other symbols concatenated, e.g. "rwxr-xr-x".
- FsPermission permission = fileStatus.getPermission();
- String acl = permission.getUserAction().SYMBOL
- + permission.getGroupAction().SYMBOL
- + permission.getOtherAction().SYMBOL;
- String owner = fileStatus.getOwner();
- String group = fileStatus.getGroup();
- long len = fileStatus.getLen();
- String mTime = df.format(new Date(fileStatus.getModificationTime()));
- Path path = fileStatus.getPath();
- // Owner and group are separate columns, and the length is printed
- // so that the line matches the "ls" layout shown above.
- System.out.println(prefix + acl + "\t" + replication + "\t" + owner + "\t" + group
- + "\t" + len + "\t" + mTime + "\t" + path);
- }
- }
- /**
- * Opens {@code /hello} on HDFS and copies its content to standard output.
- * The input stream is always closed, even when the copy fails.
- *
- * @throws IOException if the file cannot be opened or read
- */
- @Test
- public void testOpen() throws IOException {
- FSDataInputStream fis = fs.open(new Path("hdfs://uplooking01:9000/hello"));
- try {
- // Alternative 1: read raw bytes in fixed-size chunks.
- // byte[] bytes = new byte[1024];
- // int len = 0;
- // while ((len = fis.read(bytes)) != -1) {
- // System.out.println(new String(bytes, 0, len));
- // }
- // Alternative 2: read the stream line by line.
- // BufferedReader br = new BufferedReader(new InputStreamReader(fis));
- // String line = null;
- // while ((line = br.readLine()) != null) {
- // System.out.println(line);
- // }
- // Alternative 3 (used here): let Hadoop's IOUtils do the copy.
- // The last argument stays false because "true" would also close
- // System.out; the input stream is closed in the finally block instead.
- IOUtils.copyBytes(fis, System.out, 1024, false);
- } finally {
- IOUtils.closeStream(fis);
- }
- }
- /**
- * 创建目录: mkdirs
- *
- * @throws IOException
- */
- @Test
- public void testMkdir() throws IOException {
- boolean ret = fs.mkdirs(new Path("/input/hdfs"));
- System.out.println(ret ? "创建目录成功" : "创建目录失败");
- }
- /**
- * Creates {@code /input/hdfs/word.txt} and writes two lines of text to it.
- *
- * @throws IOException if the file already exists, or cannot be created or written
- */
- @Test
- public void testCreate() throws IOException {
- // The second argument is the overwrite flag. With false the call fails
- // if the file already exists instead of replacing it; the one-argument
- // create(Path) overload is the one that overwrites by default.
- FSDataOutputStream fos = fs.create(new Path("/input/hdfs/word.txt"), false);
- try {
- // Encode explicitly as UTF-8 so the bytes do not depend on the platform default charset.
- fos.write("hello\n".getBytes("UTF-8"));
- fos.write("xpleaf\n".getBytes("UTF-8"));
- } finally {
- // Close even when a write fails so the stream is not leaked.
- fos.close();
- }
- }
- /**
- * 删除文件或目录: delete
- *
- * @throws IOException
- */
- @Test
- public void testDelete() throws IOException {
- // 第二个参数为是否递归删除(当删除目录时)
- boolean ret = fs.delete(new Path("/input/hdfs/word.txt"), false);
- System.out.println(ret ? "删除成功" : "删除失败");
- }
- /**
- * Prints the block locations of {@code /hadoop-2.6.4.tar.gz}, one block per line.
- *
- * @throws IOException if the file status or its block locations cannot be retrieved
- */
- @Test
- public void testLocations() throws IOException {
- Path path = new Path("/hadoop-2.6.4.tar.gz");
- FileStatus fileStatus = fs.getFileStatus(path);
- // Arguments: file path, start offset, length of the range to inspect.
- // Passing 0 and the full file length covers every block of the file.
- BlockLocation[] locations = fs.getFileBlockLocations(path, 0, fileStatus.getLen());
- // Print each element; printing the array itself would only show its identity string.
- for (BlockLocation location : locations) {
- System.out.println(location);
- }
- /*
- * Sample output from the original author's run:
- * 0,134217728,uplooking01 (offset 0, a 128 MB block stored on node uplooking01)
- * 134217728,61798247,uplooking01 (offset 128 MB, the remaining ~59 MB block, also on uplooking01)
- * Both blocks sit on uplooking01 because that Hadoop environment was pseudo-distributed.
- */
- }
- /**
- * Closes the {@link FileSystem} handle after each test.
- *
- * @throws IOException if closing the file system fails
- */
- @After
- public void cleanUp() throws IOException {
- // JUnit still runs @After when setUp() failed, in which case fs was never
- // assigned; skip the close so a NullPointerException is not reported too.
- if (fs != null) {
- fs.close();
- }
- }
- }
来源: http://www.bubuko.com/infodetail-2510411.html