Spark notes 7: DAGScheduler

In the earlier notes on SparkContext and RDD we saw that the actual computation is always driven by calling DAGScheduler's runJob method, so this is an important class. Before reading its implementation, it helps to have a rough idea of the actor model: http://en.wikipedia.org/wiki/Actor_model http://www.slideshare.net/YungLinHo/introduction-to-actor-model-and-akka (knowing roughly how the actor pattern works is enough). It is also worth looking at the DAG concept and related papers first: http://en.wikipedia.org/wiki/Directed_acyclic_graph    http://www.netlib.org/utk/people/JackDongarra/PAPERS/DAGuE_technical_report.pdf

/**
 * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
 * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
 * minimal schedule to run the job. It then submits stages as TaskSets to an underlying
 * TaskScheduler implementation that runs them on the cluster.
 *
 * In addition to coming up with a DAG of stages, this class also determines the preferred
 * locations to run each task on, based on the current cache status, and passes these to the
 * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
 * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
 * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
 * a small number of times before cancelling the whole stage.
 */
package org.apache.spark.scheduler

private[spark]
class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = SystemClock)
extends Logging {

State machine (actor message handling):

private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler)  extends Actor with Logging {

override def preStart() {
  // set DAGScheduler for taskScheduler to ensure eventProcessActor is always
  // valid when the messages arrive
  dagScheduler.taskScheduler.setDAGScheduler(dagScheduler)
}

/**
 * The main event loop of the DAG scheduler.
 */
def receive = {
  case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
      listener, properties)

  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)

  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)

  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)

  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()

  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)

  case ExecutorLost(execId) =>
    dagScheduler.handleExecutorLost(execId)

  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)

  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)

  case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
    dagScheduler.handleTaskCompletion(completion)

  case TaskSetFailed(taskSet, reason) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason)

  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
}
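The receive block above is Spark's own code. To make the pattern concrete, below is a minimal, self-contained sketch of the same single-threaded event-loop idea in plain Akka (assuming Akka 2.4+ on the classpath; the event classes and handler bodies are simplified stand-ins, not Spark's):

import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical stand-ins for DAGSchedulerEvent subclasses.
sealed trait SchedulerEvent
case class JobSubmitted(jobId: Int) extends SchedulerEvent
case class JobCancelled(jobId: Int) extends SchedulerEvent

// One actor processes all events sequentially, so the handlers need no locks,
// which is exactly the property the DAGScheduler relies on.
class EventLoopActor extends Actor {
  def receive = {
    case JobSubmitted(id) => println(s"handling submission of job $id")
    case JobCancelled(id) => println(s"cancelling job $id")
  }
}

object EventLoopDemo extends App {
  val system = ActorSystem("demo")
  val eventLoop = system.actorOf(Props[EventLoopActor], "eventLoop")
  eventLoop ! JobSubmitted(0)   // any thread may post events with "!"
  eventLoop ! JobCancelled(0)
  Thread.sleep(500)             // give the actor time to drain its mailbox
  system.terminate()
}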

Key fields:

private val nextStageId = new AtomicInteger(0)
private[scheduler] val nextJobId = new AtomicInteger(0)

private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]
// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]
// Stages that must be resubmitted due to fetch failures
private[scheduler] val failedStages = new HashSet[Stage]
private[scheduler] val activeJobs = new HashSet[ActiveJob]
// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
private val dagSchedulerActorSupervisor =
  env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))
// A closure serializer that we reuse.
// This is only safe because DAGScheduler runs in a single thread.
private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()

private[scheduler] var eventProcessActor: ActorRef = _
/**
 * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
 * can be used to block until the job finishes executing or can be used to cancel the job.
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties = null): JobWaiter[U] =
{
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }

  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }

  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  eventProcessActor ! JobSubmitted(
    jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
  waiter
}
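For reference, submitJob is reached from user code through SparkContext.runJob, which passes an explicit list of partition ids that the check above validates. A minimal sketch, assuming the Spark 1.5+/2.x runJob overload that takes no allowLocal flag (older versions add that extra parameter):

import org.apache.spark.{SparkConf, SparkContext}

object SubmitJobDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("SubmitJobDemo"))
    val rdd = sc.parallelize(1 to 100, 4)

    // Run the job only on partitions 0 and 2; one result per requested partition.
    val sums = sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(0, 2))
    println(sums.mkString(", "))

    // Requesting a partition that does not exist fails fast with the
    // IllegalArgumentException thrown by the check at the top of submitJob:
    // sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(99))

    sc.stop()
  }
}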
/**
 * Run a job on an RDD locally, assuming it has only a single partition and no dependencies.
 * We run the operation in a separate thread just in case it takes a bunch of time, so that we
 * don't block the DAGScheduler event loop or other concurrent jobs.
 */
protected def runLocally(job: ActiveJob) {
  logInfo("Computing the requested partition locally")
  new Thread("Local computation of job " + job.jobId) {
    override def run() {
      runLocallyWithinThread(job)
    }
  }.start()
}
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,
    properties: Properties = null) {
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
/** Finds the earliest-created active job that needs the stage */
// TODO: Probably should actually find among the active jobs that need this
// stage the one with the highest priority (highest-priority pool, earliest created).
// That should take care of at least part of the priority inversion problem with
// cross-job dependencies.
private def activeJobForStage(stage: Stage): Option[Int] = {
  val jobsThatUseStage: Array[Int] = stage.jobIds.toArray.sorted
  jobsThatUseStage.find(jobIdToActiveJob.contains)
}
/**
 * Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue
 * architecture where any thread can post an event (e.g. a task finishing or a new job being
 * submitted) but there is a single "logic" thread that reads these events and takes decisions.
 * This greatly simplifies synchronization.
 */
private[scheduler] sealed trait DAGSchedulerEvent
/**
 * Asynchronously passes SparkListenerEvents to registered SparkListeners.
 *
 * Until start() is called, all posted events are only buffered. Only after this listener bus
 * has started will events be actually propagated to all attached listeners. This listener bus
 * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
 */
private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
/**
 * A SparkListenerEvent bus that relays events to its listeners
 */
private[spark] trait SparkListenerBus extends Logging {

  // SparkListeners attached to this event bus
  protected val sparkListeners = new ArrayBuffer[SparkListener]
    with mutable.SynchronizedBuffer[SparkListener]

  def addListener(listener: SparkListener) {
    sparkListeners += listener
  }

  /**
   * Post an event to all attached listeners.
   * This does nothing if the event is SparkListenerShutdown.
   */
  def postToAll(event: SparkListenerEvent) {

  /**
   * Apply the given function to all attached listeners, catching and logging any exception.
   */
  private def foreachListener(f: SparkListener => Unit): Unit = {
    sparkListeners.foreach { listener =>
      try {
        f(listener)
      } catch {
        case e: Exception =>
          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
      }
    }
  }

}
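To watch the events that the DAGScheduler posts on this bus (stage submitted, job end, and so on), an application can register its own SparkListener. A small sketch, assuming the addSparkListener developer API and the callback names of this era of Spark:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageSubmitted}

object ListenerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("ListenerDemo"))

    // Events posted by the DAGScheduler are delivered asynchronously by the
    // LiveListenerBus to every registered listener.
    sc.addSparkListener(new SparkListener {
      override def onStageSubmitted(s: SparkListenerStageSubmitted): Unit =
        println(s"stage ${s.stageInfo.stageId} submitted: ${s.stageInfo.name}")
      override def onJobEnd(e: SparkListenerJobEnd): Unit =
        println(s"job ${e.jobId} finished with ${e.jobResult}")
    })

    sc.parallelize(1 to 100, 4).map(_ * 2).count()
    sc.stop()
  }
}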

===========================Job Submission Flow======================================

submitJob   -- every action ends up calling submitJob

-> send: JobSubmitted -- sends a JobSubmitted message to the DAGScheduler (since the submitting machine may not be the master?)

-> handleJobSubmitted   -- the DAGScheduler handles the received message

-> newStage   -- creates a stage

-> new ActiveJob   -- creates an ActiveJob for this job and records it as active

-> [runLocally]  -- if the job is simple enough, execute it locally (a runnable example follows this walkthrough):

localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1

->runLocally(job)  --don't block the DAGScheduler event loop or other concurrent jobs

->runLocallyWithinThread(job)  -- runs the local job in a new thread so the DAGScheduler event loop is not blocked

->TaskContext(job.finalStage.id, job.partitions(0), 0, runningLocally = true)

->result = job.func(taskContext, rdd.iterator(split, taskContext))  -- execute the job

->job.listener.taskSucceeded(0, result)  -- notify the listener of the job's result

->listenerBus.post(SparkListenerJobEnd(job.jobId, jobResult))  -- post the job-end event

->submitStage(finalStage)   -- submits the stage, but first recursively submits any missing parents

-> activeJobForStage   -- finds the earliest-created active job that needs the stage, by looking it up in jobIdToActiveJob

-> getMissingParentStages   -- a parent stage is missing when this stage depends on a shuffle stage whose output is not yet available (see the stage-splitting sketch after this walkthrough)

->waitingForVisit.push(stage.rdd)

->waitingForVisit.pop()

->getShuffleMapStage

->registerShuffleDependencies -- registers the shuffle dependencies of all ancestors in shuffleToMapStage and mapOutputTracker

->getAncestorShuffleDependencies : returns a stack of the ancestor dependencies that involve a shuffle

->newOrUsedStage  -- creates a shuffle stage for the RDD; if the shuffle already exists, the old output locations overwrite the new stage's locations

->mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId) or

->mapOutputTracker.registerShuffle(shuffleDep.shuffleId,rdd.partitions.size)

->shuffleToMapStage(currentShufDep.shuffleId) = stage  -- record it in the DAGScheduler's shuffleToMapStage map

->newOrUsedStage -- creates the shuffle stage for the current RDD

->shuffleToMapStage(shuffleDep.shuffleId) = stage   -- record it in the DAGScheduler's shuffleToMapStage map

->NarrowDependency  ->waitingForVisit.push(narrowDep.rdd) -- narrow dependencies do not start a new stage; their RDDs are simply pushed onto the stack so that their parents are visited

-> submitMissingTasks  -- called when the stage's parents are available and we can now run its tasks, i.e. the stage has no missing dependencies left

-> stage.pendingTasks.clear()  -- clear the stage's record of pending tasks

-> partitionsToCompute = ?  -- First figure out the indexes of partition ids to compute.

Determine which partitions actually need to be computed; a shuffle map stage may need to compute more of them.

->runningStages += stage  -- update the set of running stages

->listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))  -- notify the application that the stage has been submitted

->Broadcasted binary for the task, used to dispatch tasks to executors: a serialized copy of the RDD is broadcast and each task deserializes its own copy, which means each task gets a different copy of the RDD; this is necessary in Hadoop, where the JobConf/Configuration object is not thread-safe

->// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).

->// For ResultTask, serialize and broadcast (rdd, func).

->new ShuffleMapTask(stage.id, taskBinary, part, locs)  -- create the tasks

->new ResultTask(stage.id, taskBinary, part, locs, id)

-> Preemptively serialize one task to make sure it can be serialized, so that serialization errors are caught up front.

->stage.pendingTasks ++= tasks

->taskScheduler.submitTasks  -- submit the tasks to the TaskScheduler

-> submitStage(parent) -- (recursion) if a stage is found to be missing parents, submit those parent stages first
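As mentioned at the runLocally step, here is a sketch of a job that satisfies the local-execution condition (single partition, no parent stages). It assumes this Spark version's first()/take() still submit with allowLocal = true; the flag was removed in later releases:

import org.apache.spark.{SparkConf, SparkContext}

object LocalRunDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("LocalRunDemo"))

    // One partition and no shuffle parents: finalStage.parents.isEmpty and
    // partitions.length == 1 both hold, so the DAGScheduler can run the job
    // in-process via runLocally instead of handing it to the TaskScheduler.
    val first = sc.parallelize(1 to 10, 1).first()
    println(first)

    sc.stop()
  }
}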

======================end=========================================
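And the stage split referred to at the getMissingParentStages step: narrow dependencies (flatMap, map) stay in one stage, while reduceByKey introduces a ShuffleDependency, so handleJobSubmitted builds a ShuffleMapStage feeding a ResultStage. A minimal runnable word count, assuming a local Spark installation (Spark 1.3+ so the pair-RDD implicits are in scope automatically):

import org.apache.spark.{SparkConf, SparkContext}

object StageSplitDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("StageSplitDemo"))

    val pairs = sc.parallelize(Seq("a b a", "b c"), 2)
      .flatMap(_.split(" "))
      .map(w => (w, 1))            // narrow dependencies: same stage as the parent

    // reduceByKey adds a ShuffleDependency, i.e. a stage boundary. collect() is
    // the action that calls submitJob and sends JobSubmitted to the DAGScheduler.
    val counts = pairs.reduceByKey(_ + _).collect()
    counts.foreach(println)

    sc.stop()
  }
}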
