Spark技术内幕: Task向Executor提交的源码解析

从org.apache.spark.scheduler.DAGScheduler#submitMissingTasks开始,分析Stage是如何生成TaskSet的。

如果一个Stage的所有的parent stage都已经计算完成或者存在于cache中,那么他会调用submitMissingTasks来提交该Stage所包含的Tasks。

org.apache.spark.scheduler.DAGScheduler#submitMissingTasks的计算流程如下:

  1. 首先得到RDD中需要计算的partition,对于Shuffle类型的stage,需要判断stage中是否缓存了该结果;对于Result类型的Final Stage,则判断计算Job中该partition是否已经计算完成。
  2. 序列化task的binary。Executor可以通过广播变量得到它。每个task运行的时候首先会反序列化。这样在不同的executor上运行的task是隔离的,不会相互影响。
  3. 为每个需要计算的partition生成一个task:对于Shuffle类型依赖的Stage,生成ShuffleMapTask类型的task;对于Result类型的Stage,生成一个ResultTask类型的task
  4. 确保Task是可以被序列化的。因为不同的cluster有不同的taskScheduler,在这里判断可以简化逻辑;保证TaskSet的task都是可以序列化的
  5. 通过TaskScheduler提交TaskSet。

TaskSet
就是可以做pipeline的一组完全相同的task,每个task的处理逻辑完全相同,不同的是处理数据,每个task负责处理一个
partition。pipeline,可以称为大数据处理的基石,只有数据进行pipeline处理,才能将其放到集群中去运行。对于一个task来
说,它从数据源获得逻辑,然后按照拓扑顺序,顺序执行(实际上是调用rdd的compute)。

TaskSet是一个数据结构,存储了这一组task:

[java] view plaincopy

  1. private[spark] class TaskSet(
  2. val tasks: Array[Task[_]],
  3. val stageId: Int,
  4. val attempt: Int,
  5. val priority: Int,
  6. val properties: Properties) {
  7. val id: String = stageId + "." + attempt
  8. override def toString: String = "TaskSet " + id
  9. }

管理调度这个TaskSet的时org.apache.spark.scheduler.TaskSetManager,TaskSetManager会负责task的失败重试;跟踪每个task的执行状态;处理locality-aware的调用。

详细的调用堆栈如下:

  1. org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks
  2. org.apache.spark.scheduler.SchedulableBuilder#addTaskSetManager
  3. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers
  4. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor#makeOffers
  5. org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers
  6. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor#launchTasks
  7. org.apache.spark.executor.CoarseGrainedExecutorBackend.receiveWithLogging#launchTask
  8. org.apache.spark.executor.Executor#launchTask

首先看一下org.apache.spark.executor.Executor#launchTask:

[java] view plaincopy

  1. def launchTask(
  2. context: ExecutorBackend, taskId: Long, taskName: String, serializedTask: ByteBuffer) {
  3. val tr = new TaskRunner(context, taskId, taskName, serializedTask)
  4. runningTasks.put(taskId, tr)
  5. threadPool.execute(tr) // 开始在executor中运行
  6. }

TaskRunner会从序列化的task中反序列化得到task,这个需要看 org.apache.spark.executor.Executor.TaskRunner#run 的实现:task.run(taskId.toInt)。而task.run的实现是:

[java] view plaincopy

  1. final def run(attemptId: Long): T = {
  2. context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
  3. context.taskMetrics.hostname = Utils.localHostName()
  4. taskThread = Thread.currentThread()
  5. if (_killed) {
  6. kill(interruptThread = false)
  7. }
  8. runTask(context)
  9. }

对于原来提到的两种Task,即

  1. org.apache.spark.scheduler.ShuffleMapTask
  2. org.apache.spark.scheduler.ResultTask

分别实现了不同的runTask:

org.apache.spark.scheduler.ResultTask#runTask即顺序调用rdd的compute,通过rdd的拓扑顺序依次对partition进行计算:

[java] view plaincopy

  1. override def runTask(context: TaskContext): U = {
  2. // Deserialize the RDD and the func using the broadcast variables.
  3. val ser = SparkEnv.get.closureSerializer.newInstance()
  4. val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
  5. ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  6. metrics = Some(context.taskMetrics)
  7. try {
  8. func(context, rdd.iterator(partition, context))
  9. } finally {
  10. context.markTaskCompleted()
  11. }
  12. }

而org.apache.spark.scheduler.ShuffleMapTask#runTask则是写shuffle的结果,

[java] view plaincopy

  1. override def runTask(context: TaskContext): MapStatus = {
  2. // Deserialize the RDD using the broadcast variable.
  3. val ser = SparkEnv.get.closureSerializer.newInstance()
  4. val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
  5. ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  6. //此处的taskBinary即为在org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的广播变量取得的
  7. metrics = Some(context.taskMetrics)
  8. var writer: ShuffleWriter[Any, Any] = null
  9. try {
  10. val manager = SparkEnv.get.shuffleManager
  11. writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
  12. writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) // 将rdd计算的结果写入memory或者disk
  13. return writer.stop(success = true).get
  14. } catch {
  15. case e: Exception =>
  16. if (writer != null) {
  17. writer.stop(success = false)
  18. }
  19. throw e
  20. } finally {
  21. context.markTaskCompleted()
  22. }


两个task都不要按照拓扑顺序调用rdd的compute来完成对partition的计算,不同的是ShuffleMapTask需要shuffle
write,以供child stage读取shuffle的结果。
对于这两个task都用到的taskBinary,即为在
org.apache.spark.scheduler.DAGScheduler#submitMissingTasks序列化的task的广播变量取
得的。

时间: 2024-12-13 05:29:37

Spark技术内幕: Task向Executor提交的源码解析的相关文章

Spark技术内幕: Task向Executor提交的源代码解析

在上文<Spark技术内幕:Stage划分及提交源代码分析>中,我们分析了Stage的生成和提交.可是Stage的提交,仅仅是DAGScheduler完毕了对DAG的划分,生成了一个计算拓扑,即须要依照顺序计算的Stage,Stage中包括了能够以partition为单位并行计算的Task.我们并没有分析Stage中得Task是怎样生成而且终于提交到Executor中去的. 这就是本文的主题. 从org.apache.spark.scheduler.DAGScheduler#submitMis

6.Spark streaming技术内幕 : Job动态生成原理与源码解析

原创文章,转载请注明:转载自 周岳飞博客(http://www.cnblogs.com/zhouyf/) Spark streaming 程序的运行过程是将DStream的操作转化成RDD的操作,Spark Streaming 和 Spark Core 的关系如下图(图片来自spark官网) Spark Streaming 会按照程序设定的时间间隔不断动态生成Job来处理输入数据,这里的Job生成是指将Spark Streaming 的程序翻译成Spark内核的RDD操作,翻译的过程并不会触发J

Java并发编程与技术内幕:ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue源码解析

林炳文Evankaka原创作品.转载请注明出处http://blog.csdn.net/evankaka 摘要:本文主要讲了Java中BlockingQueue的源码 一.BlockingQueue介绍与常用方法 BlockingQueue是一个阻塞队列.在高并发场景是用得非常多的,在线程池中.如果运行线程数目大于核心线程数目时,也会尝试把新加入的线程放到一个BlockingQueue中去.队列的特性就是先进先出很容易理解,在java里头它的实现类主要有下图的几种,其中最常用到的是ArrayBl

Spark技术内幕:Stage划分及提交源码分析

当触发一个RDD的action后,以count为例,调用关系如下: org.apache.spark.rdd.RDD#count org.apache.spark.SparkContext#runJob org.apache.spark.scheduler.DAGScheduler#runJob org.apache.spark.scheduler.DAGScheduler#submitJob org.apache.spark.scheduler.DAGSchedulerEventProcess

Spark技术内幕:Shuffle Map Task运算结果的处理

Shuffle Map Task运算结果的处理 这个结果的处理,分为两部分,一个是在Executor端是如何直接处理Task的结果的:还有就是Driver端,如果在接到Task运行结束的消息时,如何对Shuffle Write的结果进行处理,从而在调度下游的Task时,下游的Task可以得到其需要的数据. Executor端的处理 在解析BasicShuffle Writer时,我们知道ShuffleMap Task在Executor上运行时,最终会调用org.apache.spark.sche

Spark技术内幕:Client,Master和Worker 通信源码解析

Spark的Cluster Manager可以有几种部署模式: Standlone Mesos YARN EC2 Local 在向集群提交计算任务后,系统的运算模型就是Driver Program定义的SparkContext向APP Master提交,有APP Master进行计算资源的调度并最终完成计算.具体阐述可以阅读<Spark:大数据的电花火石!>. 那么Standalone模式下,Client,Master和Worker是如何进行通信,注册并开启服务的呢? 1. node之间的IP

Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源码实现

如果Spark的部署方式选择Standalone,一个采用Master/Slaves的典型架构,那么Master是有SPOF(单点故障,Single Point of Failure).Spark可以选用ZooKeeper来实现HA. ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master但是只有一个是Active的,其他的都是Standby,当Active的Master出现故障时,另外的一个Standby Master会被选举出来.由于

Spark技术内幕:Master的故障恢复

Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源码实现  详细阐述了使用ZK实现的Master的HA,那么Master是如何快速故障恢复的呢? 处于Standby状态的Master在接收到org.apache.spark.deploy.master.ZooKeeperLeaderElectionAgent发送的ElectedLeader消息后,就开始通过ZK中保存的Application,Driver和Worker的元数据信息进行故障恢复了,它

Spark技术内幕:Worker源码与架构解析

首先通过一张Spark的架构图来了解Worker在Spark中的作用和地位: Worker所起的作用有以下几个: 1. 接受Master的指令,启动或者杀掉Executor 2. 接受Master的指令,启动或者杀掉Driver 3. 报告Executor/Driver的状态到Master 4. 心跳到Master,心跳超时则Master认为Worker已经挂了不能工作了 5. 向GUI报告Worker的状态 说白了,Worker就是整个集群真正干活的.首先看一下Worker重要的数据结构: v