HDFS是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件、删除文件、读取文件内容等操作。下面记录一下使用JAVA API对HDFS中的文件进行操作的过程。
对分HDFS中的文件操作主要涉及一下几个类:
Configuration类:该类的对象封转了客户端或者服务器的配置。
FileSystem类:该类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作。FileSystem fs = FileSystem.get(conf);通过FileSystem的静态方法get获得该对象。
FSDataInputStream和FSDataOutputStream:这两个类是HDFS中的输入输出流。分别通过FileSystem的open方法和create方法获得。
在eclipse中通过java api操作HDFS时,需要引用一些jar包,日下:
package hadoop; import java.io.IOException; import java.io.InputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; public class OperaHDFS { /*** * 加载配置文件 * **/ private static Configuration conf = new Configuration(); private static FileSystem fs = null; static { try { conf.set("fs.defaultFS", "hdfs://host:port"); conf.setInt("dfs.replication", 2); // 加载配置项 fs = FileSystem.get(conf); } catch (Exception e) { e.printStackTrace(); } } /*** * 上传本地文件到 HDFS上 * * **/ public static void uploadFile(String src, String dst) throws Exception { // 本地文件路径 Path srcPath = new Path(src); // HDFS文件路径 Path dstPath = new Path(dst); try { // 调用文件系统的文件复制函数,前面参数是指是否删除原文件,true为删除,默认为false fs.copyFromLocalFile(false, srcPath, dstPath); } catch (IOException e) { e.printStackTrace(); } System.out.println("上传成功!"); fs.close();// 释放资源 } /** * 在HDFS上创建一个文件夹 * * **/ public static void createDirOnHDFS(String path) throws Exception { Path p = new Path(path); boolean done = fs.mkdirs(p); if (done) { System.out.println("创建文件夹成功!"); } else { System.out.println("创建文件夹成功!"); } fs.close();// 释放资源 } /** * 在HDFS上创建一个文件 * * **/ public static void createFileOnHDFS(String dst, String content) throws Exception { Path dstPath = new Path(dst); // 打开一个输出流 FSDataOutputStream outputStream = fs.create(dstPath); outputStream.write(content.getBytes()); outputStream.close(); fs.close();// 释放资源 System.out.println("创建文件成功!"); } /** * 在HDFS上删除一个文件或文件夹 * * **/ public static void deleteFileOrDirOnHDFS(String path) throws Exception { Path p = new Path(path); boolean done = fs.deleteOnExit(p); if (done) { System.out.println("删除成功!"); } else { System.out.println("删除失败!"); } fs.close();// 释放资源 } /** * 重名名一个文件夹或者文件 * * **/ public static void renameFileOrDirOnHDFS(String oldName, String newName) throws Exception { Path oldPath = new Path(oldName); Path newPath = new Path(newName); boolean done = fs.rename(oldPath, newPath); if (done) { System.out.println("重命名文件夹或文件成功!"); } else { System.out.println("重命名文件夹或文件失败!"); } fs.close();// 释放资源 } /*** * * 读取HDFS中某个文件 * * **/ public static void readFile(String filePath) throws IOException { Path srcPath = new Path(filePath); InputStream in = null; try { in = fs.open(srcPath); IOUtils.copyBytes(in, System.out, 4096, false); // 复制到标准输出流 } finally { IOUtils.closeStream(in); } } /*** * * 读取HDFS某个文件夹的所有 文件,并打印 * * **/ public static void readAllFile(String path) { // 打印文件路径下的所有文件名 Path dstPath = new Path(path); FileStatus[] fileStatus = null; try { fileStatus = fs.listStatus(dstPath); } catch (IOException e) { e.printStackTrace(); } for (FileStatus status : fileStatus) { if (status.isDirectory()) { readAllFile(status.getPath().toString()); } else { System.out.println("文件: " + status.getPath()); try { readFile(status.getPath().toString()); } catch (IOException e) { e.printStackTrace(); } } } } /** * 从HDFS上下载文件或文件夹到本地 * * **/ public static void downloadFileorDirectoryOnHDFS(String src, String dst) throws Exception { Path srcPath = new Path(src); Path dstPath = new Path(dst); fs.copyToLocalFile(false, srcPath, dstPath); fs.close();// 释放资源 System.out.println("下载文件夹或文件成功!"); } public static void main(String[] args) throws Exception { uploadFile("D:\\text.txt", "/path/file"); deleteFileOrDirOnHDFS("/path/file"); createDirOnHDFS("/path"); createFileOnHDFS("/path/file", "context"); renameFileOrDirOnHDFS("/path", "/path"); readFile("/path/file"); readAllFile("/path/file"); downloadFileorDirectoryOnHDFS("/path/file", "D:\\text2.txt"); } }
时间: 2024-11-07 14:10:24