1 查询文件系统
(1) 文件元数据:FileStatus,该类封装了文件系统中文件和目录的元数据,包括文件长度、块大小、备份、修改时间、所有者以及版权信息。FileSystem的getFileStatus()方法用于获取文件或目录的FileStatus对象。
例:展示文件状态信息
public class ShowFileStatusTest{ private MiniDFSCluster cluster; private FileSystem fs; @Before public void setup() throws IOException{ Configuration conf = new Configuration(); if(System.getProperty("test.build.data") == null){ System.setProperties("test.build.data","/tmp"); } cluster = new MiniDFSCluster(conf,1,true,null); fs = cluster.getFileSystem(); OutputStream out = fs.create(new Path("/dir/file")); out.write("content".getBytes("UTF-8")); out.close(); } @After public void tearDown() throws IOException{ if(fs != null){ fs.close(); } if(cluster != null){ cluster.shutdown(); } } @Test(expected = FileNotFoundException.class) public void throwsNotFoundForNonExistentFile() throws IOException{ fs.getFileStatus(new Path("no-such-file")); } @Test public void fileStatusForFile() throws IOException{ Path file = new Path("/dir/file"); FileStatus stat = fs.getFileStatus(file); assertThat(stat.getPath().tuUri().getPath(),is("/dir/file")); assertThat(stat.isDir(),is(false)); assertThat(stat.getLen(),is(7L)); assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis()))); assertThat(stat.getReplication(),is((short)1)); assertThat(stat.getBlockSize(),is(64*1024*1024L)); assertThar(stat.getOwner(),is("tom")); assertThat(stat.getGroup(),is("supergroup")); assertThat(stat.getPermission().toString(),is("rw-r--r--")); } @Test public void fileStatusForDirectory() throws IOException{ Path dir = new Path("/dir"); FileStatus stat = fs.getFileStatus(dir); assertThat(stat.getPath().tuUri().getPath(),is("/dir")); assertThat(stat.isDir(),is(true)); assertThat(stat.getLen(),is(0L)); assertThat(stat.getModificationTime(), is(lessThanOrEqualTo(System.currentTimeMillis()))); assertThat(stat.getReplication(),is((short)0)); assertThat(stat.getBlockSize(),is(0L)); assertThar(stat.getOwner(),is("tom")); assertThat(stat.getGroup(),is("supergroup")); assertThat(stat.getPermission().toString(),is("rwxr-xr-x")); } }
(2) 列出文件
FileSystem的listStatus()方法可以列出目录的内容:
public FileStatus[] listStatus(Path f) throws IOException// 允许使用PathFilter来限制匹配的文件和目录 public FileStatus[] listStatus(Path f,PathFilter filter) throws IOException// 指定一组目录,结果相当于依次轮流传递每条路径并对其调用listStatus()方法,再将FileStatus对象数组累积存入同一数组中 public FileStatus[] listStatus(Path[] files) throws IOException public FileStatus[] listStatus(Path[] files,PathFilter filter) throws IOException
例:显示Hadoop文件系统一组路径的文件信息
public class ListStatus{ public static void main(String[] args) throws Exception{ String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI(uri),conf); Path[] paths = new Path[args.length]; for(int i = 0; i < paths.length; i++){ paths[i] = new Path(args[i]); } FileStatus[] status = fs.listStatus(paths); Path[] listedPaths = FileUtil.stat2Paths(status); for(Path p : listedPaths){ System.out.println(p); } } }
运行示例:
% hadoop ListStatus hdfs://localhost/ hdfs://localhost/user/tom
(3) 文件模式
Hadoop为执行通配提供了两个FileSystem方法:
public FileStatus[] globStatus(Path pathPattern) throws IOException public FileStatus[] globStatus(Path pathPattern,PathFilter filter) throws IOException
globStatus()方法返回与路径相匹配的所有文件的FileStatus对象数组,并按路径排序。
例:用于排除匹配正则表达式路径的PathFilter
public class RegexExcludePathFilter implements PathFilter{ private final String regex; public RegexExcludePathFilter(String regex){ this.regex = regex; } public boolean accept(Path path){ return !path.toString().matches(regex); } }
如下示例可扩展到/2007/12/30:
fs.globStatus(new Path("/2007/*/*"), new RegexExcludePathFilter("^.*/2007/12/31$"))
(4) 删除数据
使用FileSystem的delete()方法可以永久性删除文件和目录:
public void delete(Path f,boolean recursive) throws IOException
当f是一个文件或空目录,recursive的值会被忽略。只有在recursive的值为true时,一个非空目录及其内容才能被删除。
2. 数据流
(1) 文件读取剖析
客户端通过调用FileSystem对象的open()方法打开希望读取的文件(step 1)。DistributedFileSystem通过使用RPC来调用namenode,以确定文件起始块的位置(step 2)。对于每一个块,namenode返回存有该块复本的datanode地址,此外,这些datanode根据它们与客户端的距离来排序。
DistributedFileSystem类返回一个FSDataFileInputStream对象给客户端并读取数据,FSDataFileInputStream类转而封装DFSInputStream对象。接着客户端对这个输入流调用read()方法(step 3)。存储着文件起始块的datanode地址的DFSInputStream随即连接距离最近的datanode。通过对数据流反复调用read()方法,可以将数据从datanode传输到客户端(step 4)。到达块的末端时,DFSIputStream会关闭与该datanode的连接,然后寻找下一个块的最佳datanode(step 5)。
客户端从流中读取数据时,块是按照打开DFSInputStream与datanode新建连接的顺序读取的。它也要询问namenode来检索下一批所需块的datanode的位置。一旦客户端完成读取,就会对FSDataInputStream调用close()方法(step 6)。
在读取数据的时候,如果DFSInputStream在与datanode通信时遇到错误,,它便会尝试从这个块的另一个最近邻的datanode读取数据。它也会记住那个故障datanode,以保证以后不会反复读取该节点上后续的块。DFSInpuStream也会通过校验和确认从datanode发来的数据是否完整。如果发现一个损坏的块,它就会在DFSInputStream试图从其他datanode读取一个块的复本之前通知namenode。
该设计的重点是:namenode告知客户端每个块中最佳的datanode,并让客户端直接联系该datanode且检索数据。
(2) 文件写入剖析
客户端通过对DistributedFileSystem对象调用create()函数来创建文件(step 1)。DistributedFileSystem对namenode创建一个RPC调用,在文件系统的命名空间中创建一个新文件,此时该文件中还没有相应的数据块(step 2)。namenode执行各种不同的检查以确保这个文件不存在,并且客户端有创建该文件的权限。检查均通过,namenode就会创建新文件记录一条记录。DistributedFileSystem向客户端返回一个FSDataOutputStream对象,由此客户端可以开始写入数据。FSDataOutputStream封装一个DFSOutputStream。
在客户端写入数据时(step 3),DFSOutputStream将它分成一个一个的数据包,并写入内部队列,称为"数据队列"(data queue)。DataStreamer处理数据队列,它的责任是根据datanode列表来要求namenode分配适合的新块来存储数据备份。这一组datanode构成一个管线(假设复本数为3)。DataStreamer将数据包流式传输到管线中第1个datanode,该datanode存储数据包并将它发送到管线中的第2个datanode。同样地,第2个datanode存储该数据包并且发送给管线中的第3个datanode(step 4).
DFSOutputStream也维护着一个内部数据包队列来等待datanode的收到确认回执,称为"确认队列"(ask queue)。当收到管道中所有datanode确认信息后,该数据包才会从确认队列删除(step 5)。
如果在数据写入期间,datanode发生故障,则执行以下操作。首先关闭管线,确认把队列中的任何数据包都添加回数据队列的最前端,以确保故障节点的下游的datanode不会漏掉任何一个数据包。为存储在另一个正常datanode的当前数据块制定一个新的标识,并将该标识传送给namenode,以便故障datanode在恢复后可以删除存储的部分数据块。从管线中删除故障数据节点并把余下的数据块写入管线中的两个正常的datanode。namenode注意到复本量不足时,会在另一个节点上创建一个新的复本。
客户端完成数据的写入后,会对数据流调用close()方法(step6)。该操作将剩余的所有数据包写入datanode管线中,并在联系namenode且发送文件写入完成信号之前,等待确认(step 7)。
3. 复本的布局
Hadoop的默认布局策略是在运行客户端的节点上放第1个复本,第2个复本放在与第1个不同且随机另外选择的机架中节点上。其他复本放在集群中随机选择的节点上。一旦选定复本的放置位置,就会根据网络拓扑创建一个管线,如果复本数为3,则有如图所示的管线:
4 一致模型
在创建一个文件后,希望能在文件系统的命名空间中立即可见,如:
Path p = new Path("p"); Fs.create(p); assertThat(fs.exists(p),is(true))
但是写入文件的内容并不能保证立即可见,即使数据流已经刷新并存储,所以文件长度显示为0:
Path p = new Path("p"); OutputStream out = fs.create(p); out.write("content".getBytes("UTF-8")); out.flush(); assertThat(fs.getFileStatus(p).getLen(),is(0L))
当写入的数据超过一个块后,新的reader就能看见第一个块,总之,其他reader不能看见正在写入的块。
HDFS调用FSDataOutputStream调用sync()方法可以强制所有的缓存与数据节点同步,因此,对所有的reader,HDFS能保证文件中到目前为止写入的数据均可见且一致。
Path p = new Path("p"); FSDataOutputStream out = fs.create(p); out.write("content".getBytes("UTF-8")); out.flush(); out.sync(); assertThat(fs.getFileStatus(p).getLen(),is((long)"content".length()));
5. 通过distcp并行复制
distcp的典型应用就是在两个HDFS集群之间传输数据,如果两个集群运行相同版本的Hadoop,就非常适合使用hdfs方案:
% hadoop distcp hdfs://namenode1/foo hdfs://namenode2/bar
该指令将第一个集群的/foo目录复制到第二个集群的/bar目录下,注意:源路径必须是绝对路径!
默认情况下,distcp会跳过目标路径下已经存在的文件,但可通过-overwrite选项覆盖现有的文件。也可以通过-update选项来选择仅更新修改过的文件。
当改变先前例子中第一个集群/foo子树下的一个文件,可通过如下命令将修改同步到第二个集群上。
% hadoop distcp -update hdfs://namenode1/foo hdfs://namenode2/bar/foo
当试图在两个运行着不同HDFS版本的集群上使用distcp复制数据,并且使用hdfs协议,由于两个版本的RPC系统不兼容,因此复制作业失败。可使用基于只读HTTP协议的HFTP文件系统并从源文件系统中读取数据。
% hadoop distcp hftp://namenode1:50070/foo hdfs://namenode2/bar
6. Hadoop存档
Hadoop存档文件或HAR文件,将文件存入HDFS块,减少namenode内存使用的同时,还能允许对文件进行透明的访问。
// 希望存档的文档 % hadoop fs -lsr /my/files // 运行archive指令存档,第一个选项files.har是存档文件的名称, // /my/files是需要存档的文件,/my是HAR文件的输出目录 % hadoop archive -archiveName files.har /my/files /my // 查看创建的文档 % hadoop fs -ls /my // 如下命令可现实HAR文件的组成部分:两个索引文件及部分文件的集合 % hadoop fs -ls /my/files.har // 递归列出存档文件中的部分文件 % hadoop fs -lsr har:///my/files.har // 删除HAR文件,对于基础文件系统,HAR文件是一个目录 % hadoop fs -rmr /my/files.bar