package com.gw;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
/**
 * Static helpers for common HDFS operations: creating directories, deleting
 * files or directories, creating and renaming files, reading file contents,
 * uploading local files to HDFS, downloading HDFS files to the local file
 * system, checking for existence, and printing DataNode information.
 *
 * <p>Every method obtains its {@link FileSystem} through {@link #getFs()}.
 * {@code FileSystem.get(conf)} normally hands out a cached instance that is
 * shared by all callers using the same configuration, so the methods here that
 * close the file system also close it for any other holder of that handle.
 *
 * @author wangweifeng
 */
public class HdfsUtil {
    // Shared Hadoop configuration, loaded once from the cluster's site files.
    static Configuration conf = new Configuration();
    static {
        String path = "/home/Hadoop/hadoop/etc/hadoop/";
        conf.addResource(new Path(path + "core-site.xml"));
        conf.addResource(new Path(path + "hdfs-site.xml"));
        conf.addResource(new Path(path + "mapred-site.xml"));
    }

    /**
     * Returns the {@link FileSystem} for the shared configuration.
     *
     * @return the file system obtained from {@code FileSystem.get(conf)}
     * @throws IOException if the file system cannot be obtained
     */
    public static FileSystem getFs() throws IOException {
        return FileSystem.get(conf);
    }

    /**
     * Creates a directory (and any missing parents) and prints whether it
     * succeeded. The file system is closed afterwards, even on failure.
     *
     * @param path directory to create
     * @throws IOException if the operation fails
     */
    public static void mkDir(String path) throws IOException {
        FileSystem fs = getFs();
        try {
            boolean isok = fs.mkdirs(new Path(path));
            if (isok) {
                System.out.println("create dir Ok!");
            } else {
                System.out.println("create dir failure!");
            }
        } finally {
            fs.close();
        }
    }

    /**
     * Recursively deletes a file or directory and prints whether it
     * succeeded. The file system is closed afterwards, even on failure.
     *
     * @param path file or directory to delete
     * @throws IOException if the operation fails
     */
    public static void delete(String path) throws IOException {
        FileSystem fs = getFs();
        try {
            boolean isok = fs.delete(new Path(path), true);
            if (isok) {
                System.out.println("delete ok!");
            } else {
                System.out.println("delete failure!");
            }
        } finally {
            fs.close();
        }
    }

    /**
     * Creates a file and writes the given bytes to it. Both the output stream
     * and the file system are closed even when the write fails.
     *
     * @param path     destination file
     * @param contents bytes to write
     * @throws IOException if the file cannot be created, written or closed
     */
    public static void createFile(String path, byte[] contents)
            throws IOException {
        FileSystem fs = getFs();
        try {
            FSDataOutputStream outputStream = fs.create(new Path(path));
            try {
                outputStream.write(contents);
                // Close explicitly so that a failed flush is reported to the
                // caller, then clear the reference so the quiet cleanup below
                // has nothing left to do on the success path.
                outputStream.close();
                outputStream = null;
            } finally {
                // Only reached with a non-null stream when write/close threw;
                // closeStream ignores null and swallows secondary errors.
                IOUtils.closeStream(outputStream);
            }
        } finally {
            fs.close();
        }
        System.out.println("file is created sucess!");
    }

    /**
     * Creates a file and writes the given text to it, encoded as UTF-8.
     *
     * @param path     destination file
     * @param contents text to write
     * @throws IOException if the file cannot be created or written
     */
    public static void createFile(String path, String contents)
            throws IOException {
        createFile(path, contents.getBytes("UTF-8"));
    }

    /**
     * Renames a file and prints whether it succeeded. The file system is
     * closed afterwards, even on failure.
     *
     * @param oldName current path
     * @param newName new path
     * @throws IOException if the operation fails
     */
    public static void reNameFile(String oldName, String newName)
            throws IOException {
        FileSystem fs = getFs();
        try {
            boolean isok = fs.rename(new Path(oldName), new Path(newName));
            if (isok) {
                System.out.println("rename ok!");
            } else {
                System.out.println("rename failure");
            }
        } finally {
            fs.close();
        }
    }

    /**
     * Copies the contents of a file to standard output.
     *
     * <p>The input stream is always closed. The file system itself is left
     * open, as in the original implementation.
     *
     * @param path file to print
     * @throws IOException if the file cannot be opened or read
     */
    public static void readFilePrint(String path) throws IOException {
        FileSystem fs = getFs();
        // Open the file exactly once; a second open would leak a stream.
        InputStream in = fs.open(new Path(path));
        try {
            // 'false' keeps System.out open after the copy.
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }

    /**
     * Reads an entire file into memory.
     *
     * @param path file to read
     * @return the file's contents
     * @throws IOException if the path does not exist, the file is larger than
     *                     a byte array can hold, or reading fails
     */
    public static byte[] readFile(String path) throws IOException {
        FileSystem fs = getFs();
        try {
            Path srcPath = new Path(path);
            if (!fs.exists(srcPath)) {
                throw new IOException("the file is not found .");
            }
            long length = fs.getFileStatus(srcPath).getLen();
            // A plain (int) cast would silently truncate lengths above 2 GiB.
            if (length > Integer.MAX_VALUE) {
                throw new IOException("file is too large to read into memory: "
                        + path + " (" + length + " bytes)");
            }
            byte[] buffer = new byte[(int) length];
            FSDataInputStream is = fs.open(srcPath);
            try {
                is.readFully(0, buffer);
            } finally {
                IOUtils.closeStream(is);
            }
            return buffer;
        } finally {
            fs.close();
        }
    }

    /**
     * Uploads a local file to HDFS, then prints the configured default file
     * system and the listing of the destination path. The file system is
     * closed afterwards, even on failure.
     *
     * @param HDFS_Path  destination path in HDFS
     * @param Loacl_Path source path on the local file system
     * @throws IOException if the copy or the listing fails
     */
    public static void upLoadFromLoacl(String HDFS_Path, String Loacl_Path)
            throws IOException {
        FileSystem fs = getFs();
        try {
            Path srcPath = new Path(Loacl_Path);
            Path desPath = new Path(HDFS_Path);
            fs.copyFromLocalFile(srcPath, desPath);
            // Report where the file went and what the destination now holds.
            System.out.println("Upload to " + conf.get("fs.default.name"));
            System.out.println("------------list files------------" + "\n");
            FileStatus[] fileStatus = fs.listStatus(desPath);
            for (FileStatus status : fileStatus) {
                System.out.println(status.getPath());
            }
        } finally {
            fs.close();
        }
    }

    /**
     * Downloads a file from HDFS to the local file system. The file system is
     * closed afterwards, even on failure.
     *
     * @param HDFS_Path  source path in HDFS
     * @param Loacl_Path destination path on the local file system
     * @throws IOException if the copy fails
     */
    public static void downLoadToLoacl(String HDFS_Path, String Loacl_Path)
            throws IOException {
        FileSystem fs = getFs();
        try {
            fs.copyToLocalFile(new Path(HDFS_Path), new Path(Loacl_Path));
        } finally {
            fs.close();
        }
        System.out.println("download to sucess! ");
    }

    /**
     * Tells whether a file or directory exists at the given path.
     *
     * <p>The file system is intentionally left open here, as in the original
     * implementation, so that a caller holding a handle from {@link #getFs()}
     * can keep using it after this check.
     *
     * @param path path to test
     * @return {@code true} if a file or directory exists at {@code path}
     * @throws IOException if the lookup fails
     */
    public static boolean isExist(String path) throws IOException {
        // One lookup instead of separate isDirectory()/isFile() calls.
        return getFs().exists(new Path(path));
    }

    /**
     * Prints the host name and info address of every DataNode in the cluster.
     *
     * @throws IOException if the configured file system is not HDFS, or the
     *                     DataNode report cannot be fetched
     */
    public static void getDateNodeInfo() throws IOException {
        FileSystem fs = getFs();
        // DataNode statistics are only available from an HDFS file system;
        // fail with a clear message rather than a bare ClassCastException.
        if (!(fs instanceof DistributedFileSystem)) {
            throw new IOException(
                    "DataNode info requires HDFS, but the configured file system is "
                            + fs.getClass().getName());
        }
        DistributedFileSystem hdfs = (DistributedFileSystem) fs;
        DatanodeInfo[] dataNodeInfo = hdfs.getDataNodeStats();
        for (int i = 0; i < dataNodeInfo.length; i++) {
            System.out.println("DataNode_" + i + "_Name:"
                    + dataNodeInfo[i].getHostName() + "DataNode_" + i + "Ip:"
                    + dataNodeInfo[i].getInfoAddr());
        }
    }

    /**
     * Manual smoke test. Uncomment the steps below to exercise the helpers
     * against a running cluster.
     *
     * @param args unused
     * @throws IOException if any enabled step fails
     */
    public static void main(String[] args) throws IOException {
        System.out.println("........开始测试HDFS......");
        // 1. Create the directory /HDTest2 and a sub-directory "test" inside
        //    it, so that deletion can be tested.
        //mkDir("/HDTest2");
        //mkDir("/HDTest2/test");
        // 2. Delete /HDTest2/test.
        //delete("/HDTest2/test");
        // 3. Create a file in HDFS and write "helloworld" text to it.
        //createFile("/HDTest2/helloworld.txt", "helloworld of HDFS!");
        // 4. Rename /HDTest2/helloworld.txt to /HDTest2/helloworld.ini.
        //reNameFile("/HDTest2/helloworld.txt", "/HDTest2/helloworld.ini");
        // 5. Print the file's contents.
        //readFilePrint("/HDTest2/helloworld.ini");
        // 6. Read the file's contents and create /HDTest2/test.txt from them.
        //createFile("/HDTest2/test.txt", readFile("/HDTest2/helloworld.ini"));
        // 7. Upload the local desktop file my.cnf to /HDTest2.
        //upLoadFromLoacl("/HDTest2/", "/home/Hadoop/Desktop/my.cnf");
        // 8. Download /HDTest2/test.txt to the desktop.
        //downLoadToLoacl("/HDTest2/test.txt", "/home/Hadoop/Desktop");
        // 9. Print the DataNode information.
        //getDateNodeInfo();
    }
}