Hadoop 副本放置策略的源码阅读和设置

本文通过MetaWeblog自动发布,原文及更新链接:https://extendswind.top/posts/technical/hadoop_block_placement_policy

大多数的叫法都是副本放置策略,实质上是HDFS对所有数据的位置放置策略,并非只是针对数据的副本。因此Hadoop的源码里有block replicator(configuration)、 BlockPlacementPolicy(具体逻辑源码)两种叫法。

主要用途:上传文件时决定文件在HDFS上存储的位置(具体到datanode上的具体存储介质,如具体到存储在哪块硬盘);rebalance、datanode退出集群、副本数量更改等导致数据移动的操作中,数据移动的具体位置。

BlockPlacementPolicy

BlockPlacementPolicy 作为虚基类提供了基本的接口,具体的子类重点实现下面 选择副本验证副本放置是否满足要求选择能够删除的副本 三个函数:

 /**
   * 核心的副本放置策略实现,返回副本放置数量的存储位置
   * **如果有效节点数量不够(少于副本数),返回尽可能多的节点,而非失败**
   *
   * @param srcPath 上传文件的路径
   * @param numOfReplicas 除下面chosen参数里已经选择的datanode,还需要的副本数量
   * @param writer 写数据的机器, null if not in the cluster. 一般用于放置第一个副本以降低网络通信
   * @param chosen 已经选择的节点
   * @param returnChosenNodes 返回结果里是否包含chosen的datanode
   * @param excludedNodes 不选的节点
   * @param blocksize 块大小
   * @return 排序好的选择结果
   */
  public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
                                             int numOfReplicas,
                                             Node writer,
                                             List<DatanodeStorageInfo> chosen,
                                             boolean returnChosenNodes,
                                             Set<Node> excludedNodes,
                                             long blocksize,
                                             BlockStoragePolicy storagePolicy);

  /**
   * 判断传入的放置方式是否符合要求
   */
  abstract public BlockPlacementStatus verifyBlockPlacement(
      DatanodeInfo[] locs, int numOfReplicas);

    /**
   * 当副本数量较多时,选择需要删除的节点
   */
  abstract public List<DatanodeStorageInfo> chooseReplicasToDelete(
      Collection<DatanodeStorageInfo> candidates, int expectedNumOfReplicas,
      List<StorageType> excessTypes, DatanodeDescriptor addedNode,
      DatanodeDescriptor delNodeHint);

Hadoop 提供的 BlockPlacementPolicy 实现

Hadoop提供了BlockPlacementPolicyDefault、BlockPlacementPolicyWithNodeGroup、AvailableSpaceBlockPlacementPolicy三种实现(hadoop 2.7.7)。

其中BlockPlacementPolicyDefault是经典三副本策略的实现:第一个副本尽可能放在写入数据的节点,第二个副本放在与第一个副本不在同一机架下的节点,第三个副本与第二副本放在同一个机架。

通过改变dfs.block.replicator.classname 能够选择具体的实现类,默认值为org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault。(Hadoop 2.7.7下,貌似不同版本的Hadoop的命名还不一样,而且2.7.7默认的配置文件里还没有,需要在源码中查)

BlockPlacementPolicyDefault 源码阅读

  public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
                                             int numOfReplicas,
                                             Node writer,
                                             List<DatanodeStorageInfo> chosen,
                                             boolean returnChosenNodes,
                                             Set<Node> excludedNodes,
                                             long blocksize,
                                             BlockStoragePolicy storagePolicy);

chooseTarget函数实现了具体的三副本策略。各种特殊情况(如只有1个副本、datanode数量不够、集群拓扑不满足要求等)的考虑让代码看起来比较复杂,常规情况直接跟着调试代码走会跳过很多异常处理部分,便于裂解正常流程。

在副本的选择上用了各种带chooseTarget函数,注意有几个函数结果是通过参数传出而不是返回值。

主要实现思路:

  1. 各种变量初始化
  2. 考虑favoredNodes的放置
  3. 除满足条件的favoredNodes后的副本放置策略(三副本)
  4. 结果排序

首先

srcPath没有被考虑,被直接舍弃:

return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
        excludedNodes, blocksize, storagePolicy, flags); // ignore srcPath

因此默认的副本放置策略,在同一文件包含多个block时,每个block的存储位置独立考虑,并非存储在同一datanode

处理favoredNodes

上传文件时可以指定favoredNodes(默认为空),首先对favoredNodes所在的节点判断是否合适。如果满足条件的节点数还低于副本数,则添加新的副本。

 // --------------Choose favored nodes ---------------
 // 从favored nodes中选择,在上传文件时可以指定
 List<DatanodeStorageInfo> results = new ArrayList<>();
 boolean avoidStaleNodes = stats != null
     && stats.isAvoidingStaleDataNodesForWrite();

 int maxNodesAndReplicas[] = getMaxNodesPerRack(0, numOfReplicas);
 numOfReplicas = maxNodesAndReplicas[0];
 int maxNodesPerRack = maxNodesAndReplicas[1];

 chooseFavouredNodes(src, numOfReplicas, favoredNodes,
     favoriteAndExcludedNodes, blocksize, maxNodesPerRack, results,
     avoidStaleNodes, storageTypes);

 // ---------------如果满足要求的favored nodes数量不足-----------
 if (results.size() < numOfReplicas) {
   // Not enough favored nodes, choose other nodes, based on block
   // placement policy (HDFS-9393).
   numOfReplicas -= results.size();
   for (DatanodeStorageInfo storage : results) {
     // add localMachine and related nodes to favoriteAndExcludedNodes
     addToExcludedNodes(storage.getDatanodeDescriptor(),
         favoriteAndExcludedNodes);
   }
   DatanodeStorageInfo[] remainingTargets =
       chooseTarget(src, numOfReplicas, writer,
           new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
           favoriteAndExcludedNodes, blocksize, storagePolicy, flags);
   for (int i = 0; i < remainingTargets.length; i++) {
     results.add(remainingTargets[i]);
   }
 }

三副本选择

实现逻辑在 chooseTargetInOrder(…) 函数中

// 第一个副本的选择
if (numOfResults == 0) {
  writer = chooseLocalStorage(writer, excludedNodes, blocksize,
      maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
      .getDatanodeDescriptor();
  if (--numOfReplicas == 0) {
    return writer;
  }
}

// 选择与第一个副本不在同一Rack下的第二个副本
final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
if (numOfResults <= 1) {
  chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
      results, avoidStaleNodes, storageTypes);
  if (--numOfReplicas == 0) {
    return writer;
  }
}

// 第三个副本
if (numOfResults <= 2) {
  final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
  // 第一、二副本在同一Rack下时选第三个副本
  // (前面的favoredNodes以及集群条件可能造成这种情况)
  if (clusterMap.isOnSameRack(dn0, dn1)) {
    chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
        results, avoidStaleNodes, storageTypes);
  } else if (newBlock){ // 正常情况,第二副本的localRack下选第三副本
    chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
        results, avoidStaleNodes, storageTypes);
  } else {  // 其它的以外
    chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
        results, avoidStaleNodes, storageTypes);
  }
  if (--numOfReplicas == 0) {
    return writer;
  }
}

// 如果副本数量还没到0,剩下的副本随机选择
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
    maxNodesPerRack, results, avoidStaleNodes, storageTypes);
return writer;

再到具体的选择

选择具体的存储位置被上面包装到了 chooseRemoteRack 和 chooseLocalRack 两个函数。

实际调用时只是 chooseRandom 函数,在限定的rack下选择一个随机的节点。

源码阅读的几个注意

代码在直接阅读时各种跳,但主线思路比较明确。主要带来阅读困难的位置:

  1. 很多函数调用不是通过返回值传出结果,而是通过参数。
  2. 注意某些if后的return会直接返回结果,后面的代码不会被调用。
  3. 递归的形式多次调用同一个函数以选择多个副本。
  4. 很多代码为了避免一些特殊情况,可以暂时略过(如catch里的异常处理)。

修改HDFS默认的副本放置机制

可以选择直接复制或继承BlockPlacementPolicyDefault的实现,或者直接继承BlockPlacementPolicy类编写对应的接口具体实现。

将编译好的jar包放入$HADOOP_PREFIX/share/hadoop/common下(或者其它的Hadoop jar包路径)。

改变dfs.block.replicator.classname 为上面的实现类,要带包的名称。

RackAwareness 机架感知

Hadoop 并不能自动检测集群的机架状态,而是要预先设置机架的状态,通过脚本或java类将datanode的ip转换成具体的机架上的位置。

官方文档介绍了基本思路,虽然实现上介绍得不是太清楚,只要将输入的ip转换成”/rackNum”的形式即可。

https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/RackAwareness.html

原文地址:https://www.cnblogs.com/fly2wind/p/10244124.html

时间: 2024-10-08 05:03:49

Hadoop 副本放置策略的源码阅读和设置的相关文章

Hadoop源码阅读环境搭建

Hadoop源码阅读环境搭建 一.说明 作为一个学习hadoop的同学,必须在本机上搭建hadoop源码阅读环境,这样,在方便阅读源码的同时也方便进行调试和源码修改.好了,下面开始搭建环境. 1.环境说明:hadoop 版本:1.2.1. IDE:eclipse.操作系统:centos 2.网上有人是通过eclipse的新建项目指定目录的方式将hadoop目录转换成Eclipse工程同时导入eclipse,具体做法如下: File-->new-->Java Project-->勾掉Use

Mac搭建Hadoop源码阅读环境

1.本次Hadoop源码阅读环境使用的阅读工具是idea,Hadoop版本是2.7.3.需要安装的工具包括idea.jdk.maven.protobuf等 2.jdk,使用的版本是1.8版,在jdk官网下载jdk-8u111-macosx-x64.dmg,点击安装,一路next. 3.idea安装,略 4.maven,使用的版本是3.3.9,下载apache-maven-3.3.9-bin.tar,解压: tar -zxvf  apache-maven-3.3.9-bin.tar 进入 Mave

HDFS副本放置策略

前言 前一篇文章中刚刚分析完HDFS的异构存储以及相关的存储类型选择策略,浏览量还是不少的,说明大家对于HDFS的异构存储方面的功能还是很感兴趣的.但是其实一个文件Block块从最初的产生到最后的落盘,存储类型选择策略只是其中1步,因为存储类型选择策略只是帮你先筛选了一些符合存储类型要求的存储节点目录位置列表,通过这些候选列表,你还需要做进一步的筛选,这就是本文所准备阐述的另外一个主题,HDFS的副本放置策略.在写本文之前,我搜过网上关于此方面的资料与文章,还是有许多文章写的非常不错的,所以我会

JDK部分源码阅读与理解

本文为博主原创,允许转载,但请声明原文地址:http://www.coselding.cn/article/2016/05/31/JDK部分源码阅读与理解/ 不喜欢重复造轮子,不喜欢贴各种东西.JDK代码什么的,让整篇文章很乱...JDK源码谁都有,没什么好贴的...如果你没看过JDK源码,建议打开Eclipse边看源码边看这篇文章,看过的可以把这篇文章当成是知识点备忘录... JDK容器类中有大量的空指针.数组越界.状态异常等异常处理,这些不是重点,我们关注的应该是它的一些底层的具体实现,这篇

如何阅读Java源码 阅读java的真实体会

刚才在论坛不经意间,看到有关源码阅读的帖子.回想自己前几年,阅读源码那种兴奋和成就感(1),不禁又有一种激动. 源码阅读,我觉得最核心有三点:技术基础+强烈的求知欲+耐心. 说到技术基础,我打个比方吧,如果你从来没有学过Java,或是任何一门编程语言如C++,一开始去啃<Core Java>,你是很难从中吸收到营养的,特别是<深入Java虚拟机>这类书,别人觉得好,未必适合现在的你. 虽然Tomcat的源码很漂亮,但我绝不建议你一开始就读它.我文中会专门谈到这个,暂时不展开. 强烈

014_HDFS存储架构、架构可靠性分析、副本放置策略、各组件之间的关系

1.HDFS存储架构 (1)HDFS 架构 —— 文件 1)文件切分成块(默认大小64M),以块为单位,每个块有多个副本存储在不同的机器上,副本数可在文件生成时指定(默认3)2)NameNode 是主节点,存储文件的元数据如文件名,文件目录结构,文件属性(生成时间,副本数,文件权限),以及每个文件的块列表以及块所在的DataNode等等3)DataNode 在本地文件系统存储文件块数据,以及块数据的校验和.4)可以创建.删除.移动或重命名文件,当文件创建.写入和关闭之后不能修改文件内容. (2)

HDFS副本放置策略及机架感知

副本放置策略 副本放置策略的基本思想是: 第一个block副本放在和client所在的node里(如果client不在集群范围内,则这第一个node是随机选取的,当然系统会尝试不选择哪些太满或者太忙的node). 第二个副本放置在与第一个节点不同的机架中的node中(随机选择). 第三个副本和第二个在同一个机架,随机放在不同的node中. 如果还有更多的副本就随机放在集群的node里. Hadoop的副本放置策略在可靠性(block在不同的机架)和带宽(一个管道只需要穿越一个网络节点)中做了一个

《java.util.concurrent 包源码阅读》13 线程池系列之ThreadPoolExecutor 第三部分

这一部分来说说线程池如何进行状态控制,即线程池的开启和关闭. 先来说说线程池的开启,这部分来看ThreadPoolExecutor构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecut

【原】AFNetworking源码阅读(六)

[原]AFNetworking源码阅读(六) 本文转载请注明出处 —— polobymulberry-博客园 1. 前言 这一篇的想讲的,一个就是分析一下AFSecurityPolicy文件,看看AFNetworking的网络安全策略,尤其指HTTPS(大家可以先简单了解下HTTPS).再一个就是分析下AFNetworkReachabilityManager文件,看看AFNetworking如何解决网络状态的检测. 2. AFSecurityPolicy - 网络安全策略 之前我们在AFURLS