A Brief Analysis of Apache Spark 1.0.0 (Part 7): Resource Scheduling - Returning Results

For a ResultTask, the task simply runs func over its partition and reports the computed value when it finishes; for a ShuffleMapTask, the intermediate (shuffle) output has to be written out so that tasks of the next stage can use it, and what the task returns is a MapStatus describing that output. In either case the return value is packaged into a DirectTaskResult for the trip back to the driver.
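To make the contrast concrete, here is a trimmed sketch of the two runTask implementations (simplified from the 1.0.0 sources, metrics and logging omitted; compressedBucketSizes stands in for the bucket sizes computed in the elided write loop):

// ResultTask.runTask (trimmed): run the user function func over the partition's
// iterator and hand its return value back; this is the value the driver receives.
override def runTask(context: TaskContext): U = {
  try {
    func(context, rdd.iterator(split, context))
  } finally {
    context.executeOnCompleteCallbacks()
  }
}

// ShuffleMapTask.runTask (trimmed): write the partition into per-reducer shuffle
// buckets through the block manager, then return a MapStatus recording where the
// output lives and the compressed size of every bucket, so reduce tasks can fetch it.
override def runTask(context: TaskContext): MapStatus = {
  // ... partition rdd.iterator(split, context) into buckets and write each bucket
  //     out with a shuffle block writer ...
  new MapStatus(blockManager.blockManagerId, compressedBucketSizes)
}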

In Executor's run (the TaskRunner), execBackend.statusUpdate is called once the Task has finished. CoarseGrainedExecutorBackend, which implements ExecutorBackend, defines statusUpdate to send a StatusUpdate message to the driver:

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  driver ! StatusUpdate(executorId, taskId, state, data)
}
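Before this call, TaskRunner has already serialized the task's return value and decided how to ship it: if the serialized DirectTaskResult fits in an Akka frame it is sent inline, otherwise the bytes are parked in the local BlockManager and only an IndirectTaskResult holding the block id goes over the wire. Roughly (simplified from Executor.scala in 1.0.0, error handling omitted):

// Decide between sending the result inline and storing it locally when it is too
// large for an actor message.
val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
val serializedDirectResult = ser.serialize(directResult)

val serializedResult =
  if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
    // Too big: stash the serialized result in the BlockManager and send a pointer.
    val blockId = TaskResultBlockId(taskId)
    env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
    ser.serialize(new IndirectTaskResult[Any](blockId))
  } else {
    // Small enough: the DirectTaskResult bytes travel inside the StatusUpdate message.
    serializedDirectResult
  }

execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

This is why TaskResultGetter on the driver has to handle both a DirectTaskResult and an IndirectTaskResult, as shown further below.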

The message is received by the driverActor defined in CoarseGrainedSchedulerBackend, which first calls scheduler.statusUpdate to update the task's state and then, if the task has reached a terminal state, frees the cores it occupied and makes new resource offers on that executor:

case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          if (executorActor.contains(executorId)) {
            freeCores(executorId) += scheduler.CPUS_PER_TASK
            makeOffers(executorId)
          } else {
            // Ignoring the update since we don't know about the executor.
            val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
            logWarning(msg.format(taskId, state, sender, executorId))
          }
        }
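makeOffers(executorId) just re-offers that one executor's freed cores to TaskSchedulerImpl; in the 1.0.0 source it is essentially:

// Make a resource offer on just the executor that reported in, so its freed cores
// can be handed to the next waiting task right away.
def makeOffers(executorId: String) {
  launchTasks(scheduler.resourceOffers(
    Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
}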

scheduler.statusUpdate mainly removes the task that has just finished from the bookkeeping maps and updates the corresponding task set:

def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
    var failedExecutor: Option[String] = None
    synchronized {
      try {
        if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
          // We lost this entire executor, so remember that it's gone
          val execId = taskIdToExecutorId(tid)
          if (activeExecutorIds.contains(execId)) {
            removeExecutor(execId)
            failedExecutor = Some(execId)
          }
        }
        taskIdToTaskSetId.get(tid) match {
          case Some(taskSetId) =>
            if (TaskState.isFinished(state)) {
              taskIdToTaskSetId.remove(tid)
              taskIdToExecutorId.remove(tid)
            }
            activeTaskSets.get(taskSetId).foreach { taskSet =>
              if (state == TaskState.FINISHED) {
                taskSet.removeRunningTask(tid)
                taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
              } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
                taskSet.removeRunningTask(tid)
                taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
              }
            }
          case None =>
            logError(
              ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
               "likely the result of receiving duplicate task finished status updates)")
              .format(state, tid))
        }
      } catch {
        case e: Exception => logError("Exception in statusUpdate", e)
      }
    }
    // Update the DAGScheduler without holding a lock on this, since that can deadlock
    if (failedExecutor.isDefined) {
      dagScheduler.executorLost(failedExecutor.get)
      backend.reviveOffers()
    }
  }

The key call here is taskResultGetter.enqueueSuccessfulTask: it first deserializes the result data, handling the direct and indirect cases separately, and finally invokes scheduler.handleSuccessfulTask:

def enqueueSuccessfulTask(
    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
    getTaskResultExecutor.execute(new Runnable {
      override def run(): Unit = Utils.logUncaughtExceptions {
        try {
          val result = serializer.get().deserialize[TaskResult[_]](serializedData) match {
            case directResult: DirectTaskResult[_] => directResult
            case IndirectTaskResult(blockId) =>
              logDebug("Fetching indirect task result for TID %s".format(tid))
              scheduler.handleTaskGettingResult(taskSetManager, tid)
              val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
              if (!serializedTaskResult.isDefined) {
                /* We won't be able to get the task result if the machine that ran the task failed
                 * between when the task ended and when we tried to fetch the result, or if the
                 * block manager had to flush the result. */
                scheduler.handleFailedTask(
                  taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
                return
              }
              val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
                serializedTaskResult.get)
              sparkEnv.blockManager.master.removeBlock(blockId)
              deserializedResult
          }
          result.metrics.resultSize = serializedData.limit()
          scheduler.handleSuccessfulTask(taskSetManager, tid, result)
        } catch {
          case cnf: ClassNotFoundException =>
            val loader = Thread.currentThread.getContextClassLoader
            taskSetManager.abort("ClassNotFound with classloader: " + loader)
          case ex: Exception =>
            taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
        }
      }
    })
  }
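The failure path, taskResultGetter.enqueueFailedTask (reached from statusUpdate above when the state is FAILED, KILLED or LOST), is the mirror image: it tries to deserialize a TaskEndReason from the status data and then hands the task to scheduler.handleFailedTask. A trimmed sketch, not the verbatim 1.0.0 source:

// Simplified sketch of TaskResultGetter.enqueueFailedTask: recover the failure
// reason if one was serialized, then let the scheduler decide about retries.
def enqueueFailedTask(taskSetManager: TaskSetManager, tid: Long, taskState: TaskState,
    serializedData: ByteBuffer) {
  getTaskResultExecutor.execute(new Runnable {
    override def run(): Unit = {
      var reason: TaskEndReason = UnknownReason
      try {
        if (serializedData != null && serializedData.limit() > 0) {
          reason = serializer.get().deserialize[TaskEndReason](serializedData)
        }
      } catch {
        // The task already failed, so a reason we cannot deserialize is not fatal.
        case e: Exception => logError("Could not deserialize TaskEndReason", e)
      }
      scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
    }
  })
}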
Back on the success path, scheduler.handleSuccessfulTask is defined in TaskSchedulerImpl as follows; it simply delegates to taskSetManager.handleSuccessfulTask:

def handleSuccessfulTask(
    taskSetManager: TaskSetManager,
    tid: Long,
    taskResult: DirectTaskResult[_]) = synchronized {
    taskSetManager.handleSuccessfulTask(tid, taskResult)
  }

taskSetManager.handleSuccessfulTask marks the task as successful, removes it from the set of running tasks, and then calls sched.dagScheduler.taskEnded:

/**
   * Marks the task as successful and notifies the DAGScheduler that a task has ended.
   */
  def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {
    val info = taskInfos(tid)
    val index = info.index
    info.markSuccessful()
    removeRunningTask(tid)
    sched.dagScheduler.taskEnded(
      tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
    if (!successful(index)) {
      tasksSuccessful += 1
      logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
        tid, info.duration, info.host, tasksSuccessful, numTasks))
      // Mark successful and stop if all the tasks have succeeded.
      successful(index) = true
      if (tasksSuccessful == numTasks) {
        isZombie = true
      }
    } else {
      logInfo("Ignorning task-finished event for TID " + tid + " because task " +
        index + " has already completed successfully")
    }
    failedExecutors.remove(index)
    maybeFinishTaskSet()
  }
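maybeFinishTaskSet, called at the end, retires the whole TaskSetManager once it has gone "zombie" (all tasks succeeded or the set was aborted) and nothing is still running; roughly:

// Tell the TaskSchedulerImpl that this task set is completely done once no task is
// running any more and the manager has been marked as a zombie.
private def maybeFinishTaskSet() {
  if (isZombie && runningTasks == 0) {
    sched.taskSetFinished(this)
  }
}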

sched.dagScheduler.taskEnded sends a CompletionEvent message to eventProcessActor:

// Called by TaskScheduler to report task completions or failures.
  def taskEnded(
      task: Task[_],
      reason: TaskEndReason,
      result: Any,
      accumUpdates: Map[Long, Any],
      taskInfo: TaskInfo,
      taskMetrics: TaskMetrics) {
    eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
  }

The receiving side is defined in DAGScheduler's event-processing actor, which dispatches the event to dagScheduler.handleTaskCompletion:

case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)

dagScheduler.handleTaskCompletion first posts a SparkListenerTaskEnd to the listenerBus and looks up the stage the task belongs to. It defines a local helper markStageAsFinished for later use, then branches on the completion reason, which may be Success, Resubmitted, FetchFailed, ExceptionFailure, TaskResultLost and so on; at the very end it calls submitWaitingStages() to submit the stages that were waiting on this one.

For a Success event it further distinguishes ResultTask from ShuffleMapTask. For a ResultTask, the corresponding output slot of the job is marked as finished and job.listener.taskSucceeded is invoked with the task's result; if all partitions of the job have now finished, the stage is marked finished via markStageAsFinished, the job's state is cleaned up, and a SparkListenerJobEnd is posted to the listenerBus.

For a ShuffleMapTask, the completion is recorded against the executor it ran on and addOutputLoc registers the shuffle output location on the stage. If the stage is still in runningStages and has no pending tasks left, it is marked finished with markStageAsFinished and its map outputs are registered with the mapOutputTracker; if some outputs are missing the stage is resubmitted, otherwise getMissingParentStages is used to find which waitingStages have become runnable and submitMissingTasks submits their tasks:

/**
   * Responds to a task finishing. This is called inside the event loop so it assumes that it can
   * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
   */
  private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
    val task = event.task
    val stageId = task.stageId
    val taskType = Utils.getFormattedClassName(task)
    listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
      event.taskMetrics))
    if (!stageIdToStage.contains(task.stageId)) {
      // Skip all the actions if the stage has been cancelled.
      return
    }
    val stage = stageIdToStage(task.stageId)

    def markStageAsFinished(stage: Stage) = {
      val serviceTime = stageToInfos(stage).submissionTime match {
        case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
        case _ => "Unknown"
      }
      logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
      stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
      listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
      runningStages -= stage
    }
    event.reason match {
      case Success =>
        logInfo("Completed " + task)
        if (event.accumUpdates != null) {
          Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
        }
        pendingTasks(stage) -= task
        task match {
          case rt: ResultTask[_, _] =>
            resultStageToJob.get(stage) match {
              case Some(job) =>
                if (!job.finished(rt.outputId)) {
                  job.finished(rt.outputId) = true
                  job.numFinished += 1
                  // If the whole job has finished, remove it
                  if (job.numFinished == job.numPartitions) {
                    markStageAsFinished(stage)
                    cleanupStateForJobAndIndependentStages(job, Some(stage))
                    listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
                  }
                  job.listener.taskSucceeded(rt.outputId, event.result)
                }
              case None =>
                logInfo("Ignoring result from " + rt + " because its job has finished")
            }

          case smt: ShuffleMapTask =>
            val status = event.result.asInstanceOf[MapStatus]
            val execId = status.location.executorId
            logDebug("ShuffleMapTask finished on " + execId)
            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
              logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
            } else {
              stage.addOutputLoc(smt.partitionId, status)
            }
            if (runningStages.contains(stage) && pendingTasks(stage).isEmpty) {
              markStageAsFinished(stage)
              logInfo("looking for newly runnable stages")
              logInfo("running: " + runningStages)
              logInfo("waiting: " + waitingStages)
              logInfo("failed: " + failedStages)
              if (stage.shuffleDep.isDefined) {
                // We supply true to increment the epoch number here in case this is a
                // recomputation of the map outputs. In that case, some nodes may have cached
                // locations with holes (from when we detected the error) and will need the
                // epoch incremented to refetch them.
                // TODO: Only increment the epoch number if this is not the first time
                //       we registered these map outputs.
                mapOutputTracker.registerMapOutputs(
                  stage.shuffleDep.get.shuffleId,
                  stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
                  changeEpoch = true)
              }
              clearCacheLocs()
              if (stage.outputLocs.exists(_ == Nil)) {
                // Some tasks had failed; let's resubmit this stage
                // TODO: Lower-level scheduler should also deal with this
                logInfo("Resubmitting " + stage + " (" + stage.name +
                  ") because some of its tasks had failed: " +
                  stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", "))
                submitStage(stage)
              } else {
                val newlyRunnable = new ArrayBuffer[Stage]
                for (stage <- waitingStages) {
                  logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
                }
                for (stage <- waitingStages if getMissingParentStages(stage) == Nil) {
                  newlyRunnable += stage
                }
                waitingStages --= newlyRunnable
                runningStages ++= newlyRunnable
                for {
                  stage <- newlyRunnable.sortBy(_.id)
                  jobId <- activeJobForStage(stage)
                } {
                  logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
                  submitMissingTasks(stage, jobId)
                }
              }
            }
          }

      case Resubmitted =>
        logInfo("Resubmitted " + task + ", so marking it as still running")
        pendingTasks(stage) += task

      case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
        // Mark the stage that the reducer was in as unrunnable
        val failedStage = stageIdToStage(task.stageId)
        runningStages -= failedStage
        // TODO: Cancel running tasks in the stage
        logInfo("Marking " + failedStage + " (" + failedStage.name +
          ") for resubmision due to a fetch failure")
        // Mark the map whose fetch failed as broken in the map stage
        val mapStage = shuffleToMapStage(shuffleId)
        if (mapId != -1) {
          mapStage.removeOutputLoc(mapId, bmAddress)
          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
        }
        logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
          "); marking it for resubmission")
        if (failedStages.isEmpty && eventProcessActor != null) {
          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
          // in that case the event will already have been scheduled. eventProcessActor may be
          // null during unit tests.
          import env.actorSystem.dispatcher
          env.actorSystem.scheduler.scheduleOnce(
            RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
        }
        failedStages += failedStage
        failedStages += mapStage
        // TODO: mark the executor as failed only if there were lots of fetch failures on it
        if (bmAddress != null) {
          handleExecutorLost(bmAddress.executorId, Some(task.epoch))
        }

      case ExceptionFailure(className, description, stackTrace, metrics) =>
        // Do nothing here, left up to the TaskScheduler to decide how to handle user failures

      case TaskResultLost =>
        // Do nothing here; the TaskScheduler handles these failures and resubmits the task.

      case other =>
        // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
        // will abort the job.
    }
    submitWaitingStages()
  }
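submitWaitingStages, invoked at the very end regardless of the completion reason, re-checks every waiting stage and resubmits those whose parents have all produced their output; in the 1.0.0 source it is roughly:

// Check for waiting stages whose parent stages have all finished and submit them.
// submitStage() itself re-queues any stage whose parents are still missing.
private def submitWaitingStages() {
  logTrace("Checking for newly runnable parent stages")
  val waitingStagesCopy = waitingStages.toArray
  waitingStages.clear()
  for (stage <- waitingStagesCopy.sortBy(_.jobId)) {
    submitStage(stage)
  }
}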

job.listener.taskSucceeded, which a successful ResultTask triggers, is overridden in JobWaiter (an implementation of JobListener). Once the number of finished tasks equals the total number of tasks, the job is done: the job result is set to JobSucceeded and every thread waiting on the JobWaiter is woken up:

override def taskSucceeded(index: Int, result: Any): Unit = synchronized {
    if (_jobFinished) {
      throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
    }
    resultHandler(index, result.asInstanceOf[T])
    finishedTasks += 1
    if (finishedTasks == totalTasks) {
      _jobFinished = true
      jobResult = JobSucceeded
      this.notifyAll()
    }
  }

Back in DAGScheduler.runJob, the waiter blocks in awaitResult() until it sees JobSucceeded, at which point the whole job has completed:

def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: String,
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit,
      properties: Properties = null)
  {
    val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
    waiter.awaitResult() match {
      case JobSucceeded => {}
      case JobFailed(exception: Exception) =>
        logInfo("Failed to run " + callSite)
        throw exception
    }
  }
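awaitResult is just a monitor wait on the JobWaiter itself, matching the notifyAll() in taskSucceeded above; roughly:

// Block the calling thread (the one running runJob) until taskSucceeded/jobFailed
// has marked the job as finished, then report how it ended.
def awaitResult(): JobResult = synchronized {
  while (!_jobFinished) {
    this.wait()
  }
  jobResult
}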

END
