HDFS怎样检測并删除多余副本块

前言



在HDFS中,每时每刻都在进行着大量block块的创建和删除操作,这些庞大的block块构建起了这套复杂的分布式系统.普通block的读写删除操作一般人都或多或少了解过一些,可是过量的副本清理机制是否有人知道呢,就是overReplicatedBlock的处理,针对过量的副本块,HDFS怎么处理,何时处理,处理的策略机制怎样,本文就给大家分享HDFS在这方面的知识.

过量副本块以及发生的场景



过量副本块的意思通俗解释就是集群中有A副本3个,满足标准的3副本策略,可是此时发生了某种场景后,A副本块突然变为5个了,为了达到副本块的标准系数3个,系统就会进行多余2块副本的清除动作,而这个清除动作就是本文所要重点描写叙述的.过量副本块的现象是比較好解释的,那么问题来了,究竟有哪些潜在的原因或条件会触发多余副本块的发生呢(在此指的是HDFS中)?

本人通过对HDFS源代码的阅读,总结出一下3点

  • ReCommission节点又一次上线.这类操作是运维操作引起的.节点下线操作会导致大量此节点的block块在集群中大量拷贝,一旦此节点取消下线,之前已拷贝的大量块必定会成为多余的副本块.
  • 人为又一次设置block replication副本数.还是以A副本举例,A副本当前满足标准副本数3个,此时用户张三通过使用hdfs的API方法setReplication人为设置副本数为1.此时也会早A副本数多余2个的情况,即使说HDFS中的副本标准系数还是3个.
  • 新增加的block块记录在系统中被丢失.这种可能相对于前2种case的情况,是内部因素造成.这些新增加的丢失的block块记录会在BlockManager进行再次扫描检測,防止出现过量副本的现象.

OK,以上3种情形就是可能发生过量副本块的原因.至于这3种情况是怎样一步步的终于调用到处理多余副本块的过程在后面的描写叙述中会再给出,先来看下多余副本块是怎样被选出并处理掉的.

OverReplication多余副本块处理



多余副本块的处理分为2个子过程:

  • 多余副本块的选出
  • 选出的多余副本块的处理

我们从源代码中进行一一寻找原因,首先副本块的选出.

多余副本块的选择



进入blockManager的processOverReplicatedBlock方法,非常显然,方法名已经表明了方法操作的本意了.

/**
 * Find how many of the containing nodes are "extra",          if any.
 * If there are any extras, call chooseExcessReplicates() to
 * mark them in the excessReplicateMap.
 */
private void processOverReplicatedBlock(final Block block, final short replication, final DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {

此方法的凝视的意思是找出存在”多余”的节点,假设他们是多余的,调用chooseExcessReplicates并标记他们,增加增加到excessReplicateMap中.以下进行细节的处理

// 节点列表变量的声明
Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
// 从corruptReplicas变量中获取是否存在坏的block所在的节点
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas.getNodes(block);

继续后面的处理

    // 遍历此过量副本块所在的节点列表
    for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
      final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
      ...
      LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
          .getDatanodeUuid());
      // 假设在当前过量副本图对象excessReplicateMap中不存在
      if (excessBlocks == null || !excessBlocks.contains(block)) {
        //而且所在节点不是已下线或下线中的节点
        if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
          // 而且这个副本块不是损坏的副本块
          // exclude corrupt replicas
          if (corruptNodes == null || !corruptNodes.contains(cur)) {
            // 将此过滤副本块的一个所在节点增加候选节点列表中
            nonExcess.add(storage);
          }
        }
      }
    }

所以从这里看出nonExcess对象事实上是一个候选节点的概念,将block副本块所在的节点列表进行多种条件的再推断和剔除.最后就调用到选择终于过量副本块节点的方法

chooseExcessReplicates(nonExcess, block, replication,
        addedNode, delNodeHint, blockplacement);

进入chooseExcessReplicates方法

    // first form a rack to datanodes map and
    // 首先会形成机架对datanode节点的映射关系图
    BlockCollection bc = getBlockCollection(b);
    final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID());
    final List<StorageType> excessTypes = storagePolicy.chooseExcess(
        replication, DatanodeStorageInfo.toStorageTypes(nonExcess));

    // 初始化机架->节点列表映射图对象
    final Map<String, List<DatanodeStorageInfo>> rackMap
        = new HashMap<String, List<DatanodeStorageInfo>>();
    // 超过1个副本数的节点列表
    final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
    // 恰好1个副本数的节点列表
    final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();

为什么要划分不同节点列表的选择呢.由于在这里设计者做了优先选择,在相同拥有多余副本块的节点列表中,优先选择节点中副本数多于1个的,其次再是副本数恰好有1个的节点.这个设计非常好理解,由于你上面的多余副本数很多其它嘛,我当然要先从多的開始删.

    // 节点划分成相应2个集合
    // split nodes into two sets
    // moreThanOne contains nodes on rack with more than one replica
    // exactlyOne contains the remaining nodes
    replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);

进入划分方法

  public void splitNodesWithRack(
      final Iterable<DatanodeStorageInfo> storages,
      final Map<String, List<DatanodeStorageInfo>> rackMap,
      final List<DatanodeStorageInfo> moreThanOne,
      final List<DatanodeStorageInfo> exactlyOne) {
    // 遍历候选节点列表,形成机架->节点列表的相应关系
    for(DatanodeStorageInfo s: storages) {
      final String rackName = getRack(s.getDatanodeDescriptor());
      List<DatanodeStorageInfo> storageList = rackMap.get(rackName);
      if (storageList == null) {
        storageList = new ArrayList<DatanodeStorageInfo>();
        rackMap.put(rackName, storageList);
      }
      storageList.add(s);
    }

以下给出的划分算法

    // split nodes into two sets
    for(List<DatanodeStorageInfo> storageList : rackMap.values()) {
      if (storageList.size() == 1) {
        // exactlyOne contains nodes on rack with only one replica
        // 假设机架中相应的节点数量仅仅有1个,则节点上副本数就为1,否则就为多个
        exactlyOne.add(storageList.get(0));
      } else {
        // moreThanOne contains nodes on rack with more than one replica
        moreThanOne.addAll(storageList);
      }
    }

上面划分的原理应该是与相应的block副本存放策略原理相关,这个我到没有细致去了解原因,读者能够自行阅读相关BlockPlacementPolicy相关代码进行了解.于是在这段代码过后,节点组就被分为了2大类,exactlyOne和moreThanOne.至此chooseExcessReplicates的上半段代码运行完毕,接下来看下半段代码的运行过程

    // 选择一个待删除的节点会偏向delNodeHintStorage的节点
    // pick one node to delete that favors the delete hint
    // 否在会从节点列表中选出一个可用空间最小
    // otherwise pick one with least space from priSet if it is not empty
    // otherwise one node with least space from remains
    boolean firstOne = true;
    final DatanodeStorageInfo delNodeHintStorage
        = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
    final DatanodeStorageInfo addedNodeStorage
        = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);

上面这3行凝视传达出2个意思

  • 能够直接传入要删除的节点,假设能够,则优选选择传入的delHint节点
  • 在每一个节点的内部列表中,优选会选择出可用空间最少的,这个也好理解,相同的副本数的节点列表中,当然要选择可用空间尽可能少的,以便释放出多的空间.
    // 假设眼下过量副本所在节点数大于标准副本数,则进行循环移除
    while (nonExcess.size() - replication > 0) {
      final DatanodeStorageInfo cur;
      // 推断能否够使用delNodeHintStorage节点进行取代
      if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage,
          moreThanOne, excessTypes)) {
        cur = delNodeHintStorage;
      } else { // regular excessive replica removal
        // 否则进行常规的节点选择
        cur = replicator.chooseReplicaToDelete(bc, b, replication,
            moreThanOne, exactlyOne, excessTypes);
      }

推断能否够使用delNodeHintStorage节点的推断逻辑这里就忽略了,主要看一下关键的chooseReplicaToDelete方法,这个分支处理才是最经经常使用到的处理方式.

    // 选择的节点要么是心跳时间最老的或者是可用空间最少的
    // Pick the node with the oldest heartbeat or with the least free space,
    // if all hearbeats are within the tolerable heartbeat interval
    for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) {
      if (!excessTypes.contains(storage.getStorageType())) {
        continue;
      }

first和second的节点选择逻辑例如以下,非常的简单

  /**
   * Pick up replica node set for deleting replica as over-replicated.
   * First set contains replica nodes on rack with more than one
   * replica while second set contains remaining replica nodes.
   * So pick up first set if not empty. If first is empty, then pick second.
   */
  protected Collection<DatanodeStorageInfo> pickupReplicaSet(
      Collection<DatanodeStorageInfo> first,
      Collection<DatanodeStorageInfo> second) {
    return first.isEmpty() ? second : first;
  }

在节点列表的每次迭代循环中会进行以下2个指标的对照

      // 进行心跳时间的对照
      if (lastHeartbeat < oldestHeartbeat) {
        oldestHeartbeat = lastHeartbeat;
        oldestHeartbeatStorage = storage;
      }
      // 进行可用空间的对照
      if (minSpace > free) {
        minSpace = free;
        minSpaceStorage = storage;
      }

然后进行选择,优先选择心跳时间最老的

    final DatanodeStorageInfo storage;
    if (oldestHeartbeatStorage != null) {
      storage = oldestHeartbeatStorage;
    } else if (minSpaceStorage != null) {
      storage = minSpaceStorage;
    } else {
      return null;
    }

然后进行以下2个操作

      // 又一次进行rackMap对象关系的调整
      // adjust rackmap, moreThanOne, and exactlyOne
      replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
          exactlyOne, cur);
      // 将选出的节点从候选节点列表中移除
      nonExcess.remove(cur);

能够说,到了这里,多余副本块所在节点就被选出了.

多余副本块的处理



多余副本块的处理就显得非常easy了,反正目标对象以及所在节点已经找到了,增加到相应的对象中就可以.

      // 增加到excessReplicateMap对象中
      addToExcessReplicate(cur.getDatanodeDescriptor(), b);

      //
      // The ‘excessblocks‘ tracks blocks until we get confirmation
      // that the datanode has deleted them; the only way we remove them
      // is when we get a "removeBlock" message.
      //
      // The ‘invalidate‘ list is used to inform the datanode the block
      // should be deleted.  Items are removed from the invalidate list
      // upon giving instructions to the namenode.
      //
      // 将此节点上的b block增加到无效节点中
      addToInvalidates(b, cur.getDatanodeDescriptor());

增加到invalidates无效block列表后不久,此block就将被清除.

多余副本块清除的场景调用



又一次回到之前提到过的多余副本块的3大场景调用.有人可能会好奇我是怎么找到这3种使用场景的,通过查看chooseExcessReplicates哪里被调用就能够了,例如以下图所看到的

针对上述的5种调用情况,于是我总结了3种使用场景.以下一一进行对照.

场景1: ReCommission又一次上线过程



在方法processOverReplicatedBlocksOnReCommission中调用了清除过量副本块的方法

  /**
   * Stop decommissioning the specified datanode.
   * @param node
   */
  @VisibleForTesting
  public void stopDecommission(DatanodeDescriptor node) {
    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
      // Update DN stats maintained by HeartbeatManager
      hbManager.stopDecommission(node);
      // Over-replicated blocks will be detected and processed when
      // the dead node comes back and send in its full block report.
      if (node.isAlive()) {
        blockManager.processOverReplicatedBlocksOnReCommission(node);
      }
      // Remove from tracking in DecommissionManager
      pendingNodes.remove(node);
      decomNodeBlocks.remove(node);
    } else {
      LOG.trace("stopDecommission: Node {} in {}, nothing to do." +
          node, node.getAdminState());
    }
  }

下线操作又一次恢复,必定要停止正在下线的动作,所以会在这种方法中进行调用.

场景2: SetReplication人为设置副本数



人为设置副本数是一个主动因素,调用的直接方法例如以下:

  /** Set replication for the blocks. */
  public void setReplication(final short oldRepl, final short newRepl,
      final String src, final Block... blocks) {
    if (newRepl == oldRepl) {
      return;
    }

    // update needReplication priority queues
    for(Block b : blocks) {
      updateNeededReplications(b, 0, newRepl-oldRepl);
    }
    // 当设置的新的副本数值比原有的小的时候,须要进行过量副本的清除操作
    if (oldRepl > newRepl) {
      // old replication > the new one; need to remove copies
      LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl
          + " for " + src);
      for(Block b : blocks) {
        processOverReplicatedBlock(b, newRepl, null, null);
      }
    } else { // replication factor is increased
      LOG.info("Increasing replication from " + oldRepl + " to " + newRepl
          + " for " + src);
    }
  }

这个API方法是能够被外面的Client端程序调用触发的.

场景3: 丢失新增加的block记录信息



丢失新增加的block信息导致集群中存在多余的副本.官方的解释是这种:

  /*
   * Since the BlocksMapGset does not throw the ConcurrentModificationException
   * and supports further iteration after modification to list, there is a
   * chance of missing the newly added block while iterating. Since every
   * addition to blocksMap will check for mis-replication, missing mis-replication
   * check for new blocks will not be a problem.
   */

由于存在丢失block信息的可能性,所以会开单独的线程去又一次检測是否存在过量副本的现象.

  private void processMisReplicatesAsync() throws InterruptedException {
    ...
    while (namesystem.isRunning() && !Thread.currentThread().isInterrupted()) {
      int processed = 0;
      namesystem.writeLockInterruptibly();
      try {
        while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
          BlockInfoContiguous block = blocksItr.next();
          MisReplicationResult res = processMisReplicatedBlock(block);
          ...

场景4: 其它场景的检測



其它场景有的时候也会调用到processOverReplicatedBlock的方法,但不是外界的因素导致,而是出于一种慎重性的考虑,比方在addStoredBlock,当新增加的块被增加到blockMap中时,会再次进行块的检測.还有1种是在文件终于写入完毕的时候,也会调用一次checkReplication此方法,来确认集群中没有多余的相同块的情况.这2种情况的调用如上图所看到的,这里就不放出详细的代码了,可见,HDFS的设计者在细节方面的处理真的是非常用心啊.

时间: 2024-08-08 09:59:00

HDFS怎样检測并删除多余副本块的相关文章

模式识别 - 有害视频检測程序的策略

有害视频检測程序的策略 本文地址: http://blog.csdn.net/caroline_wendy/article/details/26346831 有害(色情\恐怖\暴力)视频, 严重危害网络的健康, 须要进行检測和过滤. 检測色情\恐怖视频, 通过检測程序, 检測出多个场景的概率, 然后进行排序, 当场景多余6个时, 仅仅取最大的6个场景; 返回的概率值是前3个最大检測值场景的概率的均值; 色情\恐怖汇总时, 首先检測色情, 假设为色情视频, 则不进行恐怖的检測, 否则继续检測恐怖,

C++内存泄露检測原理

转自:http://hi.baidu.com/jasonlyy/item/9ca0cecf2c8f113a99b4981c 本文针对 linux 下的 C++ 程序的内存泄漏的检測方法及事实上现进行探讨.当中包含 C++ 中的 new 和 delete 的基本原理,内 存检測子系统的实现原理和详细方法,以及内存泄漏检測的高级话题.作为内存检測子系统实现的一部分,提供了一个具有更好的使用特性的相互排斥体 (Mutex)类. 1.开发背景 在 windows 下使用 VC 编程时,我们通常须要 DE

怎样才能彻底地删除多余输入法软件

怎样才能彻彻底底地删除输入法软件 [删除多余系统注册表] 开始/运行,在对话框中输入"Regedit"(不含引号)调出注册表进行如下逐项操作: HKEY_LOCAL_MACHINE\System\CurrentControlSet\Control\Keyboard Layouts\ 保留. 00000409 (默认) - 美国英语 (禁止删除) 00000804 中文 (简体) - 美式键盘 (禁止删除) E0010804 中文 (简体) - 全拼 E0030804 中文 (简体) -

LeakCanary:简单粗暴的内存泄漏检測工具

差点儿每一个程序猿在开发的过程中都会遇到内存泄漏.那么我们怎样检測到app是否哪里出现内存泄漏呢?square公司推出了一款简单粗暴的检測内存泄漏的工具-- LeakCanary 什么是内存泄漏? 内存泄漏是指因为疏忽或者错误造成程序未能释放已经不再使用的内存,内存泄漏不是指内存在物理上的消失,而是应用程序分配某段内存后,因为设计错误失去了对于这段内存的控制.因而造成内存的浪费. 内存泄漏和内存溢出是两码事,不要混淆,内存溢出通俗的讲就是内存不够用,如今的仅仅能手机内存越来越大,内存溢出的情况不

ASP怎样检測某目录是否存在,不存在则自己主动创建

ASP怎样检測某目录是否存在,不存在则自己主动创建 folder=server.mappath("/imagess") Set fso = CreateObject("Scripting.FileSystemObject") if fso.fileexists(Server.mappath(filepath)) then respnse.write("都有了还建什么建") else fso.createfolder(folder) end if

图像边缘检測小结

边缘是图像中灰度发生急剧变化的区域边界. 图像灰度的变化情况能够用图像灰度分布的梯度来表示,数字图像中求导是利用差分近似微分来进行的,实际上经常使用空域微分算子通过卷积来完毕. 一阶导数算子 1)  Roberts算子 Roberts算子是一种斜向偏差分的梯度计算方法.梯度的大小代表边缘的强度.梯度的方向与边缘的走向垂直.Roberts操作实际上是求旋转 \pm" title="\pm" >45度两个方向上微分值的和. Roberts算子定位精度高,在水平和垂直方向的效

opencv实现gamma灰阶检測

简单介绍 本篇解说使用opencv来測试,表示camera gamma參数的灰阶卡图片指标:YA Block.DynamicRange.Gray Scale. 详细实现 实现代码 #include <math.h> #include <string.h> #include <stdio.h> #include <stdlib.h> #include <opencv2/core/core.hpp> #include <opencv2/high

网络接口的检測

假设没有下面命令,能够先去加入一个http://repoforge.org/use/ 的yum源. ifstat命令 ifstat能够监控网络接口.比較简单地查看网络流量 ifstat默认是不监控回环接口的流量的流量的单位是KB/s 使用ifstat -a能够监控全部的接口 -l    监測环路网络接口(lo). 缺省情况下.ifstat监測活动的全部非环路网络接口.经使用发现,加上-l參数能监測全部的网络接口的信息.       而不是仅仅监測 lo的接口信息,也就是说,加上-l參数比不加-l

图像处理之霍夫变换(直线检測算法)

图像处理之霍夫变换(直线检測算法) 霍夫变换是图像变换中的经典手段之中的一个,主要用来从图像中分离出具有某种同样特征的几何 形状(如,直线,圆等).霍夫变换寻找直线与圆的方法相比与其他方法能够更好的降低噪 声干扰.经典的霍夫变换经常使用来检測直线,圆,椭圆等. 霍夫变换算法思想: 以直线检測为例,每一个像素坐标点经过变换都变成都直线特质有贡献的统一度量,一个简单 的样例例如以下:一条直线在图像中是一系列离散点的集合,通过一个直线的离散极坐标公式, 能够表达出直线的离散点几何等式例如以下: X *