Hadoop学习笔记(3) Hadoop文件系统二

1 查询文件系统

  (1) 文件元数据:FileStatus,该类封装了文件系统中文件和目录的元数据,包括文件长度、块大小、备份、修改时间、所有者以及版权信息。FileSystem的getFileStatus()方法用于获取文件或目录的FileStatus对象。

  例:展示文件状态信息

public class ShowFileStatusTest{
    private MiniDFSCluster cluster;
    private FileSystem fs;

    @Before
    public void setup() throws IOException{
        Configuration conf = new Configuration();
        if(System.getProperty("test.build.data") == null){
            System.setProperties("test.build.data","/tmp");
        }
        cluster = new MiniDFSCluster(conf,1,true,null);
        fs = cluster.getFileSystem();
        OutputStream out = fs.create(new Path("/dir/file"));
        out.write("content".getBytes("UTF-8"));
        out.close();
    }

    @After
    public void tearDown() throws IOException{
        if(fs != null){
            fs.close();
        }
        if(cluster != null){
            cluster.shutdown();
        }
    }

    @Test(expected = FileNotFoundException.class)
    public void throwsNotFoundForNonExistentFile() throws IOException{
        fs.getFileStatus(new Path("no-such-file"));
    }

    @Test
    public void fileStatusForFile() throws IOException{
        Path file = new Path("/dir/file");
        FileStatus stat = fs.getFileStatus(file);
        assertThat(stat.getPath().tuUri().getPath(),is("/dir/file"));
        assertThat(stat.isDir(),is(false));
        assertThat(stat.getLen(),is(7L));
        assertThat(stat.getModificationTime(),
                is(lessThanOrEqualTo(System.currentTimeMillis())));
        assertThat(stat.getReplication(),is((short)1));
        assertThat(stat.getBlockSize(),is(64*1024*1024L));
        assertThar(stat.getOwner(),is("tom"));
        assertThat(stat.getGroup(),is("supergroup"));
        assertThat(stat.getPermission().toString(),is("rw-r--r--"));
    }

    @Test
    public void fileStatusForDirectory() throws IOException{
        Path dir = new Path("/dir");
        FileStatus stat = fs.getFileStatus(dir);
        assertThat(stat.getPath().tuUri().getPath(),is("/dir"));
        assertThat(stat.isDir(),is(true));
        assertThat(stat.getLen(),is(0L));
        assertThat(stat.getModificationTime(),
                is(lessThanOrEqualTo(System.currentTimeMillis())));
        assertThat(stat.getReplication(),is((short)0));
        assertThat(stat.getBlockSize(),is(0L));
        assertThar(stat.getOwner(),is("tom"));
        assertThat(stat.getGroup(),is("supergroup"));
        assertThat(stat.getPermission().toString(),is("rwxr-xr-x"));

    }
}

  (2) 列出文件

  FileSystem的listStatus()方法可以列出目录的内容:

public FileStatus[] listStatus(Path f) throws IOException// 允许使用PathFilter来限制匹配的文件和目录
public FileStatus[] listStatus(Path f,PathFilter filter) throws IOException// 指定一组目录,结果相当于依次轮流传递每条路径并对其调用listStatus()方法,再将FileStatus对象数组累积存入同一数组中
public FileStatus[] listStatus(Path[] files) throws IOException
public FileStatus[] listStatus(Path[] files,PathFilter filter) throws IOException

  例:显示Hadoop文件系统一组路径的文件信息

public class ListStatus{
    public static void main(String[] args) throws Exception{
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI(uri),conf);

        Path[] paths = new Path[args.length];
        for(int i = 0; i < paths.length; i++){
            paths[i]  = new Path(args[i]);
        }

        FileStatus[] status = fs.listStatus(paths);
        Path[] listedPaths = FileUtil.stat2Paths(status);
        for(Path p : listedPaths){
            System.out.println(p);
        }
    }
}

  运行示例:

  % hadoop ListStatus hdfs://localhost/ hdfs://localhost/user/tom

  (3)  文件模式

  Hadoop为执行通配提供了两个FileSystem方法:

public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern,PathFilter filter) throws IOException

  globStatus()方法返回与路径相匹配的所有文件的FileStatus对象数组,并按路径排序。

  例:用于排除匹配正则表达式路径的PathFilter

public class RegexExcludePathFilter implements PathFilter{
    private final String regex;
    public RegexExcludePathFilter(String regex){
        this.regex = regex;
    }

    public boolean accept(Path path){
        return !path.toString().matches(regex);
    }
}

  如下示例可扩展到/2007/12/30:

  fs.globStatus(new Path("/2007/*/*"), new RegexExcludePathFilter("^.*/2007/12/31$"))

  (4) 删除数据

  使用FileSystem的delete()方法可以永久性删除文件和目录:

public void delete(Path f,boolean recursive) throws IOException

  当f是一个文件或空目录,recursive的值会被忽略。只有在recursive的值为true时,一个非空目录及其内容才能被删除。

2. 数据流

  (1) 文件读取剖析

  

  客户端通过调用FileSystem对象的open()方法打开希望读取的文件(step 1)。DistributedFileSystem通过使用RPC来调用namenode,以确定文件起始块的位置(step 2)。对于每一个块,namenode返回存有该块复本的datanode地址,此外,这些datanode根据它们与客户端的距离来排序。

  DistributedFileSystem类返回一个FSDataFileInputStream对象给客户端并读取数据,FSDataFileInputStream类转而封装DFSInputStream对象。接着客户端对这个输入流调用read()方法(step 3)。存储着文件起始块的datanode地址的DFSInputStream随即连接距离最近的datanode。通过对数据流反复调用read()方法,可以将数据从datanode传输到客户端(step 4)。到达块的末端时,DFSIputStream会关闭与该datanode的连接,然后寻找下一个块的最佳datanode(step 5)。

  客户端从流中读取数据时,块是按照打开DFSInputStream与datanode新建连接的顺序读取的。它也要询问namenode来检索下一批所需块的datanode的位置。一旦客户端完成读取,就会对FSDataInputStream调用close()方法(step 6)。

  在读取数据的时候,如果DFSInputStream在与datanode通信时遇到错误,,它便会尝试从这个块的另一个最近邻的datanode读取数据。它也会记住那个故障datanode,以保证以后不会反复读取该节点上后续的块。DFSInpuStream也会通过校验和确认从datanode发来的数据是否完整。如果发现一个损坏的块,它就会在DFSInputStream试图从其他datanode读取一个块的复本之前通知namenode。

  该设计的重点是:namenode告知客户端每个块中最佳的datanode,并让客户端直接联系该datanode且检索数据。

  (2) 文件写入剖析

 

  客户端通过对DistributedFileSystem对象调用create()函数来创建文件(step 1)。DistributedFileSystem对namenode创建一个RPC调用,在文件系统的命名空间中创建一个新文件,此时该文件中还没有相应的数据块(step 2)。namenode执行各种不同的检查以确保这个文件不存在,并且客户端有创建该文件的权限。检查均通过,namenode就会创建新文件记录一条记录。DistributedFileSystem向客户端返回一个FSDataOutputStream对象,由此客户端可以开始写入数据。FSDataOutputStream封装一个DFSOutputStream。

  在客户端写入数据时(step 3),DFSOutputStream将它分成一个一个的数据包,并写入内部队列,称为"数据队列"(data queue)。DataStreamer处理数据队列,它的责任是根据datanode列表来要求namenode分配适合的新块来存储数据备份。这一组datanode构成一个管线(假设复本数为3)。DataStreamer将数据包流式传输到管线中第1个datanode,该datanode存储数据包并将它发送到管线中的第2个datanode。同样地,第2个datanode存储该数据包并且发送给管线中的第3个datanode(step 4).

  DFSOutputStream也维护着一个内部数据包队列来等待datanode的收到确认回执,称为"确认队列"(ask queue)。当收到管道中所有datanode确认信息后,该数据包才会从确认队列删除(step 5)。

  如果在数据写入期间,datanode发生故障,则执行以下操作。首先关闭管线,确认把队列中的任何数据包都添加回数据队列的最前端,以确保故障节点的下游的datanode不会漏掉任何一个数据包。为存储在另一个正常datanode的当前数据块制定一个新的标识,并将该标识传送给namenode,以便故障datanode在恢复后可以删除存储的部分数据块。从管线中删除故障数据节点并把余下的数据块写入管线中的两个正常的datanode。namenode注意到复本量不足时,会在另一个节点上创建一个新的复本。

  客户端完成数据的写入后,会对数据流调用close()方法(step6)。该操作将剩余的所有数据包写入datanode管线中,并在联系namenode且发送文件写入完成信号之前,等待确认(step 7)。

3. 复本的布局

  Hadoop的默认布局策略是在运行客户端的节点上放第1个复本,第2个复本放在与第1个不同且随机另外选择的机架中节点上。其他复本放在集群中随机选择的节点上。一旦选定复本的放置位置,就会根据网络拓扑创建一个管线,如果复本数为3,则有如图所示的管线:

4 一致模型

  在创建一个文件后,希望能在文件系统的命名空间中立即可见,如:

Path p = new Path("p");
Fs.create(p);
assertThat(fs.exists(p),is(true))

  但是写入文件的内容并不能保证立即可见,即使数据流已经刷新并存储,所以文件长度显示为0:

Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(),is(0L))

  当写入的数据超过一个块后,新的reader就能看见第一个块,总之,其他reader不能看见正在写入的块。

  HDFS调用FSDataOutputStream调用sync()方法可以强制所有的缓存与数据节点同步,因此,对所有的reader,HDFS能保证文件中到目前为止写入的数据均可见且一致。

Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
out.sync();
assertThat(fs.getFileStatus(p).getLen(),is((long)"content".length()));

5. 通过distcp并行复制

  distcp的典型应用就是在两个HDFS集群之间传输数据,如果两个集群运行相同版本的Hadoop,就非常适合使用hdfs方案:

  % hadoop distcp hdfs://namenode1/foo hdfs://namenode2/bar

  该指令将第一个集群的/foo目录复制到第二个集群的/bar目录下,注意:源路径必须是绝对路径!

  默认情况下,distcp会跳过目标路径下已经存在的文件,但可通过-overwrite选项覆盖现有的文件。也可以通过-update选项来选择仅更新修改过的文件。

  当改变先前例子中第一个集群/foo子树下的一个文件,可通过如下命令将修改同步到第二个集群上。

  % hadoop distcp -update hdfs://namenode1/foo hdfs://namenode2/bar/foo

  当试图在两个运行着不同HDFS版本的集群上使用distcp复制数据,并且使用hdfs协议,由于两个版本的RPC系统不兼容,因此复制作业失败。可使用基于只读HTTP协议的HFTP文件系统并从源文件系统中读取数据。

  % hadoop distcp hftp://namenode1:50070/foo hdfs://namenode2/bar

6. Hadoop存档

  Hadoop存档文件或HAR文件,将文件存入HDFS块,减少namenode内存使用的同时,还能允许对文件进行透明的访问。

// 希望存档的文档
% hadoop fs -lsr /my/files

// 运行archive指令存档,第一个选项files.har是存档文件的名称,
// /my/files是需要存档的文件,/my是HAR文件的输出目录
% hadoop archive -archiveName files.har  /my/files /my

// 查看创建的文档
% hadoop fs -ls /my

// 如下命令可现实HAR文件的组成部分:两个索引文件及部分文件的集合
% hadoop fs -ls /my/files.har

// 递归列出存档文件中的部分文件
% hadoop fs -lsr har:///my/files.har

// 删除HAR文件,对于基础文件系统,HAR文件是一个目录
% hadoop fs -rmr /my/files.bar
时间: 2024-10-24 17:00:11

Hadoop学习笔记(3) Hadoop文件系统二的相关文章

Hadoop学习笔记_7_分布式文件系统HDFS --DataNode体系结构

分布式文件系统HDFS --DataNode体系结构 1.概述 DataNode作用:提供真实文件数据的存储服务. 文件块(block):最基本的存储单位[沿用的Linux操作系统地概念].对于文件内容而言,一个文件的长度大小是size,那么从文件的0偏移开始,按照固定的大小,顺序对文件进行划分并编号,划分好的每一个块称一个Block. 与Linux操作系统不同的是,一旦上传了一个小于Block大小的文件,则该文件会占用实际文件大小的空间. 2.进入hdfs-default.xml <prope

Hadoop学习笔记_5_分布式文件系统HDFS --shell操作

分布式文件系统HDFS --shell操作 分布式文件系统[Distributed File System]概述 数据量越来越多,在一个操作系统管辖的范围存不下了,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,因此迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统 . 分布式文件系统特点: 是一种允许文件通过网络在多台主机上分享的文件系统,可让多机器上的多用户分享文件和存储空间. 通透性.让实际上是通过网络来访问文件的动作,由程序与用户看来,就像是访问本地的磁盘一般

Hadoop学习笔记_6_分布式文件系统HDFS --NameNode体系结构

分布式文件系统HDFS --NameNode体系结构 NameNode 是整个文件系统的管理节点. 它维护着整个文件系统的文件目录树[为了使得检索速度更快,该目录树放在内存中], 文件/目录的元信息和每个文件对应的数据块列表. 接收用户的操作请求. Hadoop确保了NameNode的健壮性,不容易死亡.文件目录树以及文件/目录的元信息等归根到底是存放在硬盘中的,但是在Hadoop运行时,需要将其加载到内存中. 文件包括: fsimage:元数据镜像文件.存储某一时段NameNode内存元数据信

hadoop学习笔记:hadoop文件系统浅析

1.什么是分布式文件系统? 管理网络中跨多台计算机存储的文件系统称为分布式文件系统. 2.为什么需要分布式文件系统了? 原因很简单,当数据集的大小超过一台独立物理计算机的存储能力时候,就有必要对它进行分区(partition)并存储到若干台单独计算机上. 3.分布式系统比传统的文件的系统更加复杂 因为分布式文件系统架构在网络之上,因此分布式系统引入了网络编程的复杂性,所以分布式文件系统比普通文件系统更加复杂. 4.Hadoop的文件系统 很多童鞋会把hdfs等价于hadoop的文件系统,其实ha

hadoop学习笔记(一)——hadoop安装及测试

这几天乘着工作之余,学习了一下hadoop技术,跌跌撞撞的几天,终于完成了一个初步的hadoop的安装及测试,具体如下: 动力:工作中遇到的数据量太大,服务器已经很吃力,sql语句运行老半天,故想用大数据技术来改善一下 环境:centos5.11+jdk1.7+hadoop2.5.2 1.  伪分布安装步骤 关闭防火墙 修改ip 修改hostname 设置ssh自动登录 安装jdk 安装hadoop 注:此部分涉及到的Linux操作部分可以再下面的链接中找到,Linux初级操作 2.  安装jd

Hadoop学习笔记0001——Hadoop安装配置

Hadoop配置主要事项 1. 保证Master和Slave能够ping通: 2. 配置/etc/hosts文件: 3. 能够ssh无密码切换各台主机: 4. 安装sun公司的jdk,在/etc/profile中设置好环境变量: 5. 下载Hadoop,安装.配置.搭建Hadoop集群: 1.Hadoop简介 Hadoop是Apache软件基金会旗下的一个开源分布式计算平台.以Hadoop分布式文件系统(HDFS,Hadoop Distributed Filesystem)和MapReduce(

Hadoop 学习笔记五 ---Hadoop系统通信协议介绍

本文约定: DN: DataNode TT: TaskTracker NN: NameNode SNN: Secondry NameNode JT: JobTracker 本文介绍Hadoop各节点和Client之间通信协议. Hadoop的通信是建立在RPC的基础上,关于RPC的详解介绍大家可以参照 "hadoop rpc机制 && 将avro引入hadoop rpc机制初探" Hadoop中节点之间的通信是比较复杂的一个网络,若可以把它们之间的通信网络了解清楚,那么

Hadoop学习笔记—6.Hadoop Eclipse插件的使用

开篇:Hadoop是一个强大的并行软件开发框架,它可以让任务在分布式集群上并行处理,从而提高执行效率.但是,它也有一些缺点,如编码.调试Hadoop程序的难度较大,这样的缺点直接导致开发人员入门门槛高,开发难度大.因此,Hadop的开发者为了降低Hadoop的难度,开发出了Hadoop Eclipse插件,它可以直接嵌入到Hadoop开发环境中,从而实现了开发环境的图形界面化,降低了编程的难度. 一.天降神器插件-Hadoop Eclipse Hadoop Eclipse是Hadoop开发环境的

Hadoop 学习笔记四 ---Hadoop系统通信协议介绍

本文约定: DN: DataNode TT: TaskTracker NN: NameNode SNN: Secondry NameNode JT: JobTracker 本文介绍Hadoop各节点和Client之间通信协议. Hadoop的通信是建立在RPC的基础上,关于RPC的详解介绍大家可以参照 "hadoop rpc机制 && 将avro引入hadoop rpc机制初探" Hadoop中节点之间的通信是比较复杂的一个网络,若可以把它们之间的通信网络了解清楚,那么