</pre></h2><div><p>本节课内容:</p><p>1. TaskSchedulerBackend与SchedulerBackend</p><p>2. FIFO与FAIR两种调度模式</p><p>3. Task数据本地性资源的分配</p></div><h3>一、Scheduler运行过程(Spark-shell角度)</h3><h4>1.启动Spark-shell</h4><div><img src="http://img.blog.csdn.net/20160515013848589?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="" /></div><div><img src="http://img.blog.csdn.net/20160515013852433?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="" /></div><div><img src="http://img.blog.csdn.net/20160515013856370?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQv/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="" /></div><div></div><div><p>当我们spark-shell本身的时候命令终端返回来的主要是ClientEndpoint和SparkDeploySchedulerBakcend。这是因为此时还没有任何应用程序Job的触发,这是启动Application本身而已,所以主要就是实例化SparkContext并注册当前的应用程序给Master,并从集群中获得ExecutorBackend的计算资源;(这就是为什么启动时日志没有DriverEndpoint信息的原因,因为此时应用程序内部还未发生具体计算资源的调度)</p><h4>2.TaskScheduler运行时机</h4><p> DAGScheduler划分好Stage后,会通过TaskSchedulerImpl中的TaskSetManager来管理当前要运行的Stage中的所有的任务TaskSet,TaskSetManager会根据locality aware来为Task奉陪计算资源,监控Task的执行状态。(例如重试、慢任务以及进行推测式执行等)</p><p> </p><h3>二、TaskScheduler与SchedulerBackend</h3><h4> 1.底层调度的总流程</h4><p><strong>(1)TaskScheduler提交Tasks</strong></p><p> TaskScheduler#submitTasks方法主要作用是将TaskSet加入到TaskSetManager中进行管理。</p><pre name="code" class="plain">override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
//创建TaskSetManager,并设置最大失败重试次数
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
//记录Stage中提交的TaskSetManager
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
//如果重复提交同一个TaskSet或者Tasks不在当前的TaskSet中则会报错
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
ts.taskSet != taskSet && !ts.isZombie
}
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
}
//添加TaskManager到调度队列中,schedulableBuilder是应用程序级别的调度器
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) //1
//为慢任务启动备份任务
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
//调用SparkDeploySchedulerBackend分配具体计算资源
backend.reviveOffers() //2
}
& Task执行提交失败后会重试,Task的默认重试次数为4次。
& def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures",4)) (TaskSchedulerImpl)
(2)添加TaskSetManager
SchedulerBuilder(根据SchedulerMode的不同,FIFO与FAIR实现不同)#addTaskSetManger方法会确定TaskSetManager的调度顺序,然后按照TaskSetManager的locality aware来确定每个Task具体运行在那个ExecutorBackend中。
& 默认的调度顺序为FIFO;Spark应用程序目前支持两种调度模式FIFO和FAIR可以通过Spark-env.sh中的Spark.Scheduler.mode来进行具体的设置。
// default scheduler is FIFO
private valschedulingModeConf = conf.get("spark.scheduler.mode","FIFO") (TaskSchedulerImpl)
//在1处调用addTaskSetManager def addTaskSetManager(manager: Schedulable, properties: Properties)
//FIFO模式下的,addTaskSetManager override def addTaskSetManager(manager: Schedulable, properties: Properties) { rootPool.addSchedulable(manager) }
//直接将可调度对象TaskSetManager加入SchedulerQueue的尾端。 override def addSchedulable(schedulable: Schedulable) { require(schedulable != null) schedulableQueue.add(schedulable) schedulableNameToSchedulable.put(schedulable.name, schedulable) schedulable.parent = this }
//FAIR模式下,addTaskSetManager override def addTaskSetManager(manager: Schedulable, properties: Properties) { var poolName = DEFAULT_POOL_NAME //获得根节点的默认调度池的引用 var parentPool = rootPool.getSchedulableByName(poolName) if (properties != null) { //根据优先级获得父可调度对象pool的引用。 poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME) parentPool = rootPool.getSchedulableByName(poolName) if (parentPool == null) { //如果父可调度对象不存在,则根据应用程序配置信息创建之 parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) //做为根节点default pool的孩子加入default pool中 rootPool.addSchedulable(parentPool) logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) } } //与FIFO类似,在每个父pool中采用队列形式,将TaskSetManager加入队尾。 parentPool.addSchedulable(manager) logInfo("Added task set " + manager.name + " tasks to pool " + poolName) }
(1) CoarseGrainedSchedulerBackend分配资源
CoarseGrainedSchedulerBackend#reviveOffers方法,给DriverEndpoint发送ReviveOffers消息;ReviveOffers本身是一个空的case object对象,只是起到触发底层调度的左右,在有Task提交或者资源变动时,就会发送ReviveOffers消息。每提交一个Stage都要申请一次资源,发送一个ReviveOffers消息。
//2 处被调用的方法 override def reviveOffers() { driverEndpoint.send(ReviveOffers) }
& ReviveOffers相当于触发器,在资源变化时触发。
& TaskScheduler要负责为Task分配计算资源(分配的是程序启动时向Master申请的集群资源),根据计算的本地性原则确定Task具体要运行在哪个ExecutorBackend中。
(4)接收ReviveOffers消息与分配资源
在DriverEndpoint接收ReviveOffers消息并路由到MakeOffers方法中;在MakeOffers方法中首先准备好所有可用于计算的workOffers(代表了应用程序从Master获得的Executor中所有可用的Core信息)。
//CoarseGrainedSchedulerBackend.DriverEndpoint#receive override def receive: PartialFunction[Any, Unit] = { //省略部分代码 case ReviveOffers => makeOffers() }
//在逻辑上,让所有Executor都成为计算资源的提供者 private def makeOffers() { // 过滤掉挂掉的Executor val activeExecutors = executorDataMap.filterKeys(executorIsAlive) //生成有所有aliver的Executor元信息组成的序列 val workOffers = activeExecutors.map { case (id, executorData) => new WorkerOffer(id, executorData.executorHost, executorData.freeCores) }.toSeq //生成资源分配的二位数组,并以此为基础进行Tasks加载、执行 launchTask(scheduler.resourceOffers(workOffers)) //3 4 }
(a)resourceOffers方法
调用TaskSchedulerImpl#resourceOffers方法,为每一个Task具体分配计算资源,其输入是ExecutorBackend机器上可用的cores,输出是TaskDescription二维数组,在其中定义了每个Task具体运行在在哪个ExecutorBackend。
//3处调用resourceOffers,该方法输入为一个Executor的列表,输出为一个 //TaskDescription的二位数组 def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { // 每个slave节点为alive并且记录其hostname // 如果有新的slave节点加入,对其进行追踪。 var newExecAvail = false for (o <- offers) { executorIdToHost(o.executorId) = o.host executorIdToTaskCount.getOrElseUpdate(o.executorId, 0) if (!executorsByHost.contains(o.host)) { //如果有新的Executors即新的slave节点加入 executorsByHost(o.host) = new HashSet[String]() //通知DAGScheduler添加Executors executorAdded(o.executorId, o.host) //标记有新的Executor可用 newExecAvail = true } //更新可用的节点信息 for (rack <- getRackForHost(o.host)) { hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host } } //利用随机打乱offers的方式(round-robin manner)分配计算资源Executor,避免了Task //集中分配到某些机器上。 val shuffledOffers = Random.shuffle(offers) //为每一个worker创建一个tasks分配的列表,参见下图 val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) val availableCpus = shuffledOffers.map(o => o.cores).toArray //获取按照调度策略排好序的TaskSetManager val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( taskSet.parent.name, taskSet.name, taskSet.runningTasks)) if (newExecAvail) { //如果有新的slave中Executor可用,需要重新计算该TaskSetManager的就近原则 taskSet.executorAdded() } } //为从rootPool里获取的TaskSetManager列表分配资源。分配的原则是就近原则,优先分配顺 //序为PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY var launchedTask = false for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) 〖{ 〗^⑦ do { launchedTask = resourceOfferSingleTaskSet( taskSet, maxLocality, shuffledOffers, availableCpus, tasks) } while (launchedTask) } if (tasks.size > 0) { hasLaunchedTask = true } return tasks }
图36-1 worker与tasks及availableCpus对应关系
& TaskDescription中以确定好Task具体运行在那个ExecutorBackend上;而确定Task具体运行在那个ExecutorBackend的算法是由TaskSetManager的resourceOffers方法决定。
//为每一个TaskSetManager分配资源 private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality: TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int], tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = { var launchedTask = false //顺序遍历当前存在的Executor for (i <- 0 until shuffledOffers.size) { //获得Executor的Id和hostname val execId = shuffledOffers(i).executorId val host = shuffledOffers(i).host //该Executor可以被分配任务核心实现,通过调用TaskSetManager来为Executor分配Task if (availableCpus(i) >= CPUS_PER_TASK) { //保证可用的Cores个数不小于Task运行所需的最小cores个数,CPUS_PER_TASK默认 //为1 try { //获取最高级别的本地性,并记录executor与Task的运行对应关系 for (task <- taskSet.resourceOffer(execId, host, maxLocality)^?) { tasks(i) += task val tid = task.taskId taskIdToTaskSetManager(tid) = taskSet taskIdToExecutorId(tid) = execId executorIdToTaskCount(execId) += 1 executorsByHost(host) += execId availableCpus(i) -= CPUS_PER_TASK assert(availableCpus(i) >= 0) launchedTask = true } } catch { //Task未进行序列化 case e: TaskNotSerializableException => logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") return launchedTask } } } //返回信息:Task是否已分配好资源。 return launchedTask }
resourceOffers算法思想
ResourceOffers确定了Task具体运行在哪个ExecutorBackend上的。算法实现具体如下:
a) 通过Random#shuffle,将计算资源重新洗牌,以寻求计算子奥运的负载均衡。
b) 根据每个ExecutorBackend的cores个数,声明类型为TaskDescription的ArrayBuffer数组。
c) 如果有新的ExecutorBackend分配给我们的Job,此时会调用ExecutorAdded来获得完整的可用的计算资源。
& 这里说一下,数据本地性的级别(Locality level)的由高到低优先级的次序:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY,其中NO_PREF是指机器本地性。RACK_LOCAL是机架本地性。
d) 通过下述代码追踪最高级别的的本地性。(见7)
for (taskSet <-sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
每个Task默认采用一个线程来进行计算的。
// 执行Task默认需要一个cores,即一个线程。 val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
e) 通过调用用TaskSetManager#resourceOffer最终确定每个Task具体运行在哪个ExecutorBackend以及具体的Locality level。
//p121 3处调用 TaskSetManager#resourceOffer def resourceOffer( execId: String, host: String, maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] = {//不是僵尸TaskSet,即还可以提交Task的TaskSet if (!isZombie) { val curTime = clock.getTimeMillis() //获得当前的最大本地性级别 var allowedLocality = maxLocality //如果最大本地性级别不是机器本地性 if (maxLocality != TaskLocality.NO_PREF) { //重新计算当前时间节点的最高本地性级别,由于存在延迟调度,所以我们需要根据基于等 //待时间的延迟调度算法来获取当前的本地性。 allowedLocality = getAllowedLocalityLevel(curTime) if (allowedLocality > maxLocality) { //如果得到的加载Task本地性低于原来的maxLocality,则将Task加载的本地性置为 //maxLocality allowedLocality = maxLocality } } //根据不同的Task的本地性级别进行不同的处理。 dequeueTask(execId, host, allowedLocality) match { //index表示Task在TaskSet中的下标,taskLocality:本地性,speculative:表示是否 //是投机产生的,由于其他Task已排定,进而确定该Task。 case Some((index, taskLocality, speculative)) => { // 为Task找到一个executor(也可以认为是为当前executor找到了TaskSet中一个 //Task),对Task返回信息进行一些登记处理 //在TaskSet中找到这个Task val task = tasks(index) //创建Task的id val taskId = sched.newTaskId() // Do various bookkeeping (???) copiesRunning(index) += 1 //设置尝试提交次数 val attemptNum = taskAttempts(index).size //实例化Task的元信息 val info = new TaskInfo(taskId, index, attemptNum, curTime, execId, host, taskLocality, speculative) taskInfos(taskId) = info taskAttempts(index) = info :: taskAttempts(index) // 为延迟调度策略更新本地性级别 // NO_PREF不会影响延迟调度相关的变量 if (maxLocality != TaskLocality.NO_PREF) { currentLocalityIndex = getLocalityIndex(taskLocality) lastLaunchTime = curTime } //序列化并返回Task val startTime = clock.getTimeMillis() val serializedTask: ByteBuffer = try { Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser) } catch { //task序列化失败,则丢弃整个TaskSet case NonFatal(e) => val msg = s"Failed to serialize task $taskId, not attempting to retry it." logError(msg, e) abort(s"$msg Exception during serialization: $e") throw new TaskNotSerializableException(e) } //Task广播时序列化的大小限制(为什么后面还要序列化,这是广播作用??) if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && !emittedTaskSizeWarning) { emittedTaskSizeWarning = true logWarning(s"Stage ${task.stageId} contains a task of very large size " + s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") } //Task加入到运行Task队列中 addRunningTask(taskId) //序列化一些日志处理。 val taskName = s"task ${info.id} in stage ${taskSet.id}" logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," + s"$taskLocality, ${serializedTask.limit} bytes)") //向高层调度器DAGScheduler报告Task开始执行 sched.dagScheduler.taskStarted(task, info) //返回封装了TaskDescription的Some类 return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId, taskName, index, serializedTask)) } case _ => } } None }
& DAGScheduler是从数据(存储)层面考虑preferedLocation的,而TaskScheduler则是从具体计算Task的角度考虑计算的本地性的。
f)通过Lanch Task把任务发送给ExecutorBackend去执行。(见(4)中代码注释4?处)
// Launch tasks returned by a set of resource offers private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { //对tasks中的所有task进行序列化 val serializedTask = ser.serialize(task) //序列化后的Task的限制 if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => try { var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " + "spark.akka.frameSize or using broadcast variables for large values." msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize, AkkaUtils.reservedSizeBytes) //Task大小超出限制,丢弃TaskSet taskSetMgr.abort(msg) } catch { case e: Exception => logError("Exception in error callback", e) } } } else {//否则,Task大小符合要求 //更新executor信息 val executorData = executorDataMap(task.executorId) executorData.freeCores -= scheduler.CPUS_PER_TASK //发送Task序列化后的Task给executor。 executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) } } }
& 对于任务大小的设置:Task进行广播时,AkkaFrameSize大小是128MB,Akka保留的字节大小为200k,如果任务大于或等于128MB-200K的话,则任务会直接被丢弃掉;如果小于128MB-200K的话会通过CoarseGrainedSchedulerBackend去LaunchTask到具体的ExecutorBackend上。
至此,在Driver端的处理完成了,下一节将会讲解ExecutorBackend端接收Task后的处理。(可参照第35课图)
说明:
本文是由DT大数据梦工厂的IFM课程第36课为基础所做的笔记