As we saw in SparkContext and RDD, the real computation is always triggered by calling DAGScheduler's runJob method, so this is a very important class. Before reading its implementation, it helps to know a little about the actor model: http://en.wikipedia.org/wiki/Actor_model and http://www.slideshare.net/YungLinHo/introduction-to-actor-model-and-akka — a rough idea of how actors work is enough. It is also worth reviewing the concept of a DAG first: http://en.wikipedia.org/wiki/Directed_acyclic_graph and the paper http://www.netlib.org/utk/people/JackDongarra/PAPERS/DAGuE_technical_report.pdf
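If you have never used the actor model, the following minimal, self-contained Akka example (illustrative only, not Spark code; the Greet/Greeter names are made up) shows the two ideas DAGScheduler relies on: mutable state confined to a single actor, and asynchronous messages processed one at a time by its receive loop.

import akka.actor.{Actor, ActorSystem, Props}

case class Greet(name: String)

// All mutable state lives inside the actor; the only way to interact with it
// is to send it a message, which it handles one at a time in receive.
class Greeter extends Actor {
  var greeted = 0
  def receive = {
    case Greet(name) =>
      greeted += 1
      println(s"Hello, $name (message #$greeted)")
  }
}

object ActorDemo extends App {
  val system = ActorSystem("demo")
  val greeter = system.actorOf(Props[Greeter], "greeter")
  greeter ! Greet("DAGScheduler")   // fire-and-forget send, like eventProcessActor ! JobSubmitted(...)
  Thread.sleep(500)
  system.shutdown()                 // Akka 2.x API, matching the Akka version used by Spark 1.x
}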
/**
 * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
 * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
 * minimal schedule to run the job. It then submits stages as TaskSets to an underlying
 * TaskScheduler implementation that runs them on the cluster.
 *
 * In addition to coming up with a DAG of stages, this class also determines the preferred
 * locations to run each task on, based on the current cache status, and passes these to the
 * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
 * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
 * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
 * a small number of times before cancelling the whole stage.
 */
package org.apache.spark.scheduler

private[spark]
class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = SystemClock)
extends Logging {
State machine (actor message handling):
private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  override def preStart() {
    // set DAGScheduler for taskScheduler to ensure eventProcessActor is always
    // valid when the messages arrive
    dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
  }

  /**
   * The main event loop of the DAG scheduler.
   */
  def receive = {
    case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
        listener, properties)

    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId) =>
      dagScheduler.handleExecutorLost(execId)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }
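The rest of DAGScheduler never mutates scheduler state directly from caller threads; it just posts an event to this actor. As a rough illustration (simplified from entry points such as executorLost in this version), a public method looks like this:

  // Any thread may call this; the actual handling happens later, inside the
  // actor's single-threaded receive loop shown above.
  def executorLost(execId: String) {
    eventProcessActor ! ExecutorLost(execId)
  }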
Key fields:
private val nextStageId = new AtomicInteger(0)

private[scheduler] val nextJobId = new AtomicInteger(0)

private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]

// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]

// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]

private[scheduler] val activeJobs = new HashSet[ActiveJob]

// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
private val dagSchedulerActorSupervisor = env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))
// A closure serializer that we reuse.
// This is only safe because DAGScheduler runs in a single thread.
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

private[scheduler] var eventProcessActor: ActorRef = _
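To see how these bookkeeping maps relate, here is a hedged sketch (the helper name registerStage is made up; the real logic lives in newStage and the job/stage-id bookkeeping methods) of what registering a freshly created stage for a job involves:

  // Hypothetical helper, only to illustrate how the maps stay in sync:
  // each stage is indexed by its id, and each job remembers the ids of the
  // stages it needs.
  private def registerStage(stage: Stage, jobId: Int) {
    stageIdToStage(stage.id) = stage
    jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]) += stage.id
  }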
/**
 * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
 * can be used to block until the job finishes executing or can be used to cancel the job.
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties = null): JobWaiter[U] =
{
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessActor ! JobSubmitted(
    jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
  waiter
}
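For context, SparkContext.runJob ends up in DAGScheduler.runJob, which in this version is roughly just submitJob plus a blocking wait on the returned JobWaiter. A sketch (logging and details trimmed):

  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties = null) {
    // post the JobSubmitted event, then block the calling thread on the waiter
    val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded => logInfo("Job finished: " + callSite.shortForm)
      case JobFailed(exception) => throw exception
    }
  }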
/**
 * Run a job on an RDD locally, assuming it has only a single partition and no dependencies.
 * We run the operation in a separate thread just in case it takes a bunch of time, so that we
 * don't block the DAGScheduler event loop or other concurrent jobs.
 */
protected def runLocally(job: ActiveJob) {
  logInfo("Computing the requested partition locally")
  new Thread("Local computation of job " + job.jobId) {
    override def run() {
      runLocallyWithinThread(job)
    }
  }.start()
}
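The thread body, runLocallyWithinThread, is traced step by step in the walkthrough below. As a simplified sketch (metrics and some error handling omitted), it does roughly the following:

  protected def runLocallyWithinThread(job: ActiveJob) {
    var jobResult: JobResult = JobSucceeded
    try {
      val rdd = job.finalStage.rdd
      val split = rdd.partitions(job.partitions(0))
      val taskContext =
        new TaskContext(job.finalStage.id, job.partitions(0), 0, runningLocally = true)
      // run the job function on the single partition and hand the result to the JobWaiter
      val result = job.func(taskContext, rdd.iterator(split, taskContext))
      job.listener.taskSucceeded(0, result)
    } catch {
      case e: Exception =>
        jobResult = JobFailed(e)
        job.listener.jobFailed(e)
    } finally {
      // either way, tell the listener bus that the job has ended
      listenerBus.post(SparkListenerJobEnd(job.jobId, jobResult))
    }
  }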
private[scheduler] def handleJobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null)
{
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
/** Finds the earliest-created active job that needs the stage */
// TODO: Probably should actually find among the active jobs that need this
// stage the one with the highest priority (highest-priority pool, earliest created).
// That should take care of at least part of the priority inversion problem with
// cross-job dependencies.
private def activeJobForStage(stage: Stage): Option[Int] = {
  val jobsThatUseStage: Array[Int] = stage.jobIds.toArray.sorted
  jobsThatUseStage.find(jobIdToActiveJob.contains)
}
/**
 * Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue
 * architecture where any thread can post an event (e.g. a task finishing or a new job being
 * submitted) but there is a single "logic" thread that reads these events and takes decisions.
 * This greatly simplifies synchronization.
 */
private[scheduler] sealed trait DAGSchedulerEvent
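The concrete events are plain case classes/objects extending this trait. A few of them, sketched here with fields matching the receive loop above (the real file defines more of them):

  private[scheduler] case class JobSubmitted(
      jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      allowLocal: Boolean,
      callSite: CallSite,
      listener: JobListener,
      properties: Properties = null)
    extends DAGSchedulerEvent

  private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerEvent

  private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent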
/**
 * Asynchronously passes SparkListenerEvents to registered SparkListeners.
 *
 * Until start() is called, all posted events are only buffered. Only after this listener bus
 * has started will events be actually propagated to all attached listeners. This listener bus
 * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
 */
private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
/**
 * A SparkListenerEvent bus that relays events to its listeners
 */
private[spark] trait SparkListenerBus extends Logging {

  // SparkListeners attached to this event bus
  protected val sparkListeners =
    new ArrayBuffer[SparkListener] with mutable.SynchronizedBuffer[SparkListener]

  def addListener(listener: SparkListener) {
    sparkListeners += listener
  }

  /**
   * Post an event to all attached listeners.
   * This does nothing if the event is SparkListenerShutdown.
   */
  def postToAll(event: SparkListenerEvent) {
    // (per-event dispatch to the matching listener callbacks, omitted here)
  }

  /**
   * Apply the given function to all attached listeners, catching and logging any exception.
   */
  private def foreachListener(f: SparkListener => Unit): Unit = {
    sparkListeners.foreach { listener =>
      try {
        f(listener)
      } catch {
        case e: Exception =>
          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
      }
    }
  }
}
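As a usage example (not part of the Spark source; JobLoggingListener is a made-up name), application code can hook into this bus through SparkContext.addSparkListener:

  // A user-defined listener: onJobEnd is one of the SparkListener callbacks,
  // invoked by the bus for every SparkListenerJobEnd event it relays.
  class JobLoggingListener extends SparkListener {
    override def onJobEnd(jobEnd: SparkListenerJobEnd) {
      println(s"Job ${jobEnd.jobId} ended with result ${jobEnd.jobResult}")
    }
  }

  // sc is an existing SparkContext; the listener ends up registered on the LiveListenerBus.
  sc.addSparkListener(new JobLoggingListener)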
=========================== Job submission flow ======================================
submitJob -- every action ends up calling a submitJob operation
-> send: JobSubmitted -- sends a message to the DAGScheduler (because the submitting machine may not be the master?)
-> handleJobSubmitted -- the DAGScheduler handles the received message
-> newStage -- create a stage
-> new ActiveJob -- create the ActiveJob for this submission
-> [runLocally] -- if it is a simple job, execute it locally right away.
localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
-> runLocally(job) -- don't block the DAGScheduler event loop or other concurrent jobs
-> runLocallyWithinThread(job) -- run the local job in a new thread so the DAGScheduler event loop is not blocked
->TaskContext(job.finalStage.id, job.partitions(0), 0, runningLocally = true)
-> result = job.func(taskContext, rdd.iterator(split, taskContext)) -- run the job function
-> job.listener.taskSucceeded(0, result) -- report the job result to the listener
-> listenerBus.post(SparkListenerJobEnd(job.jobId, jobResult)) -- announce that the job has ended
-> submitStage(finalStage) -- submits the stage, but first recursively submits any missing parents (a condensed code sketch follows the end of this walkthrough)
-> activeJobForStage -- finds the earliest-created active job that needs the stage (looked up in jobIdToActiveJob)
-> getMissingParentStages -- a parent is "missing" when the stage depends on a shuffle stage whose map output is not yet available
->waitingForVisit.push(stage.rdd)
->waitingForVisit.pop()
->getShuffleMapStage
-> registerShuffleDependencies -- register the shuffle dependencies of all ancestors in shuffleToMapStage and mapOutputTracker
-> getAncestorShuffleDependencies -- returns a stack containing the ancestor dependencies that involve a shuffle
-> newOrUsedStage -- create a shuffle stage for that RDD; if the shuffle already exists, the old output locations are reused for the new stage
->mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId) or
->mapOutputTracker.registerShuffle(shuffleDep.shuffleId,rdd.partitions.size)
-> shuffleToMapStage(currentShufDep.shuffleId) = stage -- record it in the DAGScheduler's shuffleToMapStage map
-> newOrUsedStage -- create the shuffle stage for the current RDD
-> shuffleToMapStage(shuffleDep.shuffleId) = stage -- record it in the shuffleToMapStage map
-> NarrowDependency -> waitingForVisit.push(narrowDep.rdd) -- narrow dependencies do not form new stages; their RDDs are pushed onto the stack so their own parents get visited
-> submitMissingTasks -- called when the stage's parents are available and we can now run its tasks; the stage has no missing dependencies left.
-> stage.pendingTasks.clear() -- clear the record of pending tasks
-> partitionsToCompute = ? -- first figure out the indexes of the partitions to compute; a shuffle map stage may have to compute more partitions than a result stage
-> runningStages += stage -- update the running-stage bookkeeping
-> listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) -- notify listeners that the stage has been submitted
-> broadcast the task binary used to dispatch tasks to executors: the serialized RDD is broadcast and deserialized once per task, which means each task gets a different copy of the RDD; this is necessary in Hadoop, where the JobConf/Configuration object is not thread-safe
->// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
->// For ResultTask, serialize and broadcast (rdd, func).
-> new ShuffleMapTask(stage.id, taskBinary, part, locs) -- create the tasks
->new ResultTask(stage.id, taskBinary, part, locs, id)
-> preemptively serialize one task to make sure the tasks can be serialized, so serialization problems surface here as an exception rather than later on the executors
->stage.pendingTasks ++= tasks
-> taskScheduler.submitTasks -- hand the tasks to the TaskScheduler as a TaskSet
-> submitStage(parent) -- (recursion) if missing parent stages were found, submit each parent first; this stage waits until they are done
======================end=========================================
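Condensing the submitStage part of the walkthrough above into code, here is a sketch close to this version's implementation (logging removed):

  private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
        val missing = getMissingParentStages(stage).sortBy(_.id)
        if (missing.isEmpty) {
          // all parent outputs are available: run this stage's tasks now
          submitMissingTasks(stage, jobId.get)
        } else {
          // otherwise submit the missing parents first and park this stage
          missing.foreach(submitStage)
          waitingStages += stage
        }
      }
    } else {
      abortStage(stage, "No active job for stage " + stage.id)
    }
  }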