Hadoop源码系列(一)FairScheduler申请和分配container的过程

1、如何申请资源

1.1 如何启动AM并申请资源

1.1.1 如何启动AM

val yarnClient = YarnClient.createYarnClient
setupCredentials()
yarnClient.init(yarnConf)
yarnClient.start()
// Get a new application from our RM
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()

// Set up the appropriate contexts to launch our AM
val containerContext = createContainerLaunchContext(newAppResponse)
val appContext = createApplicationSubmissionContext(newApp, containerContext)

// Finally, submit and monitor the application
logInfo(s"Submitting application $appId to ResourceManager")
yarnClient.submitApplication(appContext)

1.1.2 FairScheduler如何处理AM的ResourceRequest

1、FairScheduler接收到SchedulerEventType.APP_ADDED之后,调用addApplication方法把把RMApp添加到队列里面,结束之后发送RMAppEventType.APP_ACCEPTED给RMApp

2、RMApp启动RMAttempt之后,发送SchedulerEventType.APP_ATTEMPT_ADDED给FairScheduler

LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user: " + user);

3、FairScheduler调用addApplicationAttempt方法,发送RMAppAttemptEventType.ATTEMPT_ADDED事件给RMAppAttempt,RMAppAttempt随后调用Scheduler的allocate方法发送AM的ResourceRequest

4、FairScheduler在allocate方法里面对该请求进行处理,FairScheduler对于AM的资源请求的优先级上并没有特殊的照顾,详细请看章节2 如何分配资源

1.2 AM启动之后如何申请资源

1.2.1、注册AM

amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)

1.2.2、发送资源请求

// 1.创建资源请求
amClient.addContainerRequest(request)
// 2.发送资源请求
val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()
if (allocatedContainers.size > 0) {
  // 3.请求返回之后处理Container
  handleAllocatedContainers(allocatedContainers.asScala)
}

1.2.3、启动Container

def startContainer(): java.util.Map[String, ByteBuffer] = {
 val ctx = Records.newRecord(classOf[ContainerLaunchContext])
   .asInstanceOf[ContainerLaunchContext]
 val env = prepareEnvironment().asJava

 ctx.setLocalResources(localResources.asJava)
 ctx.setEnvironment(env)

 val credentials = UserGroupInformation.getCurrentUser().getCredentials()
 val dob = new DataOutputBuffer()
 credentials.writeTokenStorageToStream(dob)
 ctx.setTokens(ByteBuffer.wrap(dob.getData()))

 val commands = prepareCommand()

 ctx.setCommands(commands.asJava)
 ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)

 // If external shuffle service is enabled, register with the Yarn shuffle service already
 // started on the NodeManager and, if authentication is enabled, provide it with our secret
 // key for fetching shuffle files later
 if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
   val secretString = securityMgr.getSecretKey()
   val secretBytes =
   if (secretString != null) {
     // This conversion must match how the YarnShuffleService decodes our secret
     JavaUtils.stringToBytes(secretString)
   } else {
     // Authentication is not enabled, so just provide dummy metadata
     ByteBuffer.allocate(0)
   }
   ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
 }

 // Send the start request to the ContainerManager
 try {
   nmClient.startContainer(container.get, ctx)
 } catch {
   case ex: Exception =>
     throw new SparkException(s"Exception while starting container ${container.get.getId}" +
       s" on host $hostname", ex)
 }
}

2、如何分配资源

2.1 接受资源请求步骤

在FairScheduler的allocate方法里面仅仅是记录ResourceRequest,并不会真正的立马分配。

流程如下:

1、检查该APP是否注册过

2、检查资源的请求是否超过最大内存和最大CPU的限制

3、记录资源请求的时间,最后container分配的延迟会体现在队列metrics的appAttemptFirstContainerAllocationDelay当中

4、释放AM发过来的已经不需要的资源,主要逻辑在FSAppAttempt的containerCompleted方法里

5、更新资源请求,所有资源请求都是记录在AppSchedulingInfo当中的requests(注意:只有是ANY的资源请求才会被立马更新到QueueMetrics的PendingResources里)

6、找出该APP被标记为抢占的container ID列表preemptionContainerIds

7、更新APP的黑名单列表,该信息被记录在AppSchedulingInfo当中

8、从FSAppAttempt的newlyAllocatedContainers当中获取最新被分配的container

9、返回preemptionContainerIds、HeadRoom、ContainerList、NMTokenList。(注:Headroom = Math.min(Math.min(queueFairShare - queueUsage, 0), maxAvailableResource)

2.2 请求和分配的关系

请求和分配的过程是异步的,关系如上图,每次调用allocate获得的container,其实是之前的请求被分配的结果

2.3 如何分配

2.3.1 分配方式

分配有两种方式:

1、接收到NodeManager的心跳的时候进行分配

NodeManager每隔一秒(yarn.resourcemanager.nodemanagers.heartbeat-interval-ms)给ResourceManager发送一个心跳事件NODE_UPDATE,接收到心跳事件之后,在FairScheduler的nodeUpdate方法里进行处理。

NodeManager会汇报新启动的Container列表newlyLaunchedContainers和已经结束的Container列表completedContainers。然后在attemptScheduling方法里面进行分配。

2、持续调度方式

它有一个单独的线程,线程名称是FairSchedulerContinuousScheduling,每5毫秒对所有节点的资源进行排序,然后遍历所有节点,调用attemptScheduling方法进行分配。

开启持续调度模式之后,在接收到心跳事件NODE_UPDATE的时候,只有在completedContainers不为空的情况下,才会进行调度

attemptScheduling首先会检查是否有资源预留,如果有预留,则直接为预留的APP分配container

没有预留的分配过程如下:

1、最大可分配资源为这台机器的可用资源的一半,从root队列开始自上而下进行分配Resource assignment = queueMgr.getRootQueue().assignContainer(node);

2、分配到一个Container之后,判断是否要连续分配多个,最大支持连续分配多少个?

以下是涉及到的各个参数以及参数的默认值:

yarn.scheduler.fair.assignmultiple false (建议设置为true)

yarn.scheduler.fair.dynamic.max.assign true (hadoop2.7之后就没有这个参数了)

yarn.scheduler.fair.max.assign -1 (建议设置为2~3,不要设置得太多,否则会有调度倾斜的问题)

2.3.2 如何从队列当中选出APP进行资源分配

入口在queueMgr.getRootQueue().assignContainer(node);

1、检查当前队列的使用量是否小于最大资源量

2、首先对子队列进行排序,优先顺序请参照章节 2.3.4 如何确定优先顺序

3、排序完再调用子队列的assignContainer方法分配container

4、一直递归到叶子队列

叶子队列如何进行分配?

1、先对runnableApps进行排序,排序完成之后,for循环遍历一下

2、先检查该Node是否在APP的黑名单当中

3、检查该队列是否可以运行该APP的AM,主要是检查是否超过了maxAMShare(根据amRunning字段判断是否已经启动了AM了)

检查逻辑的伪代码如下:

maxResource = getFairShare()
if (maxResource == 0) {
  // 最大资源是队列的MaxShare和集群总资源取一个小的值
  maxResource = Math.min(getRootQueue().AvailableResource(), getMaxShare());
}
maxAMResource = maxResource * maxAMShare
if (amResourceUsage + amResource) > maxAMResource) {
  // 可以运行
  return true
} else {
  // 不可以运行
  return false
}

4、给该APP分配container

下面以一个例子来说明分配的过程是如何选择队列的:

假设队列的结构是这样子的

root

---->BU_1

-------->A

-------->B

---->BU_2

-------->C

-------->D

2.3.3 任务分配Container的本地性

任务分配Container的时候会考虑请求的本地性,对于调度器来说,它的本地性分为三种:NODE_LOCAL, RACK_LOCAL, OFF_SWITCH

具体方法位于FSAppAttempt的assignContainer方法

遍历优先级

给该优先级的调度机会+1

获取RackLocal和NodeLocal的任务

计算允许分配的本地性级别allowedLocality,默认是NODE_LOCAL

1、心跳分配方式

计算调度机会,如果该优先级的任务的调度机会超过了(节点数 * NODE_LOCAL阈值),降级为RACK_LOCAL,如果该优先级的任务的调度机会超过了(节点数 * RACK_LOCAL阈值),降级为OFF_SWITCH

2、连续分配方式

计算等待时间waitTime -= lastScheduledContainer.get(priority);

如果waitTime超过了NODE_LOCAL允许的delay时间,就降级为RACK_LOCAL,再超过RACK_LOCAL允许的delay的时间,就降级为OFF_SWITCH

分配NODE_LOCAL的container

允许分配的本地性级别>=RACK_LOCAL,分配RACK_LOCAL的container

允许分配的本地性级别=OFF_SWITCH,分配OFF_SWITCH的container

都分不到,等待下一次机会

相关参数:

默认值全是-1,则允许的本地性级别是OFF_SWITCH

yarn.scheduler.fair.locality-delay-node-ms -1

yarn.scheduler.fair.locality-delay-rack-ms -1

yarn.scheduler.fair.locality.threshold.node -1

yarn.scheduler.fair.locality.threshold.rack -1

2.3.4 Container分配

1、检查该节点的资源是否足够,如果资源充足

2、如果当前的allowedLocality比实际分配的本地性低,则重置allowedLocality

3、把新分配的Container加到newlyAllocatedContainers和liveContainers列表中

4、把分配的container信息同步到appSchedulingInfo当中

5、发送RMContainerEventType.START事件

6、更新FSSchedulerNode记录的container信息

7、如果被分配的是AM,则设置amRunning为true

如果资源不够,则检查是否可以预留资源

条件:

1)Container的资源请求必须小于Scheduler的增量分配内存 * 倍数(默认应该是2g)

2)如果已经存在的预留数 < 本地性对应的可用节点 * 预留比例

3)一个节点只允许同时为一个APP预留资源

相关参数:

yarn.scheduler.increment-allocation-mb 1024

yarn.scheduler.increment-allocation-vcores 1

yarn.scheduler.reservation-threshold.increment-multiple 2

yarn.scheduler.fair.reservable-nodes 0.05

2.3.4 如何确定优先顺序

该比较规则同时适用于队列和APP,详细代码位于FairSharePolicy当中

MinShare = Math.min(getMinShare(), getDemand())

1、(当前资源使用量 / MinShare)的比值越小,优先级越高

2、如果双方资源使用量都超过MinShare,则(当前资源使用量 / 权重)的比值越小,优先级越高

3、启动时间越早,优先级越高

4、最后实在比不出来,就比名字...

从上面分配的规则当中能看出来MinShare是非常重要的一个指标,当资源使用量没有超过MinShare之前,队列在分配的时候就会比较优先,切记一定要设置啊!

注:getMinShare()是FairScheduler当中队列的minResources

<minResources>6887116 mb,4491 vcores</minResources>
时间: 2024-10-13 21:49:52

Hadoop源码系列(一)FairScheduler申请和分配container的过程的相关文章

安装Hadoop系列 — 导入Hadoop源码项目

将Hadoop源码导入Eclipse有个最大好处就是通过 "ctrl + shift + r" 可以快速打开Hadoop源码文件. 第一步:在Eclipse新建一个Java项目,hadoop-1.0.3 第二步:将Hadoop程序src下core, hdfs, mapred, tools几个目录copy到上述新建项目的src目录 第三步:修改将Java Build Path,删除src,添加src/core, src/hdfs....几个源码目录 第四步:为Java Build Pat

hbase源码系列(十三)缓存机制MemStore与Block Cache

这一章讲hbase的缓存机制,这里面涉及的内容也是比较多,呵呵,我理解中的缓存是保存在内存中的特定的便于检索的数据结构就是缓存. 之前在讲put的时候,put是被添加到Store里面,这个Store是个接口,实现是在HStore里面,MemStore其实是它底下的小子. 那它和Region Server.Region是什么关系? Region Server下面有若干个Region,每个Region下面有若干的列族,每个列族对应着一个HStore. HStore里面有三个很重要的类,在这章的内容都

Spark源码系列(一)spark-submit提交作业过程

前言 折腾了很久,终于开始学习Spark的源码了,第一篇我打算讲一下Spark作业的提交过程.有不明白Spark的原理的话,有另外一位大牛已经写了一个系列的Spark的源码分析了,大家可以去参考他的,他的过程图画得非常好,他写过的我可能就不写了,实在没办法比人家写得更好. 下面给出他的地址: http://www.cnblogs.com/hseagle/p/3664933.html,屌丝们,赶紧去膜拜大神吧. 这个是Spark的App运行图,它通过一个Driver来和集群通信,集群负责作业的分配

Android 源码系列之&lt;十三&gt;从源码的角度深入理解LeakCanary的内存泄露检测机制(中)

转载请注明出处:http://blog.csdn.net/llew2011/article/details/52958563 在上篇文章Android 源码系列之<十二>从源码的角度深入理解LeakCanary的内存泄露检测机制(上)中主要介绍了Java内存分配相关的知识以及在Android开发中可能遇见的各种内存泄露情况并给出了相对应的解决方案,如果你还没有看过上篇文章,建议点击这里阅读一下,这篇文章我将要向大家介绍如何在我们的应用中使用square开源的LeakCanary库来检测应用中出

Spark源码系列(七)Spark on yarn具体实现

本来不打算写的了,但是真的是闲来无事,整天看美剧也没啥意思.这一章打算讲一下Spark on yarn的实现,1.0.0里面已经是一个stable的版本了,可是1.0.1也出来了,离1.0.0发布才一个月的时间,更新太快了,节奏跟不上啊,这里仍旧是讲1.0.0的代码,所以各位朋友也不要再问我讲的是哪个版本,目前为止发布的文章都是基于1.0.0的代码. 在第一章<spark-submit提交作业过程>的时候,我们讲过Spark on yarn的在cluster模式下它的main class是or

Spark源码系列(三)作业运行过程

导读 看这篇文章的时候,最好是能够跟着代码一起看,我是边看代码边写的,所以这篇文章的前进过程也就是我看代码的推进过程. 作业执行 上一章讲了RDD的转换,但是没讲作业的运行,它和Driver Program的关系是啥,和RDD的关系是啥? 官方给的例子里面,一执行collect方法就能出结果,那我们就从collect开始看吧,进入RDD,找到collect方法. def collect(): Array[T] = { val results = sc.runJob(this, (iter: It

Hadoop源码编译与调试汇总

虽然在运行Hadoop的时候可以打印出大量的运行日志,但是很多时候只通过打印这些日志是不能很好地跟踪Hadoop各个模块的运行状况.这时候编译与调试Hadoop源码就得派上场了.这也就是今天本文需要讨论的. 先说说怎么编译Hadoop源码,本文主要介绍在Linux环境下用Maven来编译Hadoop.在编译Hadoop之前,我们需要准备好编译环境: 1.安装好1.6或以上的JDK;2.安装Maven,被做好相应的配置;3.安装ProtocolBuffer 2.5.0,MapReduce和HDFS

hbase源码系列(二)HTable 如何访问客户端

hbase的源码终于搞一个段落了,在接下来的一个月,着重于把看过的源码提炼一下,对一些有意思的主题进行分享一下.继上一篇讲了负载均衡之后,这一篇我们从client开始讲吧,从client到master再到region server,按照这个顺序来开展,网友也可以对自己感兴趣的部分给我留言或者直接联系我的QQ. 现在我们讲一下HTable吧,为什么讲HTable,因为这是我们最常见的一个类,这是我们对hbase中数据的操作的入口. 1.Put操作 下面是一个很简单往hbase插入一条记录的例子.

Hadoop源码如何查看

如何查看hadoop源码 1解压hadoop安装压缩文件成为文件夹,再进入解压后的文件夹下的src文件夹,选中core,hdfs,mapred三个文件夹 2打开eclipse新建一个Java工程项目 3将步骤1中的三个文件夹复制到新建的工程的src目录下 此时会出现2个问题:1是因为缺少工程文件的jar包 2是因为src下的3个文件的路径不对,需要修改其对应的路径 4右键工程名选择最后一项proprity(属性)--->选择左面的Java build path--->先删掉右面的Source下