DataNode与NameNode交互机制相关代码分析

HDFS Federation是为解决HDFS单点故障而提出的NameNode水平扩展方案,该方案允许HDFS创建多个Namespace以提高集群的扩展性和隔离性。在Federation中新增了block-pool的概念,block-pool就是属于单个Namespace的一组block,每个DataNode为所有的block-pool存储block,可以理解block-pool是一个重新将block划分的逻辑概念,同一个DataNode中可以存储属于多个block-pool的多个block。所以在NameNode和DataNode通信相关的代码方面,也做了很大的改动以支持上述特性。

在cdh3x中,DataNode与NameNode交互主要集中在DataNode这个类中,类结构比较简单,随着Federation概念的引入,新增了一些比较重要的类来管理逻辑层面划分的block-pool和block-pool下的block分布,并以block-pool为单位来与NameNode进行相关的通信。类图如下

    BPServiceActor类实现Runnable接口,以线程的方式运行,一个BPServiceActor实例可以和一个active或standby模式的NameNode实例进行交互,它是真正的任务执行者。主要有四大职能

1.预先与NameNode进行握手

2.向NameNode注册

3.周期性的向NameNode发送心跳

4.处理NameNode发送回的命令

一个BPOfferService实例代表在某个DataNode上的某个block-pool(一个block-pool对应一个独立的Namespace),对block-pool对应的active和standby状态的NameNode进行交互的操作。BPOfferService管理和每个NameNode进行实际通信的BPServiceActor实例,并作为代理与处于active状态和standby状态的两个NameNode进行交互,同时标识与active状态NameNode通信的BPServiceActor实例。相关代码如下

class BPOfferService {
  static final Log LOG = DataNode.LOG;

  //本block-pool服务代表的Namespace信息,和NameNode握手的第一阶段分配所得
  NamespaceInfo bpNSInfo;

    
  //block-pool所在DataNode相关的注册信息,和NameNode握手的第二阶段分配所得
  volatile DatanodeRegistration bpRegistration;
  
  //所属datanode实例
  private final DataNode dn;
    
  //代表和当前active状态的NameNode关联的BPServiceActor实例
  //如果所有NameNode处于standby状态,此属性可以为空
  //如果此属性非空,则必指向bpServices集合中的某个实例
  private BPServiceActor bpServiceToActive = null;
  
  //在本nameservice服务下指向所有NameNode的BPServiceActor实例
  //不论代表的NameNode是active状态还是standby状态
  private List<BPServiceActor> bpServices = new CopyOnWriteArrayList<BPServiceActor>();
    
  //构造方法中根据NameNode的地址来初始化BPServiceActor,并加入到bpServices集合中
  BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
    Preconditions.checkArgument(!nnAddrs.isEmpty(),
        "Must pass at least one NN.");
    this.dn = dn;
    
    for (InetSocketAddress addr : nnAddrs) {
      this.bpServices.add(new BPServiceActor(addr, this));
    }
  }
}

BlockPoolManager类主要用于管理DataNode上的BPOfferService,对BPOfferService对象的创建,删除,启动,停止,关闭的操作都需要通过BlockPoolManager提供的方法来控制。代码如下

class BlockPoolManager {
  private static final Log LOG = DataNode.LOG;
  
  //nameserviceId和BPOfferService的映射集合
  private final Map<String, BPOfferService> bpByNameserviceId = Maps.newHashMap();
  //blockPoolId和BPOfferService的映射集合
  private final Map<String, BPOfferService> bpByBlockPoolId = Maps.newHashMap();
  //BPOfferService集合
  private final List<BPOfferService> offerServices = Lists.newArrayList();
  //当前所属的datanode实例
  private final DataNode dn;

  //更新NameNode列表时的lock
  private final Object refreshNamenodesLock = new Object();
  
  BlockPoolManager(DataNode dn) {
    this.dn = dn;
  }
}

BlockPoolSliceScanner类用于扫描block-pool下的block文件并校验文件是否损坏,它对block和最后的校验时间进行跟踪,目前不提供修改block元数据的操作。一个DataNode对应一个DataBlockScanner,DataBlockScanner对不同block-pool的BlockPoolSliceScanner进行管理。

BlockPoolSliceStorage用于管理DataNode上对应同一个block pool的BlockPoolSlices集合,由于一个DataNode上可能会挂载多个存储设备,即逻辑上对应多个volume,一个BlockPoolSlice对应一个volume,所以对同一个DataNode上的同一个block pool,可以管理多个BlockPoolSlice。BlockPoolSliceStorage的主要职能如下

1.对新生成的block-pool对应的存储进行格式化

2.恢复存储状态以保持一致性

3.在升级的时候对block-pool进行快照处理

4.回滚block-pool到上一个快照

5.删除快照并提交block

在cdh3x中,DataNode启动过程中与NameNode交互的操作,都是在DataNode类中进行的,包括握手,注册,数据块上报和发送心跳等。代码调用关系如下图所示

握手

注册

数据块上报

发送心跳

在cdh5.1中,这些操作最终都交给了BPServiceActor来处理,下面来详细分析下具体的代码和相互间的调用关系。

BlockPoolManager在startDataNode方法中被实例化,startDataNode调用关系如下

DataNode.startDataNode(Configuration conf, 
                     List<StorageLocation> dataDirs,
                     SecureResources resources
                     ) throws IOException {
    blockPoolManager = new BlockPoolManager(this);
    //刷新加载NameNodes
    blockPoolManager.refreshNamenodes(conf);
}

BlockPoolManager.refreshNamenodes(Configuration conf)
      throws IOException {
    LOG.info("Refresh request received for nameservices: "
        + conf.get(DFSConfigKeys.DFS_NAMESERVICES));
    
    //地址映射列表,Map<nameserviceId,<namenodeId,nnAddress>>
    Map<String, Map<String, InetSocketAddress>> newAddressMap = 
      DFSUtil.getNNServiceRpcAddresses(conf);
    
    synchronized (refreshNamenodesLock) {
      doRefreshNamenodes(newAddressMap);
    }
}

BlockPoolManager.doRefreshNamenodes(Map<String, Map<String, InetSocketAddress>> addrMap){
    Set<String> toRefresh = Sets.newLinkedHashSet();
    Set<String> toAdd = Sets.newLinkedHashSet();
    Set<String> toRemove;
    
    synchronized (this) {
      for (String nameserviceId : addrMap.keySet()) {
        if (bpByNameserviceId.containsKey(nameserviceId)) {
          //已经存在,可能有更新的nameserviceId
          toRefresh.add(nameserviceId);
        } else {
          //加入新的nameserviceId
          toAdd.add(nameserviceId);
        }
      }
      
      //找出bpByNameserviceId存在的,但不存在于addrMap的nameserviceId
      //等待删除
      toRemove = Sets.newHashSet(Sets.difference(
          bpByNameserviceId.keySet(), addrMap.keySet()));
      
      //启动新的nameservice
      if (!toAdd.isEmpty()) {
        for (String nsToAdd : toAdd) {
          ArrayList<InetSocketAddress> addrs =
            Lists.newArrayList(addrMap.get(nsToAdd).values());
          //根据NameNode地址集合创建新的BPOfferService实例
          BPOfferService bpos = createBPOS(addrs);
          //建立nameserviceId到BPOfferService的映射
          bpByNameserviceId.put(nsToAdd, bpos);
          //加入到offerServices集合
          offerServices.add(bpos);
        }
      }
      //启动BPOfferService服务
      startAll();
    }
    
    //删除toRemove中的nameserviceId的映射关系,并停止相关服务
    if (!toRemove.isEmpty()) {
      for (String nsToRemove : toRemove) {
        BPOfferService bpos = bpByNameserviceId.get(nsToRemove);
        bpos.stop();
        bpos.join();
      }
    }
    
    //刷新变化的nameserviceId
    if (!toRefresh.isEmpty()) {
      for (String nsToRefresh : toRefresh) {
        BPOfferService bpos = bpByNameserviceId.get(nsToRefresh);
        ArrayList<InetSocketAddress> addrs =
          Lists.newArrayList(addrMap.get(nsToRefresh).values());
        bpos.refreshNNList(addrs);
      }
    }
}

BlockPoolManager.startAll() throws IOException {
    try {
      UserGroupInformation.getLoginUser().doAs(
          new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
              for (BPOfferService bpos : offerServices) {
                //启动BPOfferService服务
                bpos.start();
              }
              return null;
            }
          });
    } catch (InterruptedException ex) {
      IOException ioe = new IOException();
      ioe.initCause(ex.getCause());
      throw ioe;
    }
}

BPOfferService.start() {
    for (BPServiceActor actor : bpServices) {
      //启动BPServiceActor服务
      actor.start();
    }
}

经过层层调用之后,真正和NameNode进行通信的BPServiceActor服务被启动,启动后的BPServiceActor开始和它对应状态的NameNode进行握手注册等一系列操作,BPServiceActor服务对应的NameNode可能是active或standby状态。详细代码如下

BPServiceActor.run() {
    LOG.info(this + " starting to offer service");

    try {
      while (true) {
        try {
          //连接到NameNode并进行握手
          connectToNNAndHandshake();
          break;
        } catch (IOException ioe) {
          runningState = RunningState.INIT_FAILED;
          if (shouldRetryInit()) {
            LOG.error("Initialization failed for " + this + " "
                + ioe.getLocalizedMessage());
            sleepAndLogInterrupts(5000, "initializing");
          } else {
            runningState = RunningState.FAILED;
            LOG.fatal("Initialization failed for " + this + ". Exiting. ", ioe);
            return;
          }
        }
      }

      runningState = RunningState.RUNNING;

      while (shouldRun()) {
        try {
          //循环调用offerService()
          //在本方法中,周期性的向NameNode发送心跳并执行NameNode返回的相关命令
          offerService();
        } catch (Exception ex) {
          LOG.error("Exception in BPOfferService for " + this, ex);
          sleepAndLogInterrupts(5000, "offering service");
        }
      }
      runningState = RunningState.EXITED;
    } catch (Throwable ex) {
      LOG.warn("Unexpected exception in block pool " + this, ex);
      runningState = RunningState.FAILED;
    } finally {
      LOG.warn("Ending block pool service for: " + this);
      cleanUp();
    }
  }
  
  
 BPServiceActor.connectToNNAndHandshake() throws IOException {
    //连接到NameNode并获得NameNode代理对象
    bpNamenode = dn.connectToNN(nnAddr);

    //第一阶段获取NamespaceInfo
    NamespaceInfo nsInfo = retrieveNamespaceInfo();
    
    //校验namespaceInfo是否和HA中的其他NameNode信息一致
    //并建立blockPoolManager和BPOfferService的对应关系
    bpos.verifyAndSetNamespaceInfo(nsInfo);
    
    //第二阶段向NameNode注册
    register();
  }

上述主要分析了加入Federation特性和HA特性后,DataNode和NameNode在代码层面交互方式的改变,相比之前的代码,逻辑更加清晰并且类之间的耦合度更低。

时间: 2024-10-25 04:30:03

DataNode与NameNode交互机制相关代码分析的相关文章

Kafka Producer相关代码分析

Kafka Producer相关代码分析 标签(空格分隔): kafka Kafka Producer将用户的消息发送到Kafka集群(准确讲是发送到Broker).本文将分析Producer相关的代码实现. 类kafka.producer.Producer 如果你自己实现Kafka客户端来发送消息的话,你就是用到这个类提供的接口来发送消息.(如果你对如何利用Producer API来发送消息还不是很熟悉的话,可以参看官方的例子).这个类提供了同步和异步两种方式来发送消息. 异步发送消息是基于同

Linux -- 内存控制之oom killer机制及代码分析

近期,线上一些内存占用比較敏感的应用.在訪问峰值的时候,偶尔会被kill掉,导致服务重新启动.发现是Linux的out-of-memory kiiler的机制触发的. http://linux-mm.org/OOM_Killer oom kiiler会在内存紧张的时候,会依次kill内存占用较高的进程,发送Signal 15(SIGTERM).并在/var/log/message中进行记录.里面会记录一些如pid,process name.cpu mask,trace等信息,通过监控能够发现类似

线程相关代码分析-&gt;常见面试题(一、Thead类)

As always,我们直接看jdk的代码切入: 首先是最简单的Runnable接口: public interface Runnable { public abstract void run(); } 我们可以看到Runnable其实特别简单,就是接口,里面只有一个方法(其实public abstract根本没必要,不过是不是老版本jdk需要添加倒是还不清楚). 我们主要需要分析的对象是Thread类: publicclass Thread implements Runnable { ---t

fshc之请求仲裁机制的代码分析

1 always@(posedge spi_clk or negedge spiclk_rst_n) 2 begin 3 if(~spiclk_rst_n) 4 arbiter2cache_ack_r <=1'b0; 5 else if(cache_req_sclk && flash_idle && ~atom_op_en_sync2 && ~arbiter2mcu_ack_r) 6 arbiter2cache_ack_r <=1'b1; 7 e

AngularJS PhoneCat代码分析

原文:http://blog.javachen.com/2015/01/09/angular-phonecat-examples/ AngularJS 官方网站提供了一个用于学习的示例项目:PhoneCat.这是一个Web应用,用户可以浏览一些Android手机,了解它们的详细信息,并进行搜索和排序操作. 本文主要分析 AngularJS 官方网站提供的一个用于学习的示例项目 PhoneCat 的构建.测试过程以及代码的运行原理.希望能够对 PhoneCat 项目有一个更加深入全面的认识.这其中

小记--------spark资源调度机制源码分析-----Schedule

Master类位置所在:spark-core_2.11-2.1.0.jar的org.apache.spark.deploy.master下的Master类 /** * driver调度机制原理代码分析Schedule the currently available resources among waiting apps. This method will be called * every time a new app joins or resource availability change

驱动相关的内核代码分析

arch\arm\include\asm\Io.h #define __raw_readl(a) (__chk_io_ptr(a), *(volatile unsigned int __force   *)(a)) #define __raw_writel(v,a) (__chk_io_ptr(a), *(volatile unsigned int __force   *)(a) = (v)) 注:(volatile unsigned int __force   *)指针强制转换为unsigne

wifi display代码 分析

转自:http://blog.csdn.net/lilian0118/article/details/23168531 这一章中我们来看Wifi Display连接过程的建立,包含P2P的部分和RTSP的部分,首先来大致看一下Wifi Display规范相关的东西. HIDC: Human Interface Device Class  (遵循HID标准的设备类)UIBC: User Input Back Channel  (UIBC分为两种,一种是Generic,包含鼠标.键盘等:另一种是HI

linux kernel的中断子系统之(七):GIC代码分析

一.前言 GIC(Generic Interrupt Controller)是ARM公司提供的一个通用的中断控制器,其architecture specification目前有四个版本,V1-V4(V2最多支持8个ARM core,V3/V4支持更多的ARM core,主要用于ARM64服务器系统结构).目前在ARM官方网站只能下载到Version 2的GIC architecture specification,因此,本文主要描述符合V2规范的GIC硬件及其驱动. 具体GIC硬件的实现形态有两