Hadoop 2.0 Yarn代码:心跳驱动服务分析


当RM(ResourcesManager)和NM(NodeManager)陆续将所有模块服务启动,最后启动是NodeStatusUpdater,NodeStatusUpdater将用Hadoop
RPC远程调用ResourcesTrackerService中的函数,进行资源是初始化等操作,为将要运行的Job做好准备。以下主要分析在Job提交之前 RM与NM在心跳的驱动下操作。

AD:

hadoop-yarn-server-resourcemanager下的包

org.apache.hadoop.yarn.server.resourcemanager

ResourceTrackerService.java

org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo

FifoScheduler.java

org.apache.hadoop.yarn.server.resourcemanager.rmnode

RMNodeImpl.java

hadoop-yarn-server-nodemanager下的包

org.apache.hadoop.yarn.server.nodemanager

NodeStatusUpdaterImpl.java

2.代码分析

各个服务代码已经启动,NodeStatusUpdate启动后开始驱动整个Hadoop运行

1).NodeStatusUpdaterImpl(NodeManager端):

NodeStatusUpdaterImpl一经被启动,start()函数被调用,进行Hadoop RPC服务端的初始化操作(调用getServer函数创建服务等等)。

start()函数主要依次调用registerWithRM()函数和startStatusUpdater()函数

registerWithRM()函数

设置必要配置信息,和安全认证操作

利用Hadoop RPC远程调用RM端ResourcesTrackerService下的registerNodeManager()方法,详细见后面ResourcesTrackerService下的registerNodeManager()代码分析

startStatusUpdater()函数

创建一个线程,然后启动,所有操作都在运行while的循环中

设置、获取和输出必要配置信息,其中比较重要的调用getNodeStatus()方法,获取本地Container和本地Node的状态,以供后面的nodeHeartbeat()方法使用

通过Hadoop RPC远程调用RM端ResourcesTrackerService下的nodeHeartbeat()函数,用while循环以一定时间间隔向RM发送心跳信息,心跳操作见下面ResourcesTrackerService下nodeHeartbeat()函数

nodeHeartbeat()将返回给NM信息,根据返回的response,根据response返回的信息标记不需要的Container和Application发送相应的FINISH_CONTAINERS和FINISH_APPS给ContainerManager,进行清理操作----详细见后面的代码分析

2).ResourceTrackerService(ResourcesManager端):

ResourceTrackerService开头与NodeStatusUpdaterImpl相似,start()函数被调用,初始化Hadoop
RPC服务端,等待远程来调用ResourceTrackerService中的函数

接上面的NodeStatusUpdaterImpl中对registerNodeManager()和nodeHeartbeat()的Hadoop
RPC调用,详细调用细节见下文

以下分成主要从两个函数registerNodeManager()和nodeHeartbeat()开始分析,所以分成两部分---

第一部分:

1).接前文ResourceTrackerService下的registerNodeManager()函数

首先获取本地的NodeID,还有相应的主机名、端口、请求资源信息。

进行安全认证等辅助操作,检查NodeID所标记的Node是否 有效 .如果 无效的话,立即返回

Node 有效说明此Node可用,于是创建RMNode(new RMNodeImpl)来识别这个Node的状态和监测在这个Node上运行的Container和Application

判断其是否为新RMNode,如果是则向其发送RMNodeEventType.STARTED

如果不是新的RMNode,则发送RMNodeEventType.RECONNECTED到RMNode,重新连接Node,见附加代码分析。

最后返回给调用方操作结果。

2).RMNodeImpl:当接收RMNodeEventType.STARTED后(接1)),发生状态转移NodeState(NEW
RUNNING),Transition函数被调用

向调度器(FifoScheduler)发送NODE_ADDED。

判断这个Node是否Inactive,如果在Inactive中则,则先将这个Node移除出Inactive,否则增加ActiveNodes的数目。

3).FifoScheduler:接受NODE_ADDED事件,调用addNode()函数,向RM报告新添加的Node的状态

addNode函数被调用,首先将接收到的RMNode的NodeID和其相应新创建的SchedulerNode(包含对资源的各种操作)放在ConcurrentHashMap类型的node对象中。

再调用Resources下的addTo()函数,累加Node的资源数量,来计算集群中拥有的资源数量

至此NM端的Node已经添加到RM的管辖范围下,NM成功注册到RM

附加代码分析

附加2).RMNodeImpl:当RMNode接收RMNodeEventType.RECONNECTED(接1)),则保持当前状态不变(RUNNING或者UNHEALTHY),Transition函数被调用

首先向调度器(FifoScheduler)发送NODE_REMOVED事件,删除当前Node

然后重新连接操作,如果新连接的Node与上一次断开的Node为同一个,则直接向调度器发送NODE_ADDED事件,如果两个Node不是同一个,则更新关于Node的参数,再将新的Node加入ConcurrentHashMap类型的node对象中(与前面FifoScheduler中的是同一个)

最后向新的RMNode发送RMNodeEventType.STARTED

附加3).FifoScheduler:接到NODE_REMOVED事件,调用removeNode()函数

removeNode()函数中,先将此Node上的Container全部Kill掉,通过发送RMContainerEventType.KILL实现,因为现在讨论没有Job运行,所以没有Container可以Kill

从nodes中移出此Node,最后计算集群资源,将相应Node的资源数量从集群资源总量扣除,完毕

第二部分

1).接前文ResourceTrackerService下的nodeHeartbeat()函数,各个NM已经注册到RM上,则各个NM开始调用这个函数向RM发送 心跳 保持联系,另外这里讨论的是未提交Job的情况下

获取所需操作的参数变量,例如NodeStatus、NodeId等

验证发送这次 心跳的 NM是否已经注册到RM,若未注册则返回给NM让其进行重新(reboot)注册到RM中(实际上就是让NodeStatusUpdater跳过此次循环)。

验证这个NM是否 有效 (有效:主机队列包含这个NM或者黑名单没有这个NM),如 无效 ,则发送RMNodeEventType.DECOMMISSION到NM相应的RMNode中,并关闭(shutdown)这个NM---下接附加2)

验证这次 心跳是否与上一个 心跳 重复或者是不是新的 心跳 ,这个通过心跳ID实现,如果重复则输出心跳重复信息,并且直接返回,如果不是新的心跳,则向RMNode发送RMNodeEventType.REBOOTING,然后返回reboot,让NM 重启 (和上面一样NodeStatusUpdater跳过当此次循环)---下接附加2)

设置新的 心跳 ID,获取Container和Application的信息

向RMNode发送包含STATUS_UPDATE和Container、Application等信息的RMNodeStatusEvent,然后返回相应设置好的response给调用者。

2).RMNodeImpl:RMNode接收到包含STATUS_UPDATE和Container、Application等信息的RMNodeStatusEvent,RMNodeImpl状态迁移NodeState(RUNNING
UNHEALTHY或RUNNING RUNNING),Transition函数被调用

首先从RMNodeStatusEvent获得必要的变量,然后判断相应的Node的 健康 情况,如果出现问题,则向调度器NODE_REMOVED,则调度器将此NM从集群管理删除(详见第一部分 附加3)),然后发送NODE_UNUSABLE到NodeListManager,将其RMNode放到 unusable 的set集合当中,此时RMNodeImpl的NodeState(RUNNING
UNHEALTHY)

如果 健康则顺利运行,获取NM远程传过来的关于Container的信息(是在NM端用Hadoop
RPC调用nodeHeartbeat()时传送过来的),

说明:

由于这个地方讨论的时候,无Job提交过来,NM端无Container启动,NM发送到RM的事件里面的装有Container状态的List为空,所以只传送 心跳 表明NM的活动信息,并没有实际处理,RM端也无Application处理,接受 心跳 只会向RMNode发送RMNodeCleanContainerEvent事件,清理可能互动的Container(对应的代码见FifoScheduler下的containerLaunchedOnNode函数)。若详见处理情况的运行状态,参见后面的文章:RM与NM代码_心跳驱动服务分析_2
Container的配置和分配(Job提交)一篇。此时RMNodeImpl的NodeState(RUNNING
RUNNING)

到这为止,RM-NM端的代码已经启动完成,RM和NM之间以一定的时间间隔用心跳交互信息,等待Job的提交。

更多精彩内容请关注:http://bbs.superwu.cn

关注超人学院微信二维码:

关注超人学院java免费交流群:455085757

时间: 2024-07-30 06:27:46

Hadoop 2.0 Yarn代码:心跳驱动服务分析的相关文章

Hadoop 2.0 中的资源管理框架 - YARN(Yet Another Resource Negotiator)

1. Hadoop 2.0 中的资源管理 http://dongxicheng.org/mapreduce-nextgen/hadoop-1-and-2-resource-manage/ Hadoop 2.0指的是版本为Apache Hadoop 0.23.x.2.x或者CDH4系列的Hadoop,内核主要由HDFS.MapReduce和YARN三个系统组成,其中,YARN是一个资源管理系统,负责集群资源管理和调度,MapReduce则是运行在YARN上离线处理框架,它与Hadoop 1.0中的

Hadoop MapReduce2.0(Yarn)

MapReduce2.0(Yarn) MapReduce2.0是在Hadoop0.23开始采用的,叫做MapReduce2.0或者MRv2或者Yarn. MRv2的主要思想是把jobtracker的任务分为两个基本的功能,一个是资源管理,一个是任务监控,这两个任务分别用不同的进程来运行.这个想法使拥有一个全局的资源管理器(ResourceManager)和每个应用程序的应用程序管理器(ApplicationMaster).一个应用程序要么使用传统的MapReduce任务来运行,要么以DAG形式的

Hadoop 2.0 NameNode HA和Federation实践

参考链接:Hadoop 2.0 NameNode HA和Federation实践 Posted on 2012/12/10 一.背景 天云趋势在2012年下半年开始为某大型国有银行的历史交易数据备份及查询提供基于Hadoop的技术解决方案,由于行业的特殊性,客户对服务的可用性有着非常高的要求,而HDFS长久以来都被单点故障的问题所困扰,直到Apache Hadoop在2012年5月发布了2.0的alpha版本,其中MRv2还很不成熟,可HDFS的新功能已经基本可用,尤其是其中的的High Ava

Hadoop学习之YARN框架

转自:http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-yarn/,非常感谢分享! 对于业界的大数据存储及分布式处理系统来说,Hadoop 是耳熟能详的卓越开源分布式文件存储及处理框架,对于 Hadoop 框架的介绍在此不再累述,读者可参考 Hadoop 官方简介.使用和学习过老 Hadoop 框架(0.20.0 及之前版本)的同仁应该很熟悉如下的原 MapReduce 框架图: 图 1.Hadoop 原 MapReduce

Hadoop 2.0中单点故障解决方案总结

项目构建 Hadoop 1.0内核主要由两个分支组成:MapReduce和HDFS,众所周知,这两个系统的设计缺陷是单点故障,即MR的JobTracker和HDFS的NameNode两个核心服务均存在单点问题,该问题在很长时间内没有解决,这使得Hadoop在相当长时间内仅适合离线存储和离线计算. 令人欣慰的是,这些问题在Hadoop 2.0中得到了非常完整的解决.Hadoop 2.0内核由三个分支组成,分别是HDFS.MapReduce和YARN,而Hadoop生态系统中的其他系统,比如HBas

Hadoop数据操作系统YARN全解析

“ Hadoop 2.0引入YARN,大大提高了集群的资源利用率并降低了集群管理成本.其在异构集群中是怎样应用的?Hulu又有哪些成功实践可以分享? 为了能够对集群中的资源进行统一管理和调度,Hadoop 2.0引入了数据操作系统YARN.YARN的引入,大大提高了集群的资源利用率,并降低了集群管理成本.首先,YARN允许多个应用程序运行在一个集群中,并将资源按需分配给它们,这大大提高了资源利用率,其次,YARN允许各类短作业和长服务混合部署在一个集群中,并提供了容错.资源隔离及负载均衡等方面的

hadoop备战:yarn框架的简介(mapreduce2)

新 Hadoop Yarn 框架原理及运作机制 重构根本的思想是将 JobTracker 两个主要的功能分离成单独的组件,这两个功能是资源管理和任务调度 / 监控.新的资源管理器全局管理所有应用程序计算资源的分配,每一个应用的 ApplicationMaster 负责相应的调度和协调.一个应用程序无非是一个单独的传统的 MapReduce 任务或者是一个 DAG( 有向无环图 ) 任务.ResourceManager 和每一台机器的节点管理服务器能够管理用户在那台机器上的进程并能对计算进行组织.

基于Hadoop2.0、YARN技术的大数据高阶应用实战(Hadoop2.0\YARN\Ma

Hadoop的前景 随着云计算.大数据迅速发展,亟需用hadoop解决大数据量高并发访问的瓶颈.谷歌.淘宝.百度.京东等底层都应用hadoop.越来越多的企 业急需引入hadoop技术人才.由于掌握Hadoop技术的开发人员并不多,直接导致了这几年hadoop技术的薪水远高于JavaEE及 Android程序员. Hadoop入门薪资已经达到了8K以上,工作1年可达到1.2W以上,具有2-3年工作经验的hadoop人才年薪可以达到30万—50万. 一般需要大数据处理的公司基本上都是大公司,所以学

Hadoop2.0/YARN深入浅出(Hadoop2.0、Spark、Storm和Tez)

随着云计算.大数据迅速发展,亟需用hadoop解决大数据量高并发访问的瓶颈.谷歌.淘宝.百度.京东等底层都应用hadoop.越来越多的企 业急需引入hadoop技术人才.由于掌握Hadoop技术的开发人员并不多,直接导致了这几年hadoop技术的薪水远高于JavaEE及 Android程序员. Hadoop入门薪资已经达到了 8K 以上,工作1年可达到 1.2W 以上,具有2-3年工作经验的hadoop人才年薪可以达到 30万—50万 . 一般需要大数据处理的公司基本上都是大公司,所以学习had