As mentioned earlier, submitMissingTasks is where task distribution begins. It first checks whether the stage is a shuffle map stage: if so, for each partition that has no output yet it calls getPreferredLocs and instantiates a ShuffleMapTask, collecting the results into a set of tasks; otherwise the stage is the final stage, and it likewise calls getPreferredLocs and instantiates a ResultTask for each unfinished partition. It then posts a SparkListenerStageSubmitted event to the listenerBus, preemptively serializes one task to make sure the tasks can be serialized at all, and finally submits the TaskSet via taskScheduler.submitTasks.
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
  logDebug("submitMissingTasks(" + stage + ")")
  // Get our pending tasks and remember them in our pendingTasks entry
  val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
  myPending.clear()
  var tasks = ArrayBuffer[Task[_]]()
  if (stage.isShuffleMap) {
    for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
      val locs = getPreferredLocs(stage.rdd, p)
      tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
    }
  } else {
    // This is a final stage; figure out its job's missing partitions
    val job = resultStageToJob(stage)
    for (id <- 0 until job.numPartitions if !job.finished(id)) {
      val partition = job.partitions(id)
      val locs = getPreferredLocs(stage.rdd, partition)
      tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
    }
  }

  val properties = if (jobIdToActiveJob.contains(jobId)) {
    jobIdToActiveJob(stage.jobId).properties
  } else {
    // this stage will be assigned to "default" pool
    null
  }

  // must be run listener before possible NotSerializableException
  // should be "StageSubmitted" first and then "JobEnded"
  listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))

  if (tasks.size > 0) {
    // Preemptively serialize a task to make sure it can be serialized. We are catching this
    // exception here because it would be fairly hard to catch the non-serializable exception
    // down the road, where we have several different implementations for local scheduler and
    // cluster schedulers.
    try {
      SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head)
    } catch {
      case e: NotSerializableException =>
        abortStage(stage, "Task not serializable: " + e.toString)
        runningStages -= stage
        return
    }

    logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
    myPending ++= tasks
    logDebug("New pending tasks: " + myPending)
    taskScheduler.submitTasks(
      new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
    stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
  } else {
    logDebug("Stage " + stage + " is actually done; %b %d %d".format(
      stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
    runningStages -= stage
  }
}
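The "preemptively serialize one task" trick is easy to reproduce outside Spark. The following sketch uses plain Java serialization rather than Spark's closureSerializer, and the FakeTask/Handle types are made up for illustration; it only shows why serializing a single representative task up front surfaces a NotSerializableException before anything is shipped:

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheckSketch {
  class Handle                                   // deliberately NOT Serializable
  case class FakeTask(id: Int, handle: Handle)   // the case class is Serializable, its field is not

  def main(args: Array[String]): Unit = {
    val tasks = Seq(FakeTask(0, new Handle), FakeTask(1, new Handle))
    try {
      // Serialize only the first task; if this fails, abort before submitting the whole set.
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(tasks.head)
      println("tasks are serializable, safe to submit")
    } catch {
      case e: NotSerializableException =>
        println("abort stage: task not serializable: " + e)
    }
  }
}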
taskScheduler.submitTasks instantiates a TaskSetManager, records it in activeTaskSets, and adds it to the schedulableBuilder. If the scheduler is not running in local mode and has not yet received any task, it starts a starvation timer that periodically warns that the job has not accepted any resources, cancelling itself once a task has been launched. Finally it calls backend.reviveOffers.
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient memory")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
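The starvation warning needs nothing beyond java.util.Timer. A minimal standalone sketch of the same pattern, where hasLaunchedTask and STARVATION_TIMEOUT are local stand-ins for Spark's fields rather than the real ones:

import java.util.{Timer, TimerTask}
import java.util.concurrent.atomic.AtomicBoolean

object StarvationTimerSketch {
  val hasLaunchedTask = new AtomicBoolean(false)   // flipped once the first task launches
  val STARVATION_TIMEOUT = 15000L                  // illustrative timeout in ms

  def main(args: Array[String]): Unit = {
    val starvationTimer = new Timer("starvation-timer", true)
    starvationTimer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = {
        if (!hasLaunchedTask.get()) {
          println("Initial job has not accepted any resources; check that workers are registered")
        } else {
          this.cancel()   // stop warning once a task has been launched
        }
      }
    }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
  }
}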
CoarseGrainedSchedulerBackend extends SchedulerBackend and overrides reviveOffers, which simply sends a ReviveOffers message to driverActor.
override def reviveOffers() {
  driverActor ! ReviveOffers
}
The driver actor's receive method handles incoming messages; on ReviveOffers it calls makeOffers.
def receive = {
  case RegisterExecutor(executorId, hostPort, cores) =>
    Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
    if (executorActor.contains(executorId)) {
      sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
    } else {
      logInfo("Registered executor: " + sender + " with ID " + executorId)
      sender ! RegisteredExecutor(sparkProperties)
      executorActor(executorId) = sender
      executorHost(executorId) = Utils.parseHostPort(hostPort)._1
      totalCores(executorId) = cores
      freeCores(executorId) = cores
      executorAddress(executorId) = sender.path.address
      addressToExecutorId(sender.path.address) = executorId
      totalCoreCount.addAndGet(cores)
      makeOffers()
    }

  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      if (executorActor.contains(executorId)) {
        freeCores(executorId) += scheduler.CPUS_PER_TASK
        makeOffers(executorId)
      } else {
        // Ignoring the update since we don't know about the executor.
        val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
        logWarning(msg.format(taskId, state, sender, executorId))
      }
    }

  case ReviveOffers =>
    makeOffers()

  case KillTask(taskId, executorId, interruptThread) =>
    executorActor(executorId) ! KillTask(taskId, executorId, interruptThread)

  case StopDriver =>
    sender ! true
    context.stop(self)

  case StopExecutors =>
    logInfo("Asking each executor to shut down")
    for (executor <- executorActor.values) {
      executor ! StopExecutor
    }
    sender ! true

  case RemoveExecutor(executorId, reason) =>
    removeExecutor(executorId, reason)
    sender ! true

  case DisassociatedEvent(_, address, _) =>
    addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
}
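The ReviveOffers round trip is a plain fire-and-forget actor message. The sketch below imitates it with Akka classic actors (assuming an akka-actor 2.4+ dependency on the classpath); the ReviveOffers/DriverActor/makeOffers names mirror the Spark code above, but this is an illustrative toy, not Spark's implementation:

import akka.actor.{Actor, ActorSystem, Props}

case object ReviveOffers

class DriverActor extends Actor {
  def receive = {
    case ReviveOffers => makeOffers()
  }

  def makeOffers(): Unit =
    println("making resource offers on all executors")
}

object ActorSketch {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("sketch")
    val driverActor = system.actorOf(Props[DriverActor](), "driver")
    driverActor ! ReviveOffers   // fire-and-forget, same shape as backend.reviveOffers()
    Thread.sleep(500)
    system.terminate()
  }
}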
makeOffers collects the free resources on every registered executor and hands the resulting task assignments to launchTasks.
// Make fake resource offers on all executors
def makeOffers() {
  launchTasks(scheduler.resourceOffers(
    executorHost.toArray.map { case (id, host) => new WorkerOffer(id, host, freeCores(id)) }))
}
First look at WorkerOffer: it represents the free resources available on an executor, measured in cores (taken from freeCores).
/**
 * Represents free resources available on an executor.
 */
private[spark]
case class WorkerOffer(executorId: String, host: String, cores: Int)
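makeOffers essentially turns the backend's bookkeeping maps into a list of WorkerOffer values. A minimal standalone rendering of that transformation, where the two maps are hand-populated stand-ins for executorHost and freeCores:

import scala.collection.mutable.HashMap

object MakeOffersSketch {
  case class WorkerOffer(executorId: String, host: String, cores: Int)

  def main(args: Array[String]): Unit = {
    // Stand-ins for CoarseGrainedSchedulerBackend's executorHost and freeCores maps.
    val executorHost = HashMap("exec-1" -> "host-a", "exec-2" -> "host-b")
    val freeCores    = HashMap("exec-1" -> 4, "exec-2" -> 2)

    // Same shape as makeOffers: one WorkerOffer per registered executor.
    val offers = executorHost.toArray.map { case (id, host) =>
      WorkerOffer(id, host, freeCores(id))
    }
    offers.foreach(println)
  }
}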
Now look at scheduler.resourceOffers. It marks every offered executor as alive and records its hostname, randomly shuffles the offers so that tasks are not always placed on the same set of workers, and then walks the TaskSets in scheduling order, offering each one every node at increasing locality levels (sweeping the offers round-robin) so that each TaskSet gets a chance to launch local tasks on all of them.
/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 */
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  SparkEnv.set(sc.env)

  // Mark each slave as alive and remember its hostname
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
    }
  }

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
    do {
      launchedTask = false
      for (i <- 0 until shuffledOffers.size) {
        val execId = shuffledOffers(i).executorId
        val host = shuffledOffers(i).host
        if (availableCpus(i) >= CPUS_PER_TASK) {
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            tasks(i) += task
            val tid = task.taskId
            taskIdToTaskSetId(tid) = taskSet.taskSet.id
            taskIdToExecutorId(tid) = execId
            activeExecutorIds += execId
            executorsByHost(host) += execId
            availableCpus(i) -= CPUS_PER_TASK
            assert (availableCpus(i) >= 0)
            launchedTask = true
          }
        }
      }
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
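The nested loop above is easier to see in isolation. The sketch below keeps the same structure — shuffled offers, a do/while that keeps sweeping the offers round-robin until a full pass places nothing — but replaces TaskSetManager.resourceOffer with a simple queue of pending task ids, so locality is ignored; it illustrates the assignment loop only, not Spark's scheduler:

import scala.collection.mutable.{ArrayBuffer, Queue}
import scala.util.Random

object ResourceOffersSketch {
  case class WorkerOffer(executorId: String, host: String, cores: Int)
  val CPUS_PER_TASK = 1

  def main(args: Array[String]): Unit = {
    val offers = Seq(WorkerOffer("exec-1", "host-a", 2), WorkerOffer("exec-2", "host-b", 1))
    // A single "task set": just pending task ids instead of a TaskSetManager.
    val pending = Queue(0, 1, 2, 3, 4)

    // Shuffle offers so the same workers are not always filled first.
    val shuffledOffers = Random.shuffle(offers)
    val tasks = shuffledOffers.map(_ => new ArrayBuffer[Int]())
    val availableCpus = shuffledOffers.map(_.cores).toArray

    // Keep sweeping the offers round-robin until a full pass launches nothing.
    var launchedTask = false
    do {
      launchedTask = false
      for (i <- 0 until shuffledOffers.size) {
        if (availableCpus(i) >= CPUS_PER_TASK && pending.nonEmpty) {
          tasks(i) += pending.dequeue()
          availableCpus(i) -= CPUS_PER_TASK
          launchedTask = true
        }
      }
    } while (launchedTask)

    shuffledOffers.zip(tasks).foreach { case (o, t) =>
      println(s"${o.executorId} gets tasks ${t.mkString(",")}")
    }
  }
}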
taskSet.resourceOffer responds to an offer from a single executor: if a suitable task is found at the allowed locality level, it does the bookkeeping, serializes the task, and returns a TaskDescription. The locality level it is allowed to use is governed by delay scheduling.
/**
 * Respond to an offer of a single executor from the scheduler by finding a task
 */
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{
  if (!isZombie) {
    val curTime = clock.getTime()

    var allowedLocality = getAllowedLocalityLevel(curTime)
    if (allowedLocality > maxLocality) {
      allowedLocality = maxLocality   // We're not allowed to search for farther-away tasks
    }

    findTask(execId, host, allowedLocality) match {
      case Some((index, taskLocality)) => {
        // Found a task; do some bookkeeping and return a task description
        val task = tasks(index)
        val taskId = sched.newTaskId()
        // Figure out whether this should count as a preferred launch
        logInfo("Starting task %s:%d as TID %s on executor %s: %s (%s)".format(
          taskSet.id, index, taskId, execId, host, taskLocality))
        // Do various bookkeeping
        copiesRunning(index) += 1
        val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
        taskInfos(taskId) = info
        taskAttempts(index) = info :: taskAttempts(index)
        // Update our locality level for delay scheduling
        currentLocalityIndex = getLocalityIndex(taskLocality)
        lastLaunchTime = curTime
        // Serialize and return the task
        val startTime = clock.getTime()
        // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
        // we assume the task can be serialized without exceptions.
        val serializedTask = Task.serializeWithDependencies(
          task, sched.sc.addedFiles, sched.sc.addedJars, ser)
        val timeTaken = clock.getTime() - startTime
        addRunningTask(taskId)
        logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
          taskSet.id, index, serializedTask.limit, timeTaken))
        val taskName = "task %s:%d".format(taskSet.id, index)
        sched.dagScheduler.taskStarted(task, info)
        return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
      }
      case _ =>
    }
  }
  None
}
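The delay-scheduling part — getAllowedLocalityLevel — boils down to "escalate to the next (worse) locality level only after waiting long enough at the current one". The following is a simplified standalone model of that idea; the wait times and the arithmetic are illustrative (Spark reads per-level waits from configuration such as spark.locality.wait), not a copy of the real method:

object DelaySchedulingSketch {
  // Locality levels from best to worst, as in TaskLocality.
  object Locality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
  }

  // Illustrative per-level wait times in ms before escalating to the next level.
  val localityWaits = Array(3000L, 3000L, 3000L)

  /**
   * Every time more than the configured wait has passed at the current level without a
   * task launch, allow the next (worse) locality level.
   */
  def allowedLocalityLevel(curTime: Long, lastLaunchTime: Long): Locality.Value = {
    var index = 0
    var sinceLastLaunch = curTime - lastLaunchTime
    while (index < localityWaits.length && sinceLastLaunch >= localityWaits(index)) {
      sinceLastLaunch -= localityWaits(index)
      index += 1
    }
    Locality(index)
  }

  def main(args: Array[String]): Unit = {
    val lastLaunch = 0L
    Seq(1000L, 4000L, 7000L, 10000L).foreach { now =>
      println(s"t=${now}ms -> allowed locality ${allowedLocalityLevel(now, lastLaunch)}")
    }
  }
}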
Note that the application's dependent files and JARs are bundled in at this point as well:
val serializedTask = Task.serializeWithDependencies(
  task, sched.sc.addedFiles, sched.sc.addedJars, ser)
val timeTaken = clock.getTime() - startTime
addRunningTask(taskId)
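Task.serializeWithDependencies ships the names and timestamps of the added files and JARs ahead of the task bytes, so the executor can fetch missing dependencies before running the task. The sketch below shows one way to bundle such metadata with a payload using plain java.io; the layout here is invented for illustration and is not Spark's actual wire format:

import java.io.{ByteArrayOutputStream, DataOutputStream}

object BundleDependenciesSketch {
  /** Write (name -> timestamp) maps for files and jars, then the task bytes. */
  def bundle(taskBytes: Array[Byte],
             addedFiles: Map[String, Long],
             addedJars: Map[String, Long]): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)

    def writeMap(m: Map[String, Long]): Unit = {
      out.writeInt(m.size)
      for ((name, timestamp) <- m) {
        out.writeUTF(name)
        out.writeLong(timestamp)
      }
    }

    writeMap(addedFiles)   // files the executor must fetch before running the task
    writeMap(addedJars)    // jars to add to the executor's classpath
    out.writeInt(taskBytes.length)
    out.write(taskBytes)
    out.flush()
    bos.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val payload = bundle("task-0".getBytes("UTF-8"),
      Map("hdfs:///tmp/lookup.txt" -> 1L), Map("hdfs:///tmp/udfs.jar" -> 1L))
    println(s"bundled payload is ${payload.length} bytes")
  }
}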
With the task descriptions prepared, launchTasks sends a LaunchTask(task) message to executorActor(task.executorId), i.e. each task is shipped to its assigned executor for execution.
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
    executorActor(task.executorId) ! LaunchTask(task)
  }
}
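launchTasks is the counterpart of the StatusUpdate branch shown earlier: cores are subtracted here and handed back when the task finishes. A toy version over plain maps (local stand-ins, no actors, with a println in place of the LaunchTask message) to make that core accounting explicit:

import scala.collection.mutable.HashMap

object LaunchTasksSketch {
  case class TaskDescription(taskId: Long, executorId: String, name: String)
  val CPUS_PER_TASK = 1

  // Stand-in for the backend's freeCores map.
  val freeCores = HashMap("exec-1" -> 4, "exec-2" -> 2)

  def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
    for (task <- tasks.flatten) {
      freeCores(task.executorId) -= CPUS_PER_TASK                    // reserve the cores...
      println(s"LaunchTask(${task.taskId}) -> ${task.executorId}")   // ...and ship the task
    }
  }

  // When a StatusUpdate reports the task finished, the cores are handed back.
  def taskFinished(task: TaskDescription): Unit =
    freeCores(task.executorId) += CPUS_PER_TASK

  def main(args: Array[String]): Unit = {
    launchTasks(Seq(Seq(TaskDescription(0, "exec-1", "task 0.0:0")),
                    Seq(TaskDescription(1, "exec-2", "task 0.0:1"))))
    println(freeCores)
  }
}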
Task creation and distribution are now complete.