DataNode文件系统源码分析

在DataNode的本地存储空间上,与存储服务密切相关的,比如创建数据块,恢复数据块,数据块校验等相关的代码都在org.apache.hadoop.hdfs.server.datanode.fsdataset包下(代码版本CDH5.1)

首先说下org.apache.hadoop.hdfs.server.datanode.fsdataset下的主要接口,FsDatasetSpi接口的方法比较多,主要分三类。第一类主要是和数据块相关的,如创建rbw状态和temporary状态的block,追加block,恢复block,提交block,缓存block,打开block的输出流等。第二类主要是目录管理方面的,获取volume列表,获取block对应的volume,trash目录相关的管理等。第三类主要是FsDataset自身的健康检查,资源回收等。FsDataSetSpi对应cdh3x版本中的FsDataSet接口。

FsVolumeSpi接口主要是用于volume管理,方法比较少,代码如下

public interface FsVolumeSpi {
  //获取volume下的存储UUID
  public String getStorageID();

  //获取BlockPool的列表
  public String[] getBlockPoolList();

  //获取可用的存储空间大小
  public long getAvailable() throws IOException;

  //获取volume的基本路径(current目录的父目录路径)
  public String getBasePath();

  //获取volume的绝对路径
  public String getPath(String bpid) throws IOException;

  //获取block的提交目录
  public File getFinalizedDir(String bpid) throws IOException;
  
  //获取存储类型
  public StorageType getStorageType();
}

RollingLogs接口用于对DataNode上的EditLog进行管理,内部包含两个重要的接口。LineIterator用于遍历EditLog的内容,Appender接口用于追加内容到EditLog。

VolumeChoosingPolicy接口提供写入block副本的volume选择策略,实现类有两个,和接口在同一个包下,分别是RoundRobinVolumeChoosingPolicy和AvailableSpaceVolumeChoosingPolicy。

org.apache.hadoop.hdfs.server.datanode.fsdataset.impl主要包括相关接口的实现类和一些工具类。FsDataSet将存储空间分为三个级别,LDir(cdh3x的FSDir),FsVolumeImpl(cdh3x的FsVolume)和FsVolumeList(cdh3x的FsVolumeSet)。LDir代表current目录下的子目录,FsVolumeImpl代表${dfs.datanode.data.dir}中的一项,DataNode可以有多个数据目录,FsVolumeList负责管理FsVolumeImpl对象。

以创建数据块为入口,详细分析下代码的执行流程,在HDFS支持Append特性之前,一个block副本在DataNode的状态要么是finalized,要么是temporary。temporary状态的block会在DataNode重启过程中被删除。但支持了Append之后,HDFS必须为正在构建中的block提供更强的可持久化支持,一些temporary的block副本必须在DataNode重启过程中持久存在。所以在后续的版本中代码的变动比较大。详情参考https://issues.apache.org/jira/browse/HDFS-265

在cdh3x中,FsDataSet中的writeToBlock方法用于选择block写入的volume,并创建写入的输出流,新版本中将这个方法拆分为三个方法,分别是createRbw,createTemporary和append。详情见https://issues.apache.org/jira/browse/HDFS-543。rbw状态代表一个block副本刚刚被创建或者被Append,temporary则代表DataNode间block的备份和reblance。下面详细分析下客户端写入文件创建block输出流的过程。首先看一段cdh3x的代码以做对比

BlockReceiver(Block block, DataInputStream in, String inAddr,
                String myAddr, boolean isRecovery, String clientName, 
                DatanodeInfo srcDataNode, DataNode datanode) throws IOException {
      
      //通过调用writeToBlock方法创建流对象
      streams = datanode.data.writeToBlock(block, isRecovery,
                              clientName == null || clientName.length() == 0);
      this.finalized = false;
      if (streams != null) {
        //获取输出流对象
        this.out = streams.dataOut;
        //获取校验信息输出流对象
        this.checksumOut = new DataOutputStream(new BufferedOutputStream(streams.checksumOut, SMALL_BUFFER_SIZE));
      }
}

cdh3x中通过调用writeToBlock来创建对应的输出流,而在新代码中获取输出流对象的逻辑相对复杂一些。代码如下

      //如果是DataNode节点发起的数据块复制或者移动
      if (isDatanode) { 
        //创建temporary状态的replica
        replicaInfo = datanode.data.createTemporary(block);
      } else {
        //如果是客户端发起的,根据所处的阶段执行不同的操作
        switch (stage) {
        //pipeline启动创建
        case PIPELINE_SETUP_CREATE:
          //创建rbw状态的replicaInfo
          replicaInfo = datanode.data.createRbw(block);
          //向NameNode已经创建block replica的消息
          datanode.notifyNamenodeReceivingBlock(block, replicaInfo.getStorageUuid());
          break;
        case PIPELINE_SETUP_STREAMING_RECOVERY:
          //获取rbw恢复的replicaInfo
          replicaInfo = datanode.data.recoverRbw(block, newGs, minBytesRcvd, maxBytesRcvd          );
          //设置block新版本号
          block.setGenerationStamp(newGs);
          break;
        case PIPELINE_SETUP_APPEND:
          //block追加操作
          replicaInfo = datanode.data.append(block, newGs, minBytesRcvd);
          if (datanode.blockScanner != null) {
            //通过blockScanner删除旧的block
            datanode.blockScanner.deleteBlock(block.getBlockPoolId(),block.getLocalBlock());
          }
          //设置block的新版本号
          block.setGenerationStamp(newGs);
          //向NameNode发送变更消息
          datanode.notifyNamenodeReceivingBlock(block, replicaInfo.getStorageUuid());
          break;
        case PIPELINE_SETUP_APPEND_RECOVERY:
          //获取append恢复的replicaInfo
          replicaInfo = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
          if (datanode.blockScanner != null) { 
            //通过blockScanner删除旧的block
            datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
                block.getLocalBlock());
          }
          //设置block的新版本号
          block.setGenerationStamp(newGs);
          //向NameNode发送变更消息
          datanode.notifyNamenodeReceivingBlock(block, replicaInfo.getStorageUuid());
          break;
        case TRANSFER_RBW:
        case TRANSFER_FINALIZED:
          //DataNode之间传输创建的replicaInfo
          replicaInfo = datanode.data.createTemporary(block);
          break;
        default: throw new IOException("Unsupported stage " + stage + 
              " while receiving block " + block + " from " + inAddr);
        }
      }
      this.dropCacheBehindWrites = (cachingStrategy.getDropBehind() == null) ?
        datanode.getDnConf().dropCacheBehindWrites :
          cachingStrategy.getDropBehind();
      this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;
      
      final boolean isCreate = isDatanode || isTransfer 
          || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
      //创建输出流对象
      streams = replicaInfo.createStreams(isCreate, requestedChecksum);

      //获取校验相关的信息
      this.clientChecksum = requestedChecksum;
      this.diskChecksum = streams.getChecksum();
      this.needsChecksumTranslation = !clientChecksum.equals(diskChecksum);
      this.bytesPerChecksum = diskChecksum.getBytesPerChecksum();
      this.checksumSize = diskChecksum.getChecksumSize();
      //获取数据输出流
      this.out = streams.getDataOut();

由于支持了Append操作,block副本所处的状态更多了,代码实现上也更加复杂。下面详细分析下FsDatasetImpl类下的createRbw方法

public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
      throws IOException {
    //从volumeMap中获取replicaInfo,因为是新创建的,如果在volumeMap存在,则不能继续创建
    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), 
        b.getBlockId());
    if (replicaInfo != null) {
      throw new ReplicaAlreadyExistsException("Block " + b +
      " already exists in state " + replicaInfo.getState() +
      " and thus cannot be created.");
    }
    //根据相应的规则获取要写入的volume
    FsVolumeImpl v = volumes.getNextVolume(b.getNumBytes());
    //在得到的volume下创建rbw文件
    File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
    //创建ReplicaBeingWritten对象,此对象继承自ReplicaInfo
    ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(), 
        b.getGenerationStamp(), v, f.getParentFile());
    //将新创建的replicaInfo加入到volumeMap中
    volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
    return newReplicaInfo;
  }

在选择写入的volume的时候,用户可以根据dfs.datanode.fsdataset.volume.choosing.policy属性来选择使用何种策略来选择volume。默认提供两种策略,分别是根据volume可用空间和轮询的方法来选择volume,对应的实现类分别是AvailableSpaceVolumeChoosingPolicy和RoundRobinVolumeChoosingPolicy,它们都继承自VolumeChoosingPolicy接口,用户也可以根据自己的需求来自定义选择策略。

时间: 2024-11-05 14:41:40

DataNode文件系统源码分析的相关文章

Hadoop之HDFS原理及文件上传下载源码分析(上)

HDFS原理 首先说明下,hadoop的各种搭建方式不再介绍,相信各位玩hadoop的同学随便都能搭出来. 楼主的环境: 操作系统:Ubuntu 15.10 hadoop版本:2.7.3 HA:否(随便搭了个伪分布式) 文件上传 下图描述了Client向HDFS上传一个200M大小的日志文件的大致过程: 首先,Client发起文件上传请求,即通过RPC与NameNode建立通讯. NameNode与各DataNode使用心跳机制来获取DataNode信息.NameNode收到Client请求后,

Hbase写入hdfs源码分析

版权声明:本文由熊训德原创文章,转载请注明出处: 文章原文链接:https://www.qcloud.com/community/article/258 来源:腾云阁 https://www.qcloud.com/community 本文档从源码角度分析了,hbase作为dfs client写入hdfs的hadoop sequence文件最终刷盘落地的过程.之前在<wal线程模型源码分析>中描述wal的写过程时说过会写入hadoop sequence文件,hbase为了保证数据的安全性,一般都

hadoop源码分析解读入门

hadoop 源代码分析(一) Google 的核心竞争技术是它的计算平台.HadoopGoogle的大牛们用了下面5篇文章,介绍了它们的计算设施. Google的几篇论文 GoogleCluster:http://research.google.com/archive/googlecluster.html Chubby:http://labs.google.com/papers/chubby.html GFS:http://labs.google.com/papers/gfs.html Big

Hadoop2源码分析-HDFS核心模块分析

1.概述 这篇博客接着<Hadoop2源码分析-RPC机制初识>来讲述,前面我们对MapReduce.序列化.RPC进行了分析和探索,对Hadoop V2的这些模块都有了大致的了解,通过对这些模块的研究,我们明白了MapReduce的运行流程以及内部的实现机制,Hadoop的序列化以及它的通信机制(RPC).今天我们来研究另一个核心的模块,那就是Hadoop的分布式文件存储系统——HDFS,下面是今天分享的内容目录: HDFS简述 NameNode DataNode 接下来,我们开始今天的分享

Linux内核源码分析--内核启动之(5)Image内核启动(rest_init函数)(Linux-3.0 ARMv7)【转】

原文地址:Linux内核源码分析--内核启动之(5)Image内核启动(rest_init函数)(Linux-3.0 ARMv7) 作者:tekkamanninja 转自:http://blog.chinaunix.net/uid-25909619-id-4938395.html 前面粗略分析start_kernel函数,此函数中基本上是对内存管理和各子系统的数据结构初始化.在内核初始化函数start_kernel执行到最后,就是调用rest_init函数,这个函数的主要使命就是创建并启动内核线

Linux内核源码分析--内核启动之(6)Image内核启动(do_basic_setup函数)(Linux-3.0 ARMv7)【转】

原文地址:Linux内核源码分析--内核启动之(6)Image内核启动(do_basic_setup函数)(Linux-3.0 ARMv7) 作者:tekkamanninja 转自:http://blog.chinaunix.net/uid-25909619-id-4938396.html 在基本分析完内核启动流程的之后,还有一个比较重要的初始化函数没有分析,那就是do_basic_setup.在内核init线程中调用了do_basic_setup,这个函数也做了很多内核和驱动的初始化工作,详解

Linux内核源码分析--内核启动之(3)Image内核启动(C语言部分)(Linux-3.0 ARMv7) 【转】

原文地址:Linux内核源码分析--内核启动之(3)Image内核启动(C语言部分)(Linux-3.0 ARMv7) 作者:tekkamanninja 转自:http://blog.chinaunix.net/uid-25909619-id-4938390.html 在构架相关的汇编代码运行完之后,程序跳入了构架无关的内核C语言代码:init/main.c中的start_kernel函数,在这个函数中Linux内核开始真正进入初始化阶段, 下面我就顺这代码逐个函数的解释,但是这里并不会过于深入

Linux tcp被动打开内核源码分析

[我是从2个角度来看,其实所谓2个角度,是发现我分析源码时,分析重复了,写了2个分析报告,所以现在都贴出来.] [如果你是想看看,了解一下内核tcp被动打开时如何实现的话,我觉得还是看看概念就可以了,因为即使看了源码,过一个个礼拜你就忘了,如果是你正在修改协议栈,为不知道流程而发愁,那么希望你能看看源码以及注释,希望你给你帮助.] 概念: tcp被动打开,前提是你listen,这个被动打开的前提.你listen过后,其实创建了一个监听套接字,专门负责监听,不会负责传输数据. 当第一个syn包到达

Hadoop之HDFS原理及文件上传下载源码分析(下)

上篇Hadoop之HDFS原理及文件上传下载源码分析(上)楼主主要介绍了hdfs原理及FileSystem的初始化源码解析, Client如何与NameNode建立RPC通信.本篇将继续介绍hdfs文件上传.下载源解析. 文件上传 先上文件上传的方法调用过程时序图: 其主要执行过程: FileSystem初始化,Client拿到NameNodeRpcServer代理对象,建立与NameNode的RPC通信(楼主上篇已经介绍过了) 调用FileSystem的create()方法,由于实现类为Dis