任务调度器的接口类。应用程序可以定制自己的调度器来执行。当前spark只实现了一个任务调度器TaskSchedulerImpl
===================task scheduler begin====================
-> TaskSchedulerImpl::submitTasks(taskSet: TaskSet) 处理接受task,它做了同步操作。
-> new TaskSetManager(this, taskSet, maxTaskFailures) 每个taskSet都会对应 一个TaskSetManager接管任务调度
->activeTaskSets(taskSet.id) = manager 记录为active taskSet
->schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) --将taskManeger提交到调度器。
-> starvationTimer.scheduleAtFixedRate( --起一个防饿死的定时器,默认15s后提示错误
->new TimerTask()
->backend.reviveOffers() backend=(CoarseGrainedSchedulerBackend) --通知后台调度器,现在需要资源
->driverActor ! ReviveOffers
->makeOffers
->tasks = scheduler.resourceOffers(offers: Seq[WorkerOffer])-- 将所有资源作为参数
返回值Seq[Seq[TaskDescription]]为每个executor要执行的task列表
->val shuffledOffers = Random.shuffle(offers) --将所有资源随机排列后使用
->val sortedTaskSets = rootPool.getSortedTaskSetQueue --按优先级顺序获取要执行的taskSet序列
->schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
将所有taskSet按优先级排列,后面专门再看排列的算法
->for (taskSet <- sortedTaskSets)
->taskSet.executorAdded()
->for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels)
->until (launchedTask) --每个taskSet只要得到一个资源,就不再获取资源了
->for (i <- 0 until shuffledOffers.size) --查找所有可用的资源
->for (task <- taskSet.resourceOffer(execId, host, maxLocality)) --对taskSet给出的所有task,都记录为需要执行
->launchTasks(tasks: scala.Seq[scala.Seq[TaskDescription]]) --Launch tasks returned by a set of resource offers
->for (task <- tasks.flatten)
->//如果task过大(默认10m),直接失败退出)
->executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
给调度到task的executor发送task,由Executor接管task的执行。到此,离最后的task.run()不远了~~
==================end================================
调度的过程和常理有点不同:我们一般认为是拿task去寻找合适的资源,但是spark中是拿资源去寻找合适的task。这可能是因为task(由RDD分解而来)是不变的,但是资源是不断变化的缘故。
TaskScheduler :调度接口,当前只有TaskSchedulerImpl实现了。应用可以提供自己的调度器。
/** * Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl. * This interface allows plugging in different task schedulers. Each TaskScheduler schedulers tasks * for a single SparkContext. These schedulers get sets of tasks submitted to them from the * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running * them, retrying if there are failures, and mitigating stragglers. They return events to the * DAGScheduler. */private[spark] trait TaskScheduler { def rootPool: Pool def schedulingMode: SchedulingMode def start(): Unit // Invoked after system has successfully initialized (typically in spark context). // Yarn uses this to bootstrap allocation of resources based on preferred locations, // wait for slave registerations, etc. def postStartHook() { } // Disconnect from the cluster. def stop(): Unit // Submit a sequence of tasks to run. def submitTasks(taskSet: TaskSet): Unit // Cancel a stage. def cancelTasks(stageId: Int, interruptThread: Boolean) // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called. def setDAGScheduler(dagScheduler: DAGScheduler): Unit // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs. def defaultParallelism(): Int /** * Update metrics for in-progress tasks and let the master know that the BlockManager is still * alive. Return true if the driver knows about the given block manager. Otherwise, return false, * indicating that the block manager should re-register. */ def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)], blockManagerId: BlockManagerId): Boolean}
/** * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend. * It can also work with a local setup by using a LocalBackend and setting isLocal to true. * It handles common logic, like determining a scheduling order across jobs, waking up to launch * speculative tasks, etc. * * Clients should first call initialize() and start(), then submit task sets through the * runTasks method. * * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple * threads, so it needs locks in public API methods to maintain its state. In addition, some * SchedulerBackends synchronize on themselves when they want to send events here, and then * acquire a lock on us, so we need to make sure that we don‘t try to lock the backend while * we are holding a lock on ourselves. */private[spark] class TaskSchedulerImpl( val sc: SparkContext, val maxTaskFailures: Int, isLocal: Boolean = false) extends TaskScheduler with Logging
主要属性:
// TaskSetManagers are not thread safe, so any access to one should be synchronized// on this class.val activeTaskSets = new HashMap[String, TaskSetManager] val taskIdToTaskSetId = new HashMap[Long, String]val taskIdToExecutorId = new HashMap[Long, String]
// Which executor IDs we have executors onval activeExecutorIds = new HashSet[String] // The set of executors we have on each host; this is used to compute hostsAlive, which// in turn is used to decide when we can attain data locality on a given hostprotected val executorsByHost = new HashMap[String, HashSet[String]] protected val hostsByRack = new HashMap[String, HashSet[String]] protected val executorIdToHost = new HashMap[String, String]
// Listener object to pass upcalls intovar dagScheduler: DAGScheduler = null var backend: SchedulerBackend = null val mapOutputTracker = SparkEnv.get.mapOutputTracker var schedulableBuilder: SchedulableBuilder = nullvar rootPool: Pool = null
初始化不同的调度算法
def initialize(backend: SchedulerBackend) { this.backend = backend // temporarily set rootPool name to empty rootPool = new Pool("", schedulingMode, 0, 0) schedulableBuilder = { schedulingMode match { case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool) case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf) } } schedulableBuilder.buildPools()}
override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { val manager = new TaskSetManager(this, taskSet, maxTaskFailures) activeTaskSets(taskSet.id) = manager schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { starvationTimer.scheduleAtFixedRate(new TimerTask() { override def run() { if (!hasLaunchedTask) { logWarning("Initial job has not accepted any resources; " + "check your cluster UI to ensure that workers are registered " + "and have sufficient memory") } else { this.cancel() } } }, STARVATION_TIMEOUT, STARVATION_TIMEOUT) } hasReceivedTask = true } backend.reviveOffers()}
/** * Called by cluster manager to offer resources on slaves. We respond by asking our active task * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so * that tasks are balanced across the cluster. */def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
TaskSetManager和TaskSet是意义对应的,它们可以等同看待。resourceOffer是他的主要功能:根据输入的资源,给出taskSet需要执行的task列表。
/** * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of * each task, retries tasks if they fail (up to a limited number of times), and * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, * and statusUpdate, which tells it that one of its tasks changed state (e.g. finished). * * THREADING: This class is designed to only be called from code with a lock on the * TaskScheduler (e.g. its event handlers). It should not be called from other threads. * * @param sched the TaskSchedulerImpl associated with the TaskSetManager * @param taskSet the TaskSet to manage scheduling for * @param maxTaskFailures if any particular task fails more than this number of times, the entire * task set will be aborted */private[spark] class TaskSetManager( sched: TaskSchedulerImpl, val taskSet: TaskSet, val maxTaskFailures: Int, clock: Clock = SystemClock) extends Schedulable with Logging {
val copiesRunning = new Array[Int](numTasks)val successful = new Array[Boolean](numTasks)private val numFailures = new Array[Int](numTasks)// key is taskId, value is a Map of executor id to when it failedprivate val failedExecutors = new HashMap[Int, HashMap[String, Long]]() val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)var tasksSuccessful = 0
// Set of pending tasks for each executor. These collections are actually// treated as stacks, in which new tasks are added to the end of the// ArrayBuffer and removed from the end. This makes it faster to detect// tasks that repeatedly fail because whenever a task failed, it is put// back at the head of the stack. They are also only cleaned up lazily;// when a task is launched, it remains in all the pending lists except// the one that it was launched from, but gets removed from them later.private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]] // Set of pending tasks for each host. Similar to pendingTasksForExecutor,// but at host level.private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]] // Set of pending tasks for each rack -- similar to the above.private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]] // Set containing pending tasks with no locality preferences.var pendingTasksWithNoPrefs = new ArrayBuffer[Int] // Set containing all pending tasks (also used as a stack, as above).val allPendingTasks = new ArrayBuffer[Int] // Tasks that can be speculated. Since these will be a small fraction of total// tasks, we‘ll just hold them in a HashSet.val speculatableTasks = new HashSet[Int] // Task index, start and finish time for each task attempt (indexed by task ID)val taskInfos = new HashMap[Long, TaskInfo]
// Figure out which locality levels we have in our TaskSet, so we can do delay schedulingvar myLocalityLevels = computeValidLocalityLevels()var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level // Delay scheduling variables: we keep track of our current locality level and the time we// last launched a task at that level, and move up a level when localityWaits[curLevel] expires.// We then move down if we manage to launch a "more local" task.var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevelsvar lastLaunchTime = clock.getTime() // Time we last launched a task at this level
/** * Return a speculative task for a given executor if any are available. The task should not have * an attempt running on this host, in case the host is slow. In addition, the task should meet * the given locality constraint. */private def findSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value) : Option[(Int, TaskLocality.Value)] =
调度器眼中的executor: 调度的过程和常理有点不同:我们一般认为是拿task去寻找合适的资源,但是spark中是拿资源去寻找合适的task。这可能是因为task(由RDD分解而来)是不变的,但是资源是不断变化的缘故。
/*** Respond to an offer of a single executor from the scheduler by finding a task * * NOTE: this function is either called with a maxLocality which * would be adjusted by delay scheduling algorithm or it will be with a special * NO_PREF locality which will be not modified * * @param execId the executor Id of the offered resource * @param host the host Id of the offered resource * @param maxLocality the maximum locality we want to schedule the tasks at */def resourceOffer( execId: String, host: String, maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] ={
SchedulableBuilder定义了FIFO/FAIR两种调度算法。他们都是对Pool的封装,所以重点是Pool
/** * An interface to build Schedulable tree * buildPools: build the tree nodes(pools) * addTaskSetManager: build the leaf nodes(TaskSetManagers) */private[spark] trait SchedulableBuilder { def rootPool: Pool def buildPools() def addTaskSetManager(manager: Schedulable, properties: Properties)}
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) extends SchedulableBuilder with Logging {
Pool: 对应文档中调度的group。每个Pool可以设置独立的调度算法。
/** * An Schedulable entity that represent collection of Pools or TaskSetManagers */ private[spark] class Pool( val poolName: String, val schedulingMode: SchedulingMode, initMinShare: Int, initWeight: Int) extends Schedulable with Logging {
val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
Schedulable: 调度器操作的对象需要实现这个接口,也就是实现这个接口的类都可以交给调度器根据优先级调度。就像Task对应executor一样。getSortedTaskSetQueue是主要方法,它按优先级顺序将task返回给外层调度器。
/** * An interface for schedulable entities. * there are two type of Schedulable entities(Pools and TaskSetManagers) */private[spark] trait Schedulable { var parent: Pool // child queues def schedulableQueue: ConcurrentLinkedQueue[Schedulable] def schedulingMode: SchedulingMode def weight: Int def minShare: Int def runningTasks: Int def priority: Int def stageId: Int def name: String def addSchedulable(schedulable: Schedulable): Unit def removeSchedulable(schedulable: Schedulable): Unit def getSchedulableByName(name: String): Schedulable def executorLost(executorId: String, host: String): Unit def checkSpeculatableTasks(): Boolean def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]}
CoarseGrainedSchedulerBackend:它是屏蔽各种资源管理算法的抽象。local、standalone、yarn-client等都是它的子类。而spark内部仅与CoarseGrainedSchedulerBackend交互,所以说他起了屏蔽左右。
/** * A scheduler backend that waits for coarse grained executors to connect to it through Akka. * This backend holds onto each executor for the duration of the Spark job rather than relinquishing * executors whenever a task is done and asking the scheduler to launch a new executor for * each new task. Executors may be launched in a variety of ways, such as Mesos tasks for the * coarse-grained Mesos mode or standalone processes for Spark‘s standalone deploy mode * (spark.deploy.*). */private[spark]class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem) extends SchedulerBackend with Logging
主要属性:
// Use an atomic variable to track total number of cores in the cluster for simplicity and speedvar totalCoreCount = new AtomicInteger(0)var totalRegisteredExecutors = new AtomicInteger(0)val conf = scheduler.sc.confprivate val timeout = AkkaUtils.askTimeout(conf)private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)// Submit tasks only after (registered resources / total expected resources)// is equal to at least this value, that is double between 0 and 1.var minRegisteredRatio = math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))// Submit tasks after maxRegisteredWaitingTime milliseconds// if minRegisteredRatio has not yet been reachedval maxRegisteredWaitingTime = conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)val createTime = System.currentTimeMillis() class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive { override protected def log = CoarseGrainedSchedulerBackend.this.log private val executorActor = new HashMap[String, ActorRef] private val executorAddress = new HashMap[String, Address] private val executorHost = new HashMap[String, String] private val freeCores = new HashMap[String, Int] private val totalCores = new HashMap[String, Int] private val addressToExecutorId = new HashMap[Address, String]