Spark Notes 14: the delay scheduling implementation in Spark

The delay scheduling algorithm is implemented in the TaskSetManager class. Pending tasks are stored in per-locality-level collections (pendingTasksForExecutor, pendingTasksForHost, pendingTasksForRack, pendingTasksWithNoPrefs). When resources become available, one of the arguments to resourceOffer, maxLocality, is the best (most local) level those resources can offer. Both tasks and executors therefore carry a locality level: starting from the most local level's map, the scheduler looks for a task that matches the offered resource, and among the matching tasks it picks the most local one.

=====================delay scheduling algorithm=============================

->TaskSetManager::resourceOffer(execId: String, host: String,maxLocality: TaskLocality.TaskLocality): Option[TaskDescription]

->if (maxLocality != TaskLocality.NO_PREF) --the offered resource has a locality preference

->allowedLocality = getAllowedLocalityLevel(curTime) --the locality level this taskSet is currently allowed to use; getAllowedLocalityLevel changes as time passes

->if (allowedLocality > maxLocality)  --the offer's maxLocality is stricter (more local) than what delay scheduling currently allows

->allowedLocality = maxLocality --so tighten the level to the offer's maxLocality

->task =  findTask(execId, host, allowedLocality) --look for a task that satisfies the allowed locality level

->search from the best locality level (PROCESS_LOCAL) downward and return the first task that satisfies the locality constraint, i.e. at the best level available

->task match case Some((index, taskLocality, speculative)) --a matching task was found

-> val info = new TaskInfo(taskId, index, attemptNum, curTime, execId, host, taskLocality, speculative)

->if (maxLocality != TaskLocality.NO_PREF) // NO_PREF will not affect the variables related to delay scheduling

->currentLocalityIndex = getLocalityIndex(taskLocality) // Update our locality level for delay scheduling

->lastLaunchTime = curTime --record the time of the most recent launch; needed when computing the current locality level

->addRunningTask(taskId) --add it to the set of running tasks

->logInfo("Starting %s (TID %d, %s, %s, %d bytes)"

->sched.dagScheduler.taskStarted(task, info) --notify the DAGScheduler that a task has started running

->eventProcessActor ! BeginEvent(task, taskInfo)

->return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask)) --return the task description

->case _ => return None --no task satisfies the locality constraint; return None

=====================end==================================
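The flow above can be condensed into a small, self-contained sketch. This is not the Spark code: the Level enumeration, the toy findTask and the hard-coded wait times are stand-ins, but the decision order (cap allowedLocality at the offer's maxLocality, search for a task, then reset currentLocalityIndex and lastLaunchTime) follows the trace.

object ResourceOfferSketch {
  object Level extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
  }
  import Level._

  // Simplified stand-ins for the TaskSetManager fields described below.
  val myLocalityLevels = Array(PROCESS_LOCAL, NODE_LOCAL, ANY)
  val localityWaits = Array(3000L, 3000L, 0L)
  var currentLocalityIndex = 0
  var lastLaunchTime = 0L

  def getAllowedLocalityLevel(curTime: Long): Level.Value = {
    while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
           currentLocalityIndex < myLocalityLevels.length - 1) {
      lastLaunchTime += localityWaits(currentLocalityIndex)
      currentLocalityIndex += 1
    }
    myLocalityLevels(currentLocalityIndex)
  }

  def getLocalityIndex(locality: Level.Value): Int = {
    var index = 0
    while (locality > myLocalityLevels(index)) index += 1
    index
  }

  // Toy findTask: pretend the best pending task for this offer is NODE_LOCAL.
  def findTask(allowed: Level.Value): Option[(Int, Level.Value)] =
    if (allowed >= NODE_LOCAL) Some((7, NODE_LOCAL)) else None

  def resourceOffer(maxLocality: Level.Value, curTime: Long): Option[Int] = {
    var allowedLocality = getAllowedLocalityLevel(curTime)
    if (allowedLocality > maxLocality) allowedLocality = maxLocality  // never go beyond the offer
    findTask(allowedLocality).map { case (index, taskLocality) =>
      currentLocalityIndex = getLocalityIndex(taskLocality)           // drop back to the launched level
      lastLaunchTime = curTime
      index
    }
  }

  def main(args: Array[String]): Unit = {
    println(resourceOffer(ANY, curTime = 1000L))  // None: still inside the PROCESS_LOCAL wait
    println(resourceOffer(ANY, curTime = 4000L))  // Some(7): the wait expired, a NODE_LOCAL task launches
  }
}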

myLocalityLevels: all locality levels currently valid for this TaskSet

localityWaits: the wait time configured for each locality level

currentLocalityIndex: the index of the current locality level; it moves as the wait time elapses

pendingTasksForExecutor: PROCESS_LOCAL (executor/process-level) tasks

pendingTasksForHost: NODE_LOCAL (host-level) tasks

pendingTasksForRack: RACK_LOCAL (rack-level) tasks

pendingTasksWithNoPrefs: tasks with no locality preference

// Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
var myLocalityLevels = computeValidLocalityLevels()
var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level

// Delay scheduling variables: we keep track of our current locality level and the time we
// last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
// We then move down if we manage to launch a "more local" task.
var currentLocalityIndex = 0    // Index of our current locality level in validLocalityLevels

// Set of pending tasks for each executor. These collections are actually
// treated as stacks, in which new tasks are added to the end of the
// ArrayBuffer and removed from the end. This makes it faster to detect
// tasks that repeatedly fail because whenever a task failed, it is put
// back at the head of the stack. They are also only cleaned up lazily;
// when a task is launched, it remains in all the pending lists except
// the one that it was launched from, but gets removed from them later.
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]

// Set of pending tasks for each host. Similar to pendingTasksForExecutor,
// but at host level.
private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]

// Set of pending tasks for each rack -- similar to the above.
private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]

// Set containing pending tasks with no locality preferences.
var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
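These queues are filled when the TaskSetManager is constructed, by the addPendingTask method referenced in the comments below. The following is only an approximate sketch of that registration step: the TaskLocation shape (a host plus an optional executorId) and the omission of rack and HDFS-cache handling are assumptions, not the real Spark code.

import scala.collection.mutable.{ArrayBuffer, HashMap}

object AddPendingTaskSketch {
  // Assumed shape of a preferred location: a host, optionally pinned to one executor.
  case class TaskLocation(host: String, executorId: Option[String] = None)

  val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksWithNoPrefs = new ArrayBuffer[Int]

  def addPendingTask(index: Int, preferredLocations: Seq[TaskLocation]): Unit = {
    if (preferredLocations.isEmpty) {
      pendingTasksWithNoPrefs += index                                 // NO_PREF task
    } else {
      for (loc <- preferredLocations) {
        for (execId <- loc.executorId) {                               // PROCESS_LOCAL candidate
          pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer) += index
        }
        pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index  // NODE_LOCAL candidate
        // the real method also resolves loc.host to a rack and fills pendingTasksForRack
      }
    }
  }

  def main(args: Array[String]): Unit = {
    addPendingTask(0, Seq(TaskLocation("host1", Some("exec-3"))))
    addPendingTask(1, Nil)
    println(pendingTasksForExecutor)  // task 0 is registered under executor "exec-3"
    println(pendingTasksForHost)      // ...and under "host1"
    println(pendingTasksWithNoPrefs)  // task 1 has no preference
  }
}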

Compute the locality levels that are currently valid for this TaskSetManager:

var lastLaunchTime = clock.getTime()  // Time we last launched a task at this level

/**
 * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
 * added to queues using addPendingTask.
 */
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
  import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
  val levels = new ArrayBuffer[TaskLocality.TaskLocality]
  if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
      pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
    levels += PROCESS_LOCAL
  }
  if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
      pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
    levels += NODE_LOCAL
  }
  if (!pendingTasksWithNoPrefs.isEmpty) {
    levels += NO_PREF
  }
  if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
      pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
    levels += RACK_LOCAL
  }
  levels += ANY
  logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
  levels.toArray
}

Get the configured wait time for each locality level:

private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
  val defaultWait = conf.get("spark.locality.wait", "3000")
  level match {
    case TaskLocality.PROCESS_LOCAL =>
      conf.get("spark.locality.wait.process", defaultWait).toLong
    case TaskLocality.NODE_LOCAL =>
      conf.get("spark.locality.wait.node", defaultWait).toLong
    case TaskLocality.RACK_LOCAL =>
      conf.get("spark.locality.wait.rack", defaultWait).toLong
    case _ => 0L
  }
}
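So each level's wait defaults to spark.locality.wait (3000 ms) and can be overridden per level. A minimal tuning example through SparkConf (the property names come from the code above; the concrete values are only illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "3000")          // default wait, in ms, shared by all levels
  .set("spark.locality.wait.process", "1000")  // give up on PROCESS_LOCAL sooner
  .set("spark.locality.wait.rack", "0")        // a 0 wait drops RACK_LOCAL from myLocalityLevels (see computeValidLocalityLevels above)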

The definition of the locality levels:

@DeveloperApi
object TaskLocality extends Enumeration {
  // Process local is expected to be used ONLY within TaskSetManager for now.
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value

  type TaskLocality = Value

  def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
    condition <= constraint
  }
}
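Because the values are declared from most to least local, the Enumeration ordering gives PROCESS_LOCAL < NODE_LOCAL < NO_PREF < RACK_LOCAL < ANY, so isAllowed just checks that a task's locality is at least as local as the constraint. For example (assuming the TaskLocality object is in scope, e.g. in a spark-shell):

import TaskLocality._

// A PROCESS_LOCAL task may be launched under a NODE_LOCAL constraint...
assert(TaskLocality.isAllowed(constraint = NODE_LOCAL, condition = PROCESS_LOCAL))
// ...but an ANY task may not.
assert(!TaskLocality.isAllowed(constraint = NODE_LOCAL, condition = ANY))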

Given a locality level, find its index among the levels that are valid for this taskSet. Some levels may have no pending tasks in the current taskSet, so the lookup falls back to the next less-local level that does exist.

/**
 * Find the index in myLocalityLevels for a given locality. This is also designed to work with
 * localities that are not in myLocalityLevels (in case we somehow get those) by returning the
 * next-biggest level we have. Uses the fact that the last value in myLocalityLevels is ANY.
 */
def getLocalityIndex(locality: TaskLocality.TaskLocality): Int = {
  var index = 0
  while (locality > myLocalityLevels(index)) {
    index += 1
  }
  index
}
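A quick example of the fallback behaviour, using a hypothetical TaskSet whose valid levels are only PROCESS_LOCAL, NO_PREF and ANY. The method below is just a copy of getLocalityIndex so the snippet can be run on its own (again assuming the TaskLocality object is in scope):

import TaskLocality._

val myLocalityLevels = Array(PROCESS_LOCAL, NO_PREF, ANY)

def getLocalityIndex(locality: TaskLocality): Int = {
  var index = 0
  while (locality > myLocalityLevels(index)) index += 1
  index
}

assert(getLocalityIndex(PROCESS_LOCAL) == 0)
assert(getLocalityIndex(NODE_LOCAL) == 1)  // NODE_LOCAL is absent, so we land on NO_PREF
assert(getLocalityIndex(RACK_LOCAL) == 2)  // falls through to ANY, which is always last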

Get the locality level currently allowed. It compares the time we have already waited against the configured wait time for each level to decide which level we are at.

/**
 * Get the level we can launch tasks according to delay scheduling, based on current wait time.
 */
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
  while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
      currentLocalityIndex < myLocalityLevels.length - 1)
  {
    // Jump to the next locality level, and remove our waiting time for the current one since
    // we don't want to count it again on the next one
    lastLaunchTime += localityWaits(currentLocalityIndex)
    currentLocalityIndex += 1
  }
  myLocalityLevels(currentLocalityIndex)
}
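A worked example: assume localityWaits = [3000, 3000, 0] for myLocalityLevels = [PROCESS_LOCAL, NODE_LOCAL, ANY] and lastLaunchTime = 0. A call at curTime = 2500 does not enter the loop and stays at PROCESS_LOCAL (only 2.5 s of the 3 s wait has elapsed). A call at curTime = 6500 first advances to NODE_LOCAL, setting lastLaunchTime to 3000 so the time already spent waiting is not counted twice, and then, since another 3.5 s has elapsed, advances again and returns ANY.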

When the set of available executors changes (for example when an executor is added), the valid locality levels are recomputed while keeping the previously selected level:

def recomputeLocality() {
  val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
  myLocalityLevels = computeValidLocalityLevels()
  localityWaits = myLocalityLevels.map(getLocalityWait)
  currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
}


Find a task that satisfies the locality constraint, searching from the best locality level first, so the most local tasks are always launched first.

/**
 * Dequeue a pending task for a given node and return its index and locality level.
 * Only search for tasks matching the given locality constraint.
 *
 * @return An option containing (task index within the task set, locality, is speculative?)
 */
private def findTask(execId: String, host: String, maxLocality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value, Boolean)] =
{
  for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
    return Some((index, TaskLocality.PROCESS_LOCAL, false))
  }

  ...

  // find a speculative task if all other tasks have been scheduled
  findSpeculativeTask(execId, host, maxLocality).map {
    case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)
  }
}
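The findTaskFromList helper that findTask relies on is not shown here. What follows is only a hedged, self-contained sketch of the stack-like behaviour described in the comment on pendingTasksForExecutor (newest tasks are popped from the end, and stale entries are cleaned up lazily); the copiesRunning/successful bookkeeping is a simplified stand-in, not the real Spark fields.

import scala.collection.mutable.ArrayBuffer

object DequeueSketch {
  val copiesRunning = ArrayBuffer(0, 1, 0)              // task 1 already has a running copy
  val successful = ArrayBuffer(false, false, true)      // task 2 already finished

  // Pop indices from the end of the list, skipping tasks that are no longer pending.
  def dequeueTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
    var indexOffset = list.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = list(indexOffset)
      if (copiesRunning(index) == 0 && !successful(index)) {
        list.remove(indexOffset)   // lazy cleanup: only the entry we launch from is removed here
        return Some(index)
      }
    }
    None
  }

  def main(args: Array[String]): Unit = {
    val pending = ArrayBuffer(0, 1, 2)            // newest task is at the end
    println(dequeueTaskFromList(pending))         // Some(0): task 2 is done, task 1 is running
    println(pending)                              // ArrayBuffer(1, 2): stale entries stay until later
  }
}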
