[toc]
# Hadoop HDFS Java API
These are some common snippets of Java code for operating on HDFS; the complete code is given directly below:
```java
package com.uplooking.bigdata.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* List directory contents: listStatus
* Read a file: open
* Create a directory: mkdirs
* Create a file: create
* Delete a file or directory: delete
* Show file block locations: getFileBlockLocations
*/
public class HDFSTest {
private FileSystem fs;
private DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm");
/**
* Initialize resources
* @throws Exception
*/
@Before
public void setUp() throws Exception {
URI uri = new URI("hdfs://uplooking01:9000");
Configuration configuration = new Configuration();
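// FileSystem.get connects to the NameNode at the given URI as the current client user;
// if a different HDFS user is needed, the overload FileSystem.get(uri, configuration, "username")
// can be used ("username" here is only a placeholder, not part of the original post)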
fs = FileSystem.get(uri, configuration);
}
/**
* List directory contents: listStatus
* Mimics:
* $ hdfs dfs -ls /
* -rw-r--r-- 1 uplooking supergroup 28 2018-02-28 12:29 /hello
* drwxr-xr-x - uplooking supergroup 0 2018-02-28 12:31 /output
* drwx------ - uplooking supergroup 0 2018-02-28 12:31 /tmp
*
* @throws IOException
*/
@Test
public void testList() throws IOException {
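// listStatus returns one FileStatus per direct child of the given path (it does not recurse)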
FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
for (FileStatus fileStatus : fileStatuses) {
// Defaults that are refined below depending on the entry type
String prefix = "d";
String replication = "-";
// Entry type: "-" for a regular file, "d" for a directory
if (fileStatus.isFile()) {
prefix = "-";
}
// Permission string, built from the user/group/other FsAction symbols (e.g. rwxr-xr-x)
FsPermission permission = fileStatus.getPermission();
String uacl = permission.getUserAction().SYMBOL;
String gacl = permission.getGroupAction().SYMBOL;
String oacl = permission.getOtherAction().SYMBOL;
String acl = uacl + gacl + oacl;
// Replication factor (only meaningful for regular files)
if (fileStatus.isFile()) {
replication = fileStatus.getReplication() + "";
}
// File owner
String owner = fileStatus.getOwner();
// File group
String group = fileStatus.getGroup();
// File size in bytes
long len = fileStatus.getLen();
// Last modification time
String mTime = df.format(new Date(fileStatus.getModificationTime()));
// Full path
Path path = fileStatus.getPath();
// Format the output roughly like `hdfs dfs -ls`
System.out.println(prefix + acl + "\t" + replication + "\t" + owner + " " + group + "\t" + len + "\t" + mTime + "\t" + path);
}
}
/**
* Read a file: open
*
* @throws IOException
*/
@Test
public void testOpen() throws IOException {
FSDataInputStream fis = fs.open(new Path("hdfs://uplooking01:9000/hello"));
// Option 1: read manually into a byte buffer
/* byte[] bytes = new byte[1024];
int len = 0;
while ((len = fis.read(bytes)) != -1) {
System.out.println(new String(bytes, 0, len));
}
fis.close();*/
// Option 2: wrap the stream in a BufferedReader and read line by line
/*BufferedReader br = new BufferedReader(new InputStreamReader(fis));
String line = null;
while ((line = br.readLine()) != null) {
System.out.println(line);
}
fis.close();*/
// Option 3: let IOUtils do the copying; the last argument says whether to close the streams
// automatically, so with `false` the input stream is closed explicitly afterwards
IOUtils.copyBytes(fis, System.out, 1024, false);
fis.close();
}
/**
* Create a directory: mkdirs
*
* @throws IOException
*/
@Test
public void testMkdir() throws IOException {
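// Like `hdfs dfs -mkdir -p`, mkdirs also creates any missing parent directories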
boolean ret = fs.mkdirs(new Path("/input/hdfs"));
System.out.println(ret ? "Directory created" : "Failed to create directory");
}
/**
* Create a file: create
*
* @throws IOException
*/
@Test
public void testCreate() throws IOException {
// The second argument controls whether an existing file may be overwritten (create(path) without it overwrites by default)
FSDataOutputStream fos = fs.create(new Path("/input/hdfs/word.txt"), false);
fos.write("hello\n".getBytes());
fos.write("xpleaf\n".getBytes());
fos.close();
}
/**
* Delete a file or directory: delete
*
* @throws IOException
*/
@Test
public void testDelete() throws IOException {
// The second argument controls recursive deletion (relevant when deleting a directory)
boolean ret = fs.delete(new Path("/input/hdfs/word.txt"), false);
System.out.println(ret ? "Deleted" : "Delete failed");
}
/**
* Show file block locations: getFileBlockLocations
*
* @throws IOException
*/
@Test
public void testLocations() throws IOException {
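// getFileBlockLocations maps a byte range of a file to the blocks (and the datanodes holding them) that cover it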
Path path = new Path("/hadoop-2.6.4.tar.gz");
FileStatus fileStatus = fs.getFileStatus(path);
// Arguments: file path, start offset, length (here the whole file)
BlockLocation[] locations = fs.getFileBlockLocations(path, 0, fileStatus.getLen());
// Print each block location (printing the array itself would only show an object reference)
for (BlockLocation location : locations) {
System.out.println(location);
}
/**
 * Sample output:
 * 0,134217728,uplooking01        (the block at offset 0, 128 MB in size, stored on node uplooking01)
 * 134217728,61798247,uplooking01 (the block at offset 128 MB, about 59 MB in size, i.e. the remainder, stored on node uplooking01)
 * Both blocks live only on uplooking01 because this Hadoop setup is pseudo-distributed.
 */
}
/**
* Release resources
* @throws IOException
*/
@After
public void cleanUp() throws IOException {
fs.close();
}
}
```
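Upload and download are not covered by the class above, but they follow the same pattern. The sketch below shows two standard FileSystem calls, copyFromLocalFile and copyToLocalFile, written as an extra test method that could be added to HDFSTest (it reuses the fs field and JUnit setup from above); the local paths are illustrative placeholders, not from the original post.

```java
/**
 * Upload / download sketch: copyFromLocalFile / copyToLocalFile
 * (the local paths below are placeholders; adjust them to your machine)
 */
@Test
public void testCopy() throws IOException {
    // Upload a local file into the HDFS directory /input/hdfs
    fs.copyFromLocalFile(new Path("/home/uplooking/word.txt"), new Path("/input/hdfs/"));
    // Download /hello from HDFS to the local filesystem
    fs.copyToLocalFile(new Path("/hello"), new Path("/home/uplooking/hello-copy.txt"));
}
```

If the `.crc` checksum side files that may appear next to the downloaded copy are unwanted, the four-argument overload copyToLocalFile(delSrc, src, dst, useRawLocalFileSystem) can be used with useRawLocalFileSystem set to true.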
Original article: http://blog.51cto.com/xpleaf/2074023