Spark Core Source Code Analysis 10: Running a Task

This section walks through how a task actually runs on an executor and how its final result is handled.

Let's start with the run method executed by the task thread (in Executor.scala); see the comments in the code.
override def run(): Unit = {
    val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)
    val deserializeStartTime = System.currentTimeMillis()
    Thread.currentThread.setContextClassLoader(replClassLoader)
    val ser = env.closureSerializer.newInstance()
    logInfo(s"Running $taskName (TID $taskId)")
    //Send a StatusUpdate to the driver with state RUNNING; nothing much actually happens for this update
    execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
    var taskStart: Long = 0
    startGCTime = computeTotalGcTime()

    try {
      //Split serializedTask into the task's files, jars and the task bytes
      val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
      //Download the jars and files the task needs
      updateDependencies(taskFiles, taskJars)
      //Deserialize the actual Task object
      task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
      task.setTaskMemoryManager(taskMemoryManager)

      // If this task has been killed before we deserialized it, let's quit now. Otherwise,
      // continue executing the task.
      if (killed) {
        // Throw an exception rather than returning, because returning within a try{} block
        // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
        // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
        // for the task.
        throw new TaskKilledException
      }

      logDebug("Task " + taskId + "'s epoch is " + task.epoch)
      env.mapOutputTracker.updateEpoch(task.epoch)

      // Run the actual task and measure its runtime.
      taskStart = System.currentTimeMillis()
      val value = try {
        //Run the task; see the analysis of task.run below
        task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
      } finally {
        // Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread;
        // when changing this, make sure to update both copies.
        //Free the managed memory allocated by this task
        val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
        if (freedMemory > 0) {
          val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
          if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false)) {
            throw new SparkException(errMsg)
          } else {
            logError(errMsg)
          }
        }
      }
      val taskFinish = System.currentTimeMillis()

      // If the task has been killed, let's fail it.
      if (task.killed) {
        throw new TaskKilledException
      }

      val resultSer = env.serializer.newInstance()
      val beforeSerialization = System.currentTimeMillis()
      //Serialize the task's result value
      val valueBytes = resultSer.serialize(value)
      val afterSerialization = System.currentTimeMillis()

      for (m <- task.metrics) {
        // Deserialization happens in two parts: first, we deserialize a Task object, which
        // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
        m.setExecutorDeserializeTime(
          (taskStart - deserializeStartTime) + task.executorDeserializeTime)
        // We need to subtract Task.run()'s deserialization time to avoid double-counting
        m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
        m.setJvmGCTime(computeTotalGcTime() - startGCTime)
        m.setResultSerializationTime(afterSerialization - beforeSerialization)
      }

      val accumUpdates = Accumulators.values
      val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
      val serializedDirectResult = ser.serialize(directResult)
      val resultSize = serializedDirectResult.limit
      //The final result has been serialized into serializedDirectResult; how it is sent back depends on its serialized size

      // directSend = sending directly back to the driver
      val serializedResult: ByteBuffer = {
        //Case 1: the serialized result exceeds maxResultSize (spark.driver.maxResultSize, 1 GB by default): drop it and only report its size
        if (maxResultSize > 0 && resultSize > maxResultSize) {
          logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
            s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
            s"dropping it.")
          ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
        }
        //Case 2: the serialized result exceeds the Akka frame size (about 10 MB by default): store it as a block in the BlockManager and send back an IndirectTaskResult holding the BlockId
        else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          val blockId = TaskResultBlockId(taskId)
          env.blockManager.putBytes(
            blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
          logInfo(
            s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
          ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
        } else {
          //Case 3: a small result is sent directly to the driver
          logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
          serializedDirectResult
        }
      }

      execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

    } catch {
      case ffe: FetchFailedException =>
        val reason = ffe.toTaskEndReason
        execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

      case _: TaskKilledException | _: InterruptedException if task.killed =>
        logInfo(s"Executor killed $taskName (TID $taskId)")
        execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

      case cDE: CommitDeniedException =>
        val reason = cDE.toTaskEndReason
        execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

      case t: Throwable =>
        // Attempt to exit cleanly by informing the driver of our failure.
        // If anything goes wrong (or this was a fatal exception), we will delegate to
        // the default uncaught exception handler, which will terminate the Executor.
        logError(s"Exception in $taskName (TID $taskId)", t)

        val metrics: Option[TaskMetrics] = Option(task).flatMap { task =>
          task.metrics.map { m =>
            m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
            m.setJvmGCTime(computeTotalGcTime() - startGCTime)
            m
          }
        }
        val taskEndReason = new ExceptionFailure(t, metrics)
        execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(taskEndReason))

        // Don't forcibly exit unless the exception was inherently fatal, to avoid
        // stopping other tasks unnecessarily.
        if (Utils.isFatalError(t)) {
          SparkUncaughtExceptionHandler.uncaughtException(t)
        }

    } finally {
      // Release memory used by this thread for shuffles
      env.shuffleMemoryManager.releaseMemoryForThisThread()
      // Release memory used by this thread for unrolling blocks
      env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
      // Release memory used by this thread for accumulators
      Accumulators.clear()
      runningTasks.remove(taskId)
    }
  }
}
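
For reference, the two size thresholds used in the branching above come from configuration. A minimal sketch of setting them (assuming Spark 1.x configuration keys; the values shown are just the documented defaults):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // results whose serialized size exceeds this are dropped; only an IndirectTaskResult with the size is reported
  .set("spark.driver.maxResultSize", "1g")
  // in MB; results larger than (frame size - reserved bytes) are shipped through the BlockManager instead of Akka
  .set("spark.akka.frameSize", "10")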

Now let's see what task.run does.

final def run(taskAttemptId: Long, attemptNumber: Int): T = {
  //First build a TaskContext, which tracks the task through its entire lifecycle
  context = new TaskContextImpl(
    stageId = stageId,
    partitionId = partitionId,//partition number
    taskAttemptId = taskAttemptId,//the taskId
    attemptNumber = attemptNumber,//which attempt of this task this is, starting from 0
    taskMemoryManager = taskMemoryManager,
    runningLocally = false)
  TaskContext.setTaskContext(context)//store it in a ThreadLocal
  context.taskMetrics.setHostname(Utils.localHostName())
  taskThread = Thread.currentThread()
  if (_killed) {
    kill(interruptThread = false)
  }
  try {
    runTask(context)//dispatch to the concrete task type's implementation
  } finally {
    context.markTaskCompleted()
    TaskContext.unset()
  }
}
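
Since Task.run stores the context in a ThreadLocal, user code running inside a task can read it back through TaskContext.get(). A small usage sketch (the sc parameter and the sample data are hypothetical):

import org.apache.spark.{SparkContext, TaskContext}

def tagWithPartition(sc: SparkContext): Array[(Int, String)] =
  sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)
    .mapPartitions { iter =>
      val ctx = TaskContext.get()                 // the TaskContextImpl set by Task.run above
      iter.map(word => (ctx.partitionId, word))   // partitionId, attemptNumber, stageId are all available here
    }
    .collect()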
This runTask is implemented differently by ShuffleMapTask and ResultTask. The SparkPi example used in earlier posts has no shuffle, so here I'll use a WordCount example to walk through the shuffle part.

import org.apache.spark._
import SparkContext._
object WordCount {
  def main(args: Array[String]) {
    if (args.length != 3 ){
      println("usage is org.test.WordCount <master> <input> <output>")
      return
    }
    val sc = new SparkContext(args(0), "WordCount",
      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
    val textFile = sc.textFile(args(1))
    val result = textFile.flatMap(line => line.split("\\s+"))
        .map(word => (word, 1)).reduceByKey(_ + _)
    result.saveAsTextFile(args(2))
  }
}

After sc.textFile(...).flatMap(...).map(...) we have a MapPartitionsRDD, which works much like the earlier examples, so I won't go over it again.
Let's go straight to reduceByKey, defined in PairRDDFunctions:
/**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKey[V]((v: V) => v, func, func, partitioner)
  }

  /**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
   */
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }

  /**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

There are three reduceByKey overloads. We can set the number of reduce partitions explicitly; if we don't, it is chosen by the default rules described below.
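A quick usage sketch of the three forms (pairs stands in for the (word, 1) RDD from the WordCount example above):

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

def threeWays(pairs: RDD[(String, Int)]) = {
  val byDefault       = pairs.reduceByKey(_ + _)                          // partitioner chosen by defaultPartitioner
  val byNumPartitions = pairs.reduceByKey(_ + _, 8)                       // HashPartitioner(8) built from the count
  val byPartitioner   = pairs.reduceByKey(new HashPartitioner(8), _ + _)  // explicit partitioner
  (byDefault, byNumPartitions, byPartitioner)
}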


If no reduce count is specified, the partitioner is chosen as follows:

1. If you pass a Partitioner explicitly, that partitioner is used as-is.

2. If no partitioner is given but a reduce count is, a HashPartitioner is built from that count.

3. If neither is set, a HashPartitioner is built from spark.default.parallelism, or, if that is not configured, from the partition count of the upstream RDD with the most partitions. A hedged sketch of this selection logic follows.
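
Rule 3 above is implemented in Partitioner.defaultPartitioner; the following is a hedged sketch of its selection logic, reconstructed from memory of the Spark 1.x source and simplified:

import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD

def defaultPartitionerSketch(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  // prefer an existing partitioner, looking at the RDD with the most partitions first
  val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
  for (r <- bySize if r.partitioner.exists(_.numPartitions > 0)) {
    return r.partitioner.get
  }
  // otherwise fall back to spark.default.parallelism, or to the largest RDD's partition count
  if (rdd.context.getConf.contains("spark.default.parallelism")) {
    new HashPartitioner(rdd.context.defaultParallelism)
  } else {
    new HashPartitioner(bySize.head.partitions.size)
  }
}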
In our WordCount example, K is String, V is Int, and C (the combiner type) is also Int.

def combineByKey[C](createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  //If the RDD's current partitioner is the same one we passed in, no shuffle is needed at all:
  //the aggregation is simply wrapped in a MapPartitionsRDD and returned
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
  //otherwise build and return a ShuffledRDD
    new ShuffledRDD[K, V, C](self, partitioner)//note: no dependency is passed to the constructor here, unlike the one-to-one dependencies mentioned earlier, which are passed in directly; more on this below
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
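
For our WordCount, then, reduceByKey(_ + _) boils down to a combineByKey call where the first value of a key becomes the initial combiner and both merge functions are plain integer addition. A sketch, assuming pairs is the (word, 1) RDD:

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

def wordCountViaCombine(pairs: RDD[(String, Int)]): RDD[(String, Int)] =
  pairs.combineByKey[Int](
    (v: Int) => v,                 // createCombiner: the first value seen for a key becomes the combiner
    (c: Int, v: Int) => c + v,     // mergeValue: fold another map-side value into the combiner
    (c1: Int, c2: Int) => c1 + c2, // mergeCombiners: merge combiners coming from different map outputs
    new HashPartitioner(8)         // the partitioner that reduceByKey would otherwise pick for us
  )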

As you can see, reduceByKey itself looks quite simple. That is because it is only a transformation; the real work happens once an action is triggered, and that is where the complexity shows up.

So let's look at the final result.saveAsTextFile(args(2)), which is an action.

As mentioned above, a ShuffledRDD's dependency is not passed in when it is constructed, so when stages are built they have to be split according to the dependencies. ShuffledRDD overrides getDependencies:
override def getDependencies: Seq[Dependency[_]] = {
  List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}
It is this overridden method that exposes the ShuffleDependency used to split stages.
Now let's jump straight to the remaining runTask implementations, one for ShuffleMapTask and one for ResultTask.
1. ShuffleMapTask.runTask
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  //Deserialize taskBinary into (rdd, dep). Note: rdd is the last RDD of this stage, and dep is the ShuffleDependency linking this stage to the next one
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager  //SortShuffleManager by default
    //dep.shuffleHandle wraps the shuffleId, _rdd.partitions.size and the deserialized dep
    //getWriter here creates a new SortShuffleWriter
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    //note the argument to write: it calls the RDD's compute method and returns an iterator over this partition's data, described below
    //write itself writes the data to local disk and combines the BlockManagerId and the block sizes into a MapStatus
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    return writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
At its upper boundary, each stage either reads from external storage or from the previous stage's output; at its lower boundary, it either writes to the local file system for the next stage to read, or it is a ResultTask that produces the job's output.
In our example the job is split into two stages, connected through the shuffle dependency.
Look at the first stage: the rdd deserialized here is the last RDD of that stage, i.e. the MapPartitionsRDD, and dep is the dependency connecting it to the next stage, i.e. the ShuffleDependency. Keep this in mind.
Now, what happens when this RDD's compute method is called? It is simple: the supplied function is applied to the partition's data, and an iterator over the transformed records is returned.
override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))
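
Concretely, for the map(word => (word, 1)) step of WordCount, the f captured by the MapPartitionsRDD is essentially the following per-partition function (an illustrative sketch, not the literal source):

import org.apache.spark.TaskContext

// the (context, partitionIndex, iterator) => newIterator function that MapPartitionsRDD wraps
val mapStepF: (TaskContext, Int, Iterator[String]) => Iterator[(String, Int)] =
  (context, pid, iter) => iter.map(word => (word, 1))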

Now let's look at write on the default SortShuffleWriter.

The records are first fed into an ExternalSorter, which accumulates the combined results in an in-memory map. Whenever that map grows past the memory threshold, its contents are spilled to disk, each spill producing a separate file. Once every record of the input partition has been processed, part of the result may still sit in memory while the rest lives in one or more spill files; a merge step then combines the in-memory data and the spill files into a single file (see writePartitionedFile). Finally, the start and end offsets of each partition within that data file are written to an index file.
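
A deliberately tiny sketch of that insert-then-spill-then-merge idea (a hypothetical class, not the real ExternalSorter, which tracks estimated memory through the ShuffleMemoryManager and writes real spill files):

import scala.collection.mutable

class MiniSorterSketch[K, V](mergeValue: (V, V) => V, spillEveryN: Int) {
  private val inMemory = mutable.HashMap.empty[K, V]
  private val spills = mutable.ArrayBuffer.empty[Seq[(K, V)]]   // stand-ins for on-disk spill files

  def insert(k: K, v: V): Unit = {
    inMemory(k) = inMemory.get(k).map(mergeValue(_, v)).getOrElse(v)
    if (inMemory.size >= spillEveryN) {    // the real sorter checks estimated memory usage, not entry count
      spills += inMemory.toSeq             // each spill would become its own file on disk
      inMemory.clear()
    }
  }

  // merge the in-memory map with all spills into one combined result,
  // standing in for writePartitionedFile producing the single data file plus its index file
  def merged: Seq[(K, V)] =
    (spills.flatten ++ inMemory.toSeq)
      .groupBy(_._1)
      .map { case (k, vs) => k -> vs.map(_._2).reduce(mergeValue) }
      .toSeq
}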
That completes the first stage, from the input source to the shuffle output.
2. ResultTask.runTask
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val deserializeStartTime = System.currentTimeMillis()
  val ser = SparkEnv.get.closureSerializer.newInstance()
  //deserialize the final RDD and the user func
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

  metrics = Some(context.taskMetrics)
  //invoke the func we supplied
  func(context, rdd.iterator(partition, context))
}

Look at the second argument to func: again, the RDD's compute method is invoked. Since the RDD here was produced by a shuffle, it is a ShuffledRDD, whose compute looks like this:

override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[(K, C)]]
  }

SparkEnv.get.shuffleManager can be either the sort-based or the hash-based manager; in both cases getReader returns a HashShuffleReader.

Let's look at its read method:
/** Read the combined key-values for this reduce task */
override def read(): Iterator[Product2[K, C]] = {
  val ser = Serializer.getSerializer(dep.serializer)
  //actually fetch the blocks this reducer needs from the map-output files, wrapped in an InterruptibleIterator
  val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)

  val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
    if (dep.mapSideCombine) {
      new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
    } else {
      new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
    }
  } else {
    require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")

    // Convert the Product2s to pairs since this is what downstream RDDs currently expect
    iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
  }

  // Sort the output if there is a sort ordering defined.
  dep.keyOrdering match {
    case Some(keyOrd: Ordering[K]) =>
      // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
      // the ExternalSorter won't spill to disk.
      val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
      sorter.insertAll(aggregatedIter)
      context.taskMetrics.incMemoryBytesSpilled(sorter.memoryBytesSpilled)
      context.taskMetrics.incDiskBytesSpilled(sorter.diskBytesSpilled)
      sorter.iterator
    case None =>
      aggregatedIter
  }
}

After that, our func is finally applied to this iterator to produce the result.

Recall that Task.run returns a value of type T: the ShuffleMapTask subclass returns a MapStatus, while ResultTask returns whatever func produces. So in the run method in Executor.scala, value is exactly that return value:
val value = try {
          task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
        } finally {
         ...
         ...
        }
Then comes what was described at the beginning: the result is serialized and
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) wraps up the flow. It sends a StatusUpdate message to the driver carrying the executorId, taskId, the task state, and the serialized result of the computation:
case StatusUpdate(executorId, taskId, state, data) =>
  scheduler.statusUpdate(taskId, state, data.value)
  if (TaskState.isFinished(state)) {
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.freeCores += scheduler.CPUS_PER_TASK//a task has finished, so return its cores to this executor
        makeOffers(executorId)//try to schedule more runnable tasks on this executor
      case None =>
        // Ignoring the update since we don't know about the executor.
        logWarning(s"Ignored task status update ($taskId state $state) " +
          s"from unknown executor with ID $executorId")
    }
  }
Now for the driver side. TaskSchedulerImpl.statusUpdate is called first and branches on the task state; here the state is FINISHED. If the result is a DirectTaskResult, it is deserialized directly; if it is an IndirectTaskResult, the BlockId obtained after deserialization is used to fetch the real bytes remotely from the BlockManager. Either way, once the data is in hand, the taskId is mapped to its taskSetId, the taskSetId to its TaskSetManager, and that TaskSetManager's handleSuccessfulTask method is called. handleSuccessfulTask mainly calls DAGScheduler's taskEnded method, which posts a CompletionEvent to the DAGScheduler event loop.
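A hedged sketch of that Direct/Indirect branching (the real logic lives in TaskResultGetter and runs on a thread pool; TaskResult, DirectTaskResult and IndirectTaskResult are private[spark], so read this as pseudocode placed inside the org.apache.spark.scheduler package, with handleResult/reportLostResult as hypothetical callbacks):

import java.nio.ByteBuffer
import org.apache.spark.SparkEnv

def onTaskFinished(tid: Long, serializedData: ByteBuffer,
                   handleResult: (Long, DirectTaskResult[_]) => Unit,   // hypothetical callback
                   reportLostResult: Long => Unit): Unit = {            // hypothetical callback
  val ser = SparkEnv.get.closureSerializer.newInstance()
  ser.deserialize[TaskResult[_]](serializedData) match {
    case direct: DirectTaskResult[_] =>
      // small result: the value itself travelled inside the StatusUpdate message
      handleResult(tid, direct)
    case IndirectTaskResult(blockId, size) =>
      // large result: only a BlockId came back; fetch the real bytes from the remote BlockManager
      SparkEnv.get.blockManager.getRemoteBytes(blockId) match {
        case Some(bytes) => handleResult(tid, ser.deserialize[DirectTaskResult[_]](bytes))
        case None        => reportLostResult(tid)   // the block was removed before we could fetch it
      }
  }
}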
We focus on how ResultTask and ShuffleMapTask completions are handled; see the comments.
/**
 * Responds to a task finishing. This is called inside the event loop so it assumes that it can
 * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
 */
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
  val task = event.task
  val stageId = task.stageId
  val taskType = Utils.getFormattedClassName(task)

  outputCommitCoordinator.taskCompleted(stageId, task.partitionId,
    event.taskInfo.attempt, event.reason)

  // The success case is dealt with separately below, since we need to compute accumulator
  // updates before posting.
  if (event.reason != Success) {
    val attemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
    listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason,
      event.taskInfo, event.taskMetrics))
  }

  if (!stageIdToStage.contains(task.stageId)) {
    // Skip all the actions if the stage has been cancelled.
    return
  }

  val stage = stageIdToStage(task.stageId)
  event.reason match {
    case Success =>
      listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
        event.reason, event.taskInfo, event.taskMetrics))
      stage.pendingTasks -= task
      task match {
        case rt: ResultTask[_, _] =>
          // Cast to ResultStage here because it's part of the ResultTask
          // TODO Refactor this out to a function that accepts a ResultStage
          val resultStage = stage.asInstanceOf[ResultStage]
          resultStage.resultOfJob match {
            case Some(job) =>
              // only handle this result if the partition has not already been marked as finished
              if (!job.finished(rt.outputId)) {
                updateAccumulators(event)
                job.finished(rt.outputId) = true//mark this partition as finished
                job.numFinished += 1
                // If the whole job has finished, remove it
                // all partitions of the final stage are done, i.e. the whole job has finished
                if (job.numFinished == job.numPartitions) {
                  markStageAsFinished(resultStage)
                  cleanupStateForJobAndIndependentStages(job)//clean up in-memory state for this job
                  listenerBus.post(
                    SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                }

                // taskSucceeded runs some user code that might throw an exception. Make sure
                // we are resilient against that.
                try {
                  //call the listener's taskSucceeded; the listener here is the JobWaiter created when the job was submitted (see the analysis below)
                  job.listener.taskSucceeded(rt.outputId, event.result)
                } catch {
                  case e: Exception =>
                    // TODO: Perhaps we want to mark the resultStage as failed?
                    job.listener.jobFailed(new SparkDriverExecutionException(e))
                }
              }
            case None =>
              logInfo("Ignoring result from " + rt + " because its job has finished")
          }

        case smt: ShuffleMapTask =>
          val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
          updateAccumulators(event)
          val status = event.result.asInstanceOf[MapStatus]//a ShuffleMapTask's result is a MapStatus
          val execId = status.location.executorId
          logDebug("ShuffleMapTask finished on " + execId)
          if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
            logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
          } else {
            //mark this partition as finished by adding the partitionId -> status mapping to the stage's outputLocs
            shuffleStage.addOutputLoc(smt.partitionId, status)
          }
          //if this ShuffleMapTask was the last pending task of the stage, the stage itself is finished
          if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
            markStageAsFinished(shuffleStage)
            logInfo("looking for newly runnable stages")
            logInfo("running: " + runningStages)
            logInfo("waiting: " + waitingStages)
            logInfo("failed: " + failedStages)

            // We supply true to increment the epoch number here in case this is a
            // recomputation of the map outputs. In that case, some nodes may have cached
            // locations with holes (from when we detected the error) and will need the
            // epoch incremented to refetch them.
            // TODO: Only increment the epoch number if this is not the first time
            //       we registered these map outputs.
            //register the shuffleId -> mapStatuses mapping
            mapOutputTracker.registerMapOutputs(
              shuffleStage.shuffleDep.shuffleId,
              //take list.head because the same partition may have several task attempts; each attempt that finishes is prepended to the list
              shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
              changeEpoch = true)

            //the output locations have been updated, so clear the cached locations
            clearCacheLocs()
            //if any partition has an empty outputLoc, the stage is incomplete and must be resubmitted
            if (shuffleStage.outputLocs.contains(Nil)) {
              // Some tasks had failed; let's resubmit this shuffleStage
              // TODO: Lower-level scheduler should also deal with this
              logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
                ") because some of its tasks had failed: " +
                shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)
                    .map(_._2).mkString(", "))
              submitStage(shuffleStage)
            } else {
              val newlyRunnable = new ArrayBuffer[Stage]
              for (shuffleStage <- waitingStages) {
                logInfo("Missing parents for " + shuffleStage + ": " +
                  getMissingParentStages(shuffleStage))
              }
              //prepare to submit the next stage(s); a stage is runnable once all of its parent stages have completed
              for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty)
              {
                newlyRunnable += shuffleStage
              }
              waitingStages --= newlyRunnable
              runningStages ++= newlyRunnable
              for {
                shuffleStage <- newlyRunnable.sortBy(_.id)
                jobId <- activeJobForStage(shuffleStage)
              } {
                logInfo("Submitting " + shuffleStage + " (" +
                  shuffleStage.rdd + "), which is now runnable")
                //submit the selected stage
                submitMissingTasks(shuffleStage, jobId)
              }
            }
          }
        }
      // ... some other cases omitted
  }
  submitWaitingStages()//schedule any stages that are still waiting
}
The listener's taskSucceeded here is JobWaiter.taskSucceeded:

override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
  if (_jobFinished) {
    throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
  }
  resultHandler(index, result.asInstanceOf[T])//invoke the handler that was set when the job was submitted, see below
  finishedTasks += 1
  if (finishedTasks == totalTasks) {
    _jobFinished = true
    jobResult = JobSucceeded
    this.notifyAll()//wake up the job's awaitResult, which is blocked waiting for completion
  }
}

The resultHandler simply stores each partition's result into an array, and that array is returned at the end:

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean
    ): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
  results
}
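
This is also why RDD.collect is just a thin wrapper over runJob: every ResultTask turns its partition's iterator into an array, and the driver concatenates the per-partition arrays. A sketch of that pattern (roughly what collect does in Spark 1.x):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

def collectSketch[T: ClassTag](rdd: RDD[T]): Array[T] = {
  // one ResultTask per partition, each returning that partition's data as an array
  val perPartition: Array[Array[T]] =
    rdd.sparkContext.runJob(rdd, (iter: Iterator[T]) => iter.toArray)
  Array.concat(perPartition: _*)
}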

That concludes the walkthrough of how a task runs.

