HDFS Block Replica Placement实现原理

1. 背景

Block Replica Placement——数据块复本存储策略,HDFS Namenode以此为依据选取数据块复本应存储至哪些HDFS Datanodes,策略的设计需要权衡以下三个因素:

  • 可靠性
  • 写带宽
  • 读带宽

注:本文均以数据块复本因子为3来讨论。

我们以两个比较极端的例子来说明上述三个因素之间的关系。

(1)数据块的三个复本集中存储至一台HDFS Datanode;

如果Client(数据写入客户端)与Datanode不是同一台机器,如下图:

数据块的第一个复本需要Client通过网络传输将数据写入Datanode,其余两个复本为本地写入,写带宽的开销为数据块的大小;

如果Client与Datanode是同一台机器,如下图:

数据块的三个复本均为本地写入,没有任何的网络数据传输,写带宽的开销为零。

这种策略的写带宽或为数据块大小,或为零,可以说是所有策略中写带宽开销最小的。但缺陷也比较明显,数据块的三个复本全部存储至一台Datanode,其它Datanodes中没有任何的冗余复本,如果这台Datanode出现故障,整个数据块的数据会全部丢失;数据的集中存储,也不利于“本地性计算”,如果这个数据块涉及的计算任务无法调度至这台Datanode实例所处的机器上运行,则数据需要远程读取,即需要跨机架(交换机)读取,读带宽开销较大。

(2)数据块的全部复本分散存储至不同的数据中心(Data Center);

数据块的三个复本分别被存储至不同的三个数据中心的Datanode,数据可靠性最高,但数据的写入和读取大多需要通过互联网,写带宽和读带宽的开销较大。

可以看出,数据块复本存储策略的设计需要综合考虑可靠性、写带宽、读带宽三者之间的相互影响。

为什么说跨机架(交换机)或跨数据中心的带宽开销较大?

跨机架的服务器之间的通信需要通过交换机,跨数据中心的服务器之间的通信需要通过专线连接,对于公司而言,交换机和专线的带宽资源都是有限的,跨机架或数据中心的服务器之间大量的数据传输通常都会带来比较高昂的带宽成本。

2. Hadoop默认数据块复本存储策略

Hadoop默认数据块复本存储策略是以“一个数据中心、多个机架”为基础设计的,如下图所示:

(1)从HDFS Cluster中随机选取一个Datanode(Rack r1/Datanode d12)用于存储第一个复本Replica1;

(2)从其它机架(非Rack r1)中选取一个Datanode(Rack r2/Datanode d21)用于存储第二个复本Replica2;

(3)从机架Rack r2中随机选取一个Datanode(Rack r2/Datanode d22)用于存储第三个复本Replica3;

(4)如果复制因子大于3,则继续从HDFS Cluster中随机选取Datanode用于存储第n(n >= 4)个复本Replica;

说明:

(1)选取的Datanode需要满足:磁盘空间使用不是很多,系统负载不是很高(Datanode不是很繁忙);

(2)同一个机架中不要存储同一个数据块太多的复本;

3. 核心源码剖析

HDFS提供的数据块复本存储策略类结构如下:

HDFS 默认数据块复本存储策略实现类:org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault,它有三个非常重要的组成部分:

(1)NetworkTopology clusterMap;

(2)chooseTarget;

(3)getPipeline;

3.1 NetworkTopology

NetworkTopology(org.apache.hadoop.net.NetworkTopology)使用树形层级结构表示集群内部的网络拓扑结构,如下图:

HDFS默认数据块复本存储策略只考虑目前比较常见的一个数据中心(Data Center)的场景,如下图:

网络拓扑结构(NetworkTopology)的建立过程中涉及到一个非常重要的问题:机架感知,即在一个集群(数据中心)中,HDFS Namenode如何知道一个HDFS Datanode归属于哪个机架位?

在HDFS的实现中,机架感知有一个接口DNSToSwitchMapping(org.apache.hadoop.net.DNSToSwitchMapping):

其中,方法resolve用于解析机器主机名(域名)或IP地址(后续讨论统一使用IP地址)对应的机架ID。也就是说,所谓的机架感知,实际是通过一定的方式根据IP地址解析出对应的机架ID。DNSToSwitchMapping有很多的实现类(机架ID的解析方式可以有很多种),HDFS具体使用哪一种实现,可以通过属性“net.topology.node.switch.mapping.impl”进行指定,这里我们仅仅介绍默认实现:ScriptBasedMapping(org.apache.hadoop.net.ScriptBasedMapping)。

ScriptBasedMapping是DNSToSwitchMapping的一种实现,允许我们提供一个自定义的脚本,用于完成IP地址与机架ID之间的解析。自定义的脚本使用时需要我们在配置文件core-default.xml中进行声明,与之相关的有两个重要的属性:

net.topology.script.file.name:自定义脚本的绝对路径;

net.topology.script.number.args:自定义脚本最多可接受的参数个数,默认值为100;

ScriptBasedMapping每次解析IP地址(可能有多个)对应的机架ID时,均需要调用resolve方法,工作过程如下:

(1)将需要解析的IP地址以列表(List)的参数形式传入resolve方法,即resolve方法可以一次性解析多个IP地址对应的机架ID;

(2)resolve方法内部将IP地址列表转换为自定义脚本可接受的“多个参数”,执行自定义脚本;

(3)获取自定义脚本的输出,并从中解析出与IP地址列表一一对应的机架ID,并以列表(List)的形式返回;

注:受限于自定义脚本最多可接受的参数个数(net.topology.script.number.args),resolve方法内部可能需要将IP地址列表分批多次调用自定义脚本,完成整个解析过程。

假设自定义脚本使用Python语言实现,且仅可以接受一个参数(net.topology.script.number.args=1),脚本示例如下:

如果IP列表为:

ip1

ip3

ip6

解析出的机架ID:

/rack1

/rack3

/rack1

自定义脚本的执行是由ShellCommandExecutor(org.apache.hadoop.util.Shell.ShellCommandExecutor)驱动的,它的内部使用java.lang.ProcessBuilder、java.lang.Process,将脚本以子进程的方式执行,然后通过java.lang.Process.getInputStream()获取脚本的输出内容(一行字符串),最后通过java.util.StringTokenizer从输出内容中获取机架ID。

ShellCommandExecutor执行自定义脚本的过程中,需要注意以下几个问题:

(1)脚本的执行过程是以子进程的形式进行的,高并发调用下可能带来比较大的性能开销;

(2)脚本需要支持接收多个参数,这里特指多个以空格分隔的IP地址字符串;

(3)因为脚本可能接收多个IP地址(2),因此脚本输出可能包含多个机架ID,与输入的IP地址一一对应;如上所述,脚本的输出内容为一行字符串,为了能够从中解析出多个机架ID,该字符串必须以空格、\t、\n、\r、\f之一作为分隔符,否则java.util.StringTokenizer无法正确解析。

综上所述,HDFS默认使用的机架感知策略是通过我们自定义的一个脚本实现的,这给了我们很大的自由度,可以根据自身的实际情况任意调整HDFS Datanode归属的机架ID。

那么,网络拓扑结构,即NetworkTopology实例的内部数据结构,是在什么时候被建立的?

众所周知,HDFS使用的是Master-Slave模式,Namenode相当于Master,Datanode相当于Slave,Datanode启动之后需要向Namenode进行注册,一个个Datanode向Namenode注册的过程中,就包含着机架感知、网络拓扑结构的建立过程,相关代码可以参考:org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.registerDatanode。

NetworkTopology实例的内部数据结构大致如下图所示:

InnerNode与Node的代码可以参考:org.apache.hadoop.net.NetworkTopology.InnerNode、org.apache.hadoop.net.Node。

3.2 chooseTarget

如上文2.1中所述,BlockPlacementPolicyDefault chooseTarget的工作过程可以简要归纳为四步,具体实现时这四步可以对应到四个方法:

(1)从HDFS Cluster中随机选取一个Datanode(Rack r1/Datanode d12)用于存储第一个复本Replica1;

chooseLocalStorage(org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault.chooseLocalStorage);

(2)从其它机架(非Rack r1)中选取一个Datanode(Rack r2/Datanode d21)用于存储第二个复本Replica2;

chooseRemoteRack(org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault.chooseRemoteRack);

(3)从机架Rack r2中随机选取一个Datanode(Rack r2/Datanode d22)用于存储第三个复本Replica3;

chooseLocalRack(org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault.chooseLocalRack);

(4)如果复制因子大于3,则依次从HDFS Cluster中随机选取Datanode用于存储第n(n >= 4)个复本Replica;

chooseRandom(org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault.chooseRandom);

实际上,chooseLocalStorage、chooseRemoteRack、chooseLocalRack最终都会将调用请求转发给chooseRandom进行处理,这是为什么呢?

chooseLocalStorage:从整个集群中“随机”选取一个Node或直接选取本机(忽略这种情况);

chooseRemoteRack:从整个集群中的其它(即排除指定机架)机架中“随机”选取一个Node;

chooseLocalRack:从集群中的指定机架中“随机”选取一个Node;

这三个方法的工作过程中都涉及到“随机”的选取过程,而chooseRandom的方法参数设计过程中已经考虑到了上述三个情况:

chooseLocalStorage调用chooseRandom时:

numOfReplicas:1,表示我们需要选取多少个Node;

scope:网络拓扑结构的根节点(root),即"";表示从整个集群中随机选取;

excludedNodes:空值或已经被选取的Nodes,表示选取出的Node不能出现在这些excludedNodes中;

chooseRemoteRack调用chooseRandom时:

numOfReplicas:1,表示我们需要选取多少个Node;

scope:~rack,表示从整个集群中非rack机架中随机选取;

excludedNodes:空值或已经被选取的Nodes,表示选取出的Node不能出现在这些excludedNodes中;

chooseLocalRack调用chooseRandom时:

numOfReplicas:1,表示我们需要选取多少个Node;

scope:rack,表示从集群机架rack中随机选取;

excludedNodes:空值或已经被选取的Nodes,表示选取出的Node不能出现在这些excludedNodes中;

chooseRandom核心源码如下:

可以看出,chooseRandom整个工作流程可以理解为一个循环选取的过程,循环条件即为:“numOfReplicas > 0 && numOfAvailableNodes > 0”,它表示着两个含义:

(1)numOfReplicas > 0,表示仍需要继续选取Node用于存在数据块复本;在我们的讨论中,它的值恒为1;

(2)numOfAvailableNodes > 0,集群中尚有可用的Node用于选取;它的值是通过NetworkTopology countNumOfAvailableNodes(org.apache.hadoop.net.NetworkTopology.countNumOfAvailableNodes)计算而得的;根据计算上下文(chooseLocalStorage、chooseRemoteRack、chooseLocalRack)的不同,它可以计算整个集群、整个集群排除某机架、集群中指定机架的可用Node数目;

每一次的选取可以大致分为以下几步:

(1)从集群网络拓扑结构(NetworkTopology)中随机选取一个Node firstChosen(DatanodeDescriptor,实现Node接口,用于描述HDFS Datanode信息);

(2)如果firstChosen没有出现在excludedNodes中,则继续;否则,结束本次选取;

(3)获取firstChosen中存储信息storages(每一个HDFS Datanode可以指定多个存储位置,每一个存储位置使用DatanodeStorageInfo表示),并对这些storages进行随机排序(shuffle);

(4)依次从这些storages中寻找“适合”(addIfIsGoodTarget)的存储位置(DatanodeStorageInfo);

也就是说,最终返回的不只是某一个HDFS Datanode,还包括具体的存储位置,即DatanodeStorageInfo。

NetworkTopology chooseRandom

我们以一个具体的示例来说明集群网络拓扑结构的“随机选取”的过程。

假设我们的集群网络拓扑结构如下:

“随机选取”仅仅选取“Leaf Node”(叶子节点,表示Datanode),以“深度优先”的方式依次输出这些“Leaf Node”:

(1)整个集群中选取;

整个集群中可供选取的Node数目为9,取1~9之间的随机数,假设为4,则选取Node的方式为:从头部开始,选取第4个Node即可。

(2)整个集群中选取,且排除机架rack2;

整个集群需要排除机架rack2包含的Nodes,可供选取的Node数目为6,取1~6之间的随机数,假设为6,则选取Node的方式为:从头部开始,选取第6个Node即可。

(3)集群机架rack1中选取;

集群机架rack1中可供选取的Node数目为4,取1~4之间的随机数,假设为3,则选取Node的方式为:从头部开始,选取第3个Node即可。

BlockPlacementPolicyDefault addIfIsGoodTarget

 

如前所述,每一个Datanode可以包含多个存储位置(Datanode指定多个目录用于存储数据块复本,每一个目录挂载一块磁盘),选取Datanode完成之后,还需要从中寻找出“合适”的存储位置(DatanodeStorageInfo),如果从这个Datanode中无法找到“合适”的存储位置,则需要继续选取Node的过程。

“合适”包括七个方面,源码如下:

(1)存储类型是否匹配,SSD/DISK;

(2)存储是否为只读;

(3)存储所处的Datanode是否处于下线状态;

(4)存储所处的Datanode状态是否异常;

(5)存储空间是否足够;

(6)存储所处的Datanode负载是否过高;

(7)存储所处的Datanode归属的机架中存储同一数据块的复本的Datanodes数目是否超过设置maxNodesPerRack;

注:(7)中maxNodesPerRack的计算公式:int maxNodesPerRack = (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;

3.3 getPipeline

用于存储数据块复本的Nodes选取完毕之后,还需要将Client与这些Nodes形成一个“管道”,而且这个“管道”的“节点距离之和”(数据传输距离)必须最短,getPipeline就是用来实现这个“管道”的形成过程的,实际就是一个排序的过程。

注:关于“节点距离”的概念可参考<<Hadoop The Definitive Guide>>章节:“Anatomy of a File Read”。

假设我们有一个Client和选取出的5个Nodes:d1、d2、d3、d4、d5,排序过程如下:

(1)从d1、d2、d3、d4、d5中找出与Client之间距离最短的Node,假设为d2;

(2)从d1、d3、d4、d5中找出与d2之间距离最短的Node,假设为d4;

(3)从d1、d3、d5中找出与d4之间距离最短的Node,假设为d1;

(4)从d3、d5中找出与d1之间的距离最短的Node,假设为d5;

最后我们即得到一个“节点距离之和”的管道:Client、d2、d4、d1、d5、d3。

时间: 2024-07-31 20:15:04

HDFS Block Replica Placement实现原理的相关文章

Hdfs block数据块大小的设置规则

1.概述 hadoop集群中文件的存储都是以块的形式存储在hdfs中. 2.默认值 从2.7.3版本开始block size的默认大小为128M,之前版本的默认值是64M. 3.如何修改block块的大小? 可以通过修改hdfs-site.xml文件中的dfs.blocksize对应的值. 注意:在修改HDFS的数据块大小时,首先停掉集群hadoop的运行进程,修改完毕后重新启动. 4.block块大小设置规则 在实际应用中,hdfs block块的大小设置为多少合适呢?为什么有的是64M,有的

2本Hadoop技术内幕电子书百度网盘下载:深入理解MapReduce架构设计与实现原理、深入解析Hadoop Common和HDFS架构设计与实现原理

这是我收集的两本关于Hadoop的书,高清PDF版,在此和大家分享: 1.<Hadoop技术内幕:深入理解MapReduce架构设计与实现原理>董西成 著  机械工业出版社2013年5月出版 2.<Hadoop技术内幕:深入解析Hadoop Common和HDFS架构设计与实现原理>蔡斌.陈湘萍 著  机械工业出版社2013年4月出版 百度网盘下载地址: http://pan.baidu.com/s/1sjNmkFj

Kafka Topic Partition Replica Assignment实现原理及资源隔离方案

本文共分为三个部分: Kafka Topic创建方式 Kafka Topic Partitions Assignment实现原理 Kafka资源隔离方案 1. Kafka Topic创建方式 Kafka Topic创建方式有以下两种表现形式: (1)创建Topic时直接指定Topic Partition Replica与Kafka Broker之间的存储映射关系 /usr/lib/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper ZooKeep

Block中__block实现原理

三.Block中__block实现原理 我们继续研究一下__block实现原理. 1.普通非对象的变量 先来看看普通变量的情况. #import <Foundation/Foundation.h> int main(int argc, const char * argv[]) { __block int i = 0; void (^myBlock)(void) = ^{ i ++; NSLog(@"%d",i); }; myBlock(); return 0; } 把上述代

Hadoop学习第二次:HDFS的应用场景 部署 原理与基本框架

1.HDFS的定义与特色 以文件为基本存储单位的劣势:难以实现负载均衡——文件大小不同,负载均衡难实现:用户自己控制文件大小: 难以实现并行化处理——只能利用一个节点资源处理一个文件,无法动用集群资源: HDFS的定义:易于扩展的分布式文件系统:运行在大量廉价机器上,提供容错机制:为大量用户提供性能不错的文件存储服务: 优点:高容错性(数据自动保存多个副本,副本丢失后自动恢复)  适合批处理(移动计算而不是数据,数据位置暴露给计算框架) 适合大数据的处理 流式文件访问 可构建在廉价的机器上 不擅

block的底层实现原理?

block就是指向结构体的指针,编译器会将block的内部代码生成对应的函数,利用这个指针就可以调用这个函数.普通的局部变量是值传递,用__block ,static ,或者是全局变量就是地址传递 block的内存默认是存放在栈里面的,他不会对所引用的对象进行操作 如果对block做一次copy操作block的内存就会在堆区,他会对所引用的对象做一次retain操作.为了防止循环引用 MRC:用__block typeof(self) [这么写重用率比较高]就不会做retain操作  ARC:用

HDFS(二) 底层通信原理——RPC 及 动态代理

一.RPC(Remote Procedure Call  ) :远程过程调用 1.RPC是远程过程调用协议,实现调用者和被调用者二地之间的连接和通信.其基本通信模型是基于client/server进程间相互通信模型 ,如图1所示.                                图 1    使用RPC调用完成远程调用示意图 2.利用HADOOP的RPC框架实现Server和Client远程通信 (1)定义一个接口    MyInterface (2) 定义接口的实现类 (3)RP

hadoop balancer hbase balancer

Hadoop 均衡器 Hadoop在运行过程中,其datanode的块会越来越不平衡,不平衡的集群会导致部分datanode相对更繁忙. Hadoop的均衡器是一个守护进程.它会重新分配块,将块从忙碌的datanode移到相对空闲的datanode.同时坚持复本策略,将复本分散到不同机架,以降低数据损坏率. 集群均衡标准:每个datanode的使用率和集群的使用率非常接近,差距不超过给定的阀值. datanode使用率:该节点上已使用的空间与空间总量之间的比率: 集群的使用率:集群中已使用的空间

HDFS集中式的缓存管理原理与代码剖析--转载

原文地址:http://yanbohappy.sinaapp.com/?p=468 Hadoop 2.3.0已经发布了,其中最大的亮点就是集中式的缓存管理(HDFS centralized cache management).这个功能对于提升Hadoop系统和上层应用的执行效率与实时性有很大帮助,本文从原理.架构和代码剖析三个角度来探讨这一功能. 主要解决了哪些问题 1.用户可以根据自己的逻辑指定一些经常被使用的数据或者高优先级任务对应的数据常驻内存而不被淘汰到磁盘.例如在Hive或Impala