掌握HDFS的Java API接口访问

HDFS设计的主要目的是对海量数据进行存储,也就是说在其上能够存储很大量文件(可以存储TB级的文件)。HDFS将这些文件分割之后,存储在不同的DataNode上, HDFS 提供了两种访问接口:Shell接口和Java API 接口,对HDFS里面的文件进行操作,具体每个Block放在哪台DataNode上面,对于开发者来说是透明的。

1、获取文件系统

 1 /**
 2  * 获取文件系统
 3  *
 4  * @return FileSystem
 5  */
 6 public static FileSystem getFileSystem() {
 7     //读取配置文件
 8     Configuration conf = new Configuration();
 9     // 文件系统
10     FileSystem fs = null;
11
12     String hdfsUri = HDFSUri;
13     if(StringUtils.isBlank(hdfsUri)){
14         // 返回默认文件系统  如果在 Hadoop集群下运行,使用此种方法可直接获取默认文件系统
15         try {
16             fs = FileSystem.get(conf);
17         } catch (IOException e) {
18             logger.error("", e);
19         }
20     }else{
21         // 返回指定的文件系统,如果在本地测试,需要使用此种方法获取文件系统
22         try {
23             URI uri = new URI(hdfsUri.trim());
24             fs = FileSystem.get(uri,conf);
25         } catch (URISyntaxException | IOException e) {
26             logger.error("", e);
27         }
28     }
29
30     return fs;
31 }

2、创建文件目录

 1 /**
 2  * 创建文件目录
 3  *
 4  * @param path
 5  */
 6 public static void mkdir(String path) {
 7     try {
 8         // 获取文件系统
 9         FileSystem fs = getFileSystem();
10
11         String hdfsUri = HDFSUri;
12         if(StringUtils.isNotBlank(hdfsUri)){
13             path = hdfsUri + path;
14         }
15
16         // 创建目录
17         fs.mkdirs(new Path(path));
18
19         //释放资源
20         fs.close();
21     } catch (IllegalArgumentException | IOException e) {
22         logger.error("", e);
23     }
24 }

3、删除文件或者文件目录

 1 /**
 2  * 删除文件或者文件目录
 3  *
 4  * @param path
 5  */
 6 public static void rmdir(String path) {
 7     try {
 8         // 返回FileSystem对象
 9         FileSystem fs = getFileSystem();
10
11         String hdfsUri = HDFSUri;
12         if(StringUtils.isNotBlank(hdfsUri)){
13             path = hdfsUri + path;
14         }
15
16         // 删除文件或者文件目录  delete(Path f) 此方法已经弃用
17         fs.delete(new Path(path),true);
18
19         // 释放资源
20         fs.close();
21     } catch (IllegalArgumentException | IOException e) {
22         logger.error("", e);
23     }
24 }

3、根据filter获取目录下的文件

 1 /**
 2  * 根据filter获取目录下的文件
 3  *
 4  * @param path
 5  * @param pathFilter
 6  * @return String[]
 7  */
 8 public static String[] ListFile(String path,PathFilter pathFilter) {
 9     String[] files = new String[0];
10
11     try {
12         // 返回FileSystem对象
13         FileSystem fs = getFileSystem();
14
15         String hdfsUri = HDFSUri;
16         if(StringUtils.isNotBlank(hdfsUri)){
17             path = hdfsUri + path;
18         }
19
20         FileStatus[] status;
21         if(pathFilter != null){
22             // 根据filter列出目录内容
23             status = fs.listStatus(new Path(path),pathFilter);
24         }else{
25             // 列出目录内容
26             status = fs.listStatus(new Path(path));
27         }
28
29         // 获取目录下的所有文件路径
30         Path[] listedPaths = FileUtil.stat2Paths(status);
31         // 转换String[]
32         if (listedPaths != null && listedPaths.length > 0){
33             files = new String[listedPaths.length];
34             for (int i = 0; i < files.length; i++){
35                 files[i] = listedPaths[i].toString();
36             }
37         }
38         // 释放资源
39         fs.close();
40     } catch (IllegalArgumentException | IOException e) {
41         logger.error("", e);
42     }
43
44     return files;
45 }

4、文件上传至 HDFS

 1 /**
 2  * 文件上传至 HDFS
 3  *
 4  * @param delSrc
 5  * @param overwrite
 6  * @param srcFile
 7  * @param destPath
 8  */
 9 public static void copyFileToHDFS(boolean delSrc, boolean overwrite,String srcFile,String destPath) {
10     // 源文件路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/weibo.txt
11     Path srcPath = new Path(srcFile);
12
13     // 目的路径
14     String hdfsUri = HDFSUri;
15     if(StringUtils.isNotBlank(hdfsUri)){
16         destPath = hdfsUri + destPath;
17     }
18     Path dstPath = new Path(destPath);
19
20     // 实现文件上传
21     try {
22         // 获取FileSystem对象
23         FileSystem fs = getFileSystem();
24         fs.copyFromLocalFile(srcPath, dstPath);
25         fs.copyFromLocalFile(delSrc,overwrite,srcPath, dstPath);
26         //释放资源
27         fs.close();
28     } catch (IOException e) {
29         logger.error("", e);
30     }
31 }

5、从 HDFS 下载文件

 1 /**
 2  * 从 HDFS 下载文件
 3  *
 4  * @param srcFile
 5  * @param destPath
 6  */
 7 public static void getFile(String srcFile,String destPath) {
 8     // 源文件路径
 9     String hdfsUri = HDFSUri;
10     if(StringUtils.isNotBlank(hdfsUri)){
11         srcFile = hdfsUri + srcFile;
12     }
13     Path srcPath = new Path(srcFile);
14
15     // 目的路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/
16     Path dstPath = new Path(destPath);
17
18     try {
19         // 获取FileSystem对象
20         FileSystem fs = getFileSystem();
21         // 下载hdfs上的文件
22         fs.copyToLocalFile(srcPath, dstPath);
23         // 释放资源
24         fs.close();
25     } catch (IOException e) {
26         logger.error("", e);
27     }
28 }

6、获取 HDFS 集群节点信息

 1 /**
 2  * 获取 HDFS 集群节点信息
 3  *
 4  * @return DatanodeInfo[]
 5  */
 6 public static DatanodeInfo[] getHDFSNodes() {
 7     // 获取所有节点
 8     DatanodeInfo[] dataNodeStats = new DatanodeInfo[0];
 9
10     try {
11         // 返回FileSystem对象
12         FileSystem fs = getFileSystem();
13
14         // 获取分布式文件系统
15         DistributedFileSystem hdfs = (DistributedFileSystem)fs;
16
17         dataNodeStats = hdfs.getDataNodeStats();
18     } catch (IOException e) {
19         logger.error("", e);
20     }
21     return dataNodeStats;
22 }

7、查找某个文件在 HDFS集群的位置

 1 /**
 2  * 查找某个文件在 HDFS集群的位置
 3  *
 4  * @param filePath
 5  * @return BlockLocation[]
 6  */
 7 public static BlockLocation[] getFileBlockLocations(String filePath) {
 8     // 文件路径
 9     String hdfsUri = HDFSUri;
10     if(StringUtils.isNotBlank(hdfsUri)){
11         filePath = hdfsUri + filePath;
12     }
13     Path path = new Path(filePath);
14
15     // 文件块位置列表
16     BlockLocation[] blkLocations = new BlockLocation[0];
17     try {
18         // 返回FileSystem对象
19         FileSystem fs = getFileSystem();
20         // 获取文件目录
21         FileStatus filestatus = fs.getFileStatus(path);
22         //获取文件块位置列表
23         blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());
24     } catch (IOException e) {
25         logger.error("", e);
26     }
27     return blkLocations;
28 }

8、文件重命名

 1 /**
 2  * 文件重命名
 3  *
 4  * @param srcPath
 5  * @param dstPath
 6  */
 7 public boolean rename(String srcPath, String dstPath){
 8     boolean flag = false;
 9     try    {
10         // 返回FileSystem对象
11         FileSystem fs = getFileSystem();
12
13         String hdfsUri = HDFSUri;
14         if(StringUtils.isNotBlank(hdfsUri)){
15             srcPath = hdfsUri + srcPath;
16             dstPath = hdfsUri + dstPath;
17         }
18
19         flag = fs.rename(new Path(srcPath), new Path(dstPath));
20     } catch (IOException e) {
21         logger.error("{} rename to {} error.", srcPath, dstPath);
22     }
23
24     return flag;
25 }

9、判断目录是否存在

 1 /**
 2  * 判断目录是否存在
 3  *
 4  * @param srcPath
 5  * @param dstPath
 6  */
 7 public boolean existDir(String filePath, boolean create){
 8     boolean flag = false;
 9
10     if (StringUtils.isEmpty(filePath)){
11         return flag;
12     }
13
14     try{
15         Path path = new Path(filePath);
16         // FileSystem对象
17         FileSystem fs = getFileSystem();
18
19         if (create){
20             if (!fs.exists(path)){
21                 fs.mkdirs(path);
22             }
23         }
24
25         if (fs.isDirectory(path)){
26             flag = true;
27         }
28     }catch (Exception e){
29         logger.error("", e);
30     }
31
32     return flag;
33 }

10  查看HDFS文件的最后修改时间

  1. public void testgetModifyTime() throws Exception {
  2. Configuration conf = new Configuration();
  3. FileSystem hdfs = FileSystem.get(conf);
  4. Path dst = new Path(hdfsPath);
  5. FileStatus files[] = hdfs.listStatus(dst);
  6. for (FileStatus file : files) {
  7. System.out.println(file.getPath() + "\t"
  8. + file.getModificationTime());
  9. System.out.println(file.getPath() + "\t"
  10. + new Date(file.getModificationTime()));
  11. }
  1. // 查看HDFS文件是否存在
  2. public void testExists() throws Exception {
  3. Configuration conf = new Configuration();
  4. FileSystem hdfs = FileSystem.get(conf);
  5. Path dst = new Path(hdfsPath + "file01.txt");
  6. boolean ok = hdfs.exists(dst);
  7. System.out.println(ok ? "文件存在" : "文件不存在");
  8. }
  1. // 获取HDFS集群上所有节点名称
  2. public void testGetHostName() throws Exception {
  3. Configuration conf = new Configuration();
  4. DistributedFileSystem hdfs = (DistributedFileSystem) FileSystem
  5. .get(conf);
  6. DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
  7. for (DatanodeInfo dataNode : dataNodeStats) {
  8. System.out.println(dataNode.getHostName() + "\t"
  9. + dataNode.getName());
  10. }
  11. }
时间: 2024-07-29 01:14:39

掌握HDFS的Java API接口访问的相关文章

熟练掌握HDFS的Java API接口访问

HDFS设计的主要目的是对海量数据进行存储,也就是说在其上能够存储很大量文件(可以存储TB级的文件).HDFS将这些文件分割之后,存储在不同的DataNode上, HDFS 提供了两种访问接口:Shell接口和Java API 接口,对HDFS里面的文件进行操作,具体每个Block放在哪台DataNode上面,对于开发者来说是透明的. 通过Java API接口对HDFS进行操作,我将其整理成工具类,地址见底部 1.获取文件系统 1 /** 2 * 获取文件系统 3 * 4 * @return F

API接口访问频次限制 / 网站恶意爬虫限制 / 网站恶意访问限制 方案

API接口访问频次限制 / 网站恶意爬虫限制 / 网站恶意访问限制 方案 采用多级拦截,后置拦截的方式体系化解决 1 分层拦截 1.1 第一层 商业web应用防火墙(WAF) 直接用商业服务 传统的F5硬件,不过现在用的很少了 云时代就用云时代的产品,典型代表 阿里云 web应用防火墙 1.2 第二层 API 网关(API Gateway)层 API 网关(API Gateway) kong为代表的开源 API 网关 实现 openresty + lua 自实现 windows平台 安全狗.云锁

Sample: Write And Read data from HDFS with java API

HDFS: hadoop distributed file system 它抽象了整个集群的存储资源,可以存放大文件. 文件采用分块存储复制的设计.块的默认大小是64M. 流式数据访问,一次写入(现支持append),多次读取. 不适合的方面: 低延迟的数据访问 解决方案:HBASE 大量的小文件 解决方案:combinefileinputformat ,或直接把小文件合并成sequencefile存储到hdfs. HDFS的块 块是独立的存储单元.但是如果文件小于默认的块大小如64M,它不会占

HDFS中JAVA API的使用

转自:http://www.cnblogs.com/liuling/p/2013-6-17-01.html 相关源代码:http://www.cnblogs.com/ggjucheng/archive/2013/02/19/2917020.html HDFS是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件.删除文件.读取文件内容等操作.下面记录一下使用JAVA API对HDFS中的文件进行操作的过程. 对分HDFS中的文件操作主要涉及一下几个类: Configurat

使用HDFS客户端java api读取hadoop集群上的信息

本文介绍使用hdfs java api的配置方法. 1.先解决依赖,pom <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.2</version> <scope>provided</scope> </dependency> 2.配置文

IDEA 创建HDFS项目 JAVA api

1.创建quickMaven 1.在properties中写hadoop 的版本号并且通过EL表达式的方式映射到dependency中 2.写一个repostory将依赖加载到本地仓库中 这是加载完成的页面 这是开发代码 package com.kevin.hadoop; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.*;import org.apache.hadoop.io.IOUtils;imp

hdfs的Java Api开发

1.创建maven工程并导入jar包 jdk使用1.8.maven3.x版本 pom.xml添加一下内容 <repositories> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories> <dep

android框架Java API接口总注释/**@hide*/和internal API

Android有两种类型的API是不能经由SDK访问的 l 第一种是位于com.android.internal包中的API我,位于frameworks/base/core/java/com/android/internal/.我将称之为internal API. l 第二种API类型是一系列被标记为@hide属性的类和方法.从严格意义上来讲,这不是一个单一的API和类,而是一些的被隐藏的API和类,称之为hidden API Internal和hidden API的区别 Hidden API之

利用Java API通过路径过滤上传多文件至HDFS

在本地文件上传至HDFS过程中,很多情况下一个目录包含很多个文件,而我们需要对这些文件进行筛选,选出符合我们要求的文件,上传至HDFS.这时就需要我们用到文件模式. 在项目开始前,我们先掌握文件模式 1.文件模式 在某个单一操作中处理一系列文件是很常见的.例如一个日志处理的MapReduce作业可能要分析一个月的日志量.如果一个文件一个文件或者一个目录一个目录的声明那就太麻烦了,我们可以使用通配符(wild card)来匹配多个文件(这个操作也叫做globbing). Hadoop提供了两种方法