Spark技术内幕: Shuffle详解(三)

前两篇文章写了Shuffle Read的一些实现细节。但是要想彻底理清楚这里边的实现逻辑,还是需要更多篇幅的;本篇开始,将按照Job的执行顺序,来讲解Shuffle。即,结果数据(ShuffleMapTask的结果和ResultTask的结果)是如何产生的;结果是如何处理的;结果是如何读取的。

在Worker上接收Task执行命令的是org.apache.spark.executor.CoarseGrainedExecutorBackend。它在接收到LaunchTask的命令后,通过在Driver创建SparkContext时已经创建的org.apache.spark.executor.Executor的实例的launchTask,启动Task:

  deflaunchTask(
     context: ExecutorBackend, taskId: Long, taskName: String,serializedTask: ByteBuffer) {
   val tr = new TaskRunner(context, taskId, taskName, serializedTask)
   runningTasks.put(taskId, tr)
   threadPool.execute(tr) // 开始在executor中运行
  }

最终Task的执行是在org.apache.spark.executor.Executor.TaskRunner#run。org.apache.spark.executor.ExecutorBackend是Executor与Driver通信的接口,它实际上是一个trait:

private[spark] trait ExecutorBackend {
  defstatusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
}

TaskRunner会将Task执行的状态汇报给Driver(org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor)。 而Driver会转给org.apache.spark.scheduler.TaskSchedulerImpl#statusUpdate。

在Executor运行Task时,得到计算结果会存入org.apache.spark.scheduler.DirectTaskResult。在将结果回传到Driver时,会根据结果的大小有不同的策略:对于“较大”的结果,将其以taskid为key存入org.apache.spark.storage.BlockManager;如果结果不大,那么直接回传给Driver。那么如何判定这个阈值呢?

这里的回传是直接通过akka的消息传递机制。因此这个大小首先不能超过这个机制设置的消息的最大值。这个最大值是通过spark.akka.frameSize设置的,单位是Bytes,默认值是10MB。除此之外,还有200KB的预留空间。因此这个阈值就是conf.getInt("spark.akka.frameSize", 10) * 1024 *1024 – 200KB。

       // directSend = sending directly back to the driver
       val (serializedResult, directSend) = {
         if (resultSize >=akkaFrameSize - AkkaUtils.reservedSizeBytes) { //如果结果太大,那么存入BlockManager
           val blockId = TaskResultBlockId(taskId)
           env.blockManager.putBytes(
              blockId, serializedDirectResult,StorageLevel.MEMORY_AND_DISK_SER)
           (ser.serialize(new IndirectTaskResult[Any](blockId)), false)
         } else { // 如果大小合适,则直接发送结果给Driver
           (serializedDirectResult, true)
         }
       }
       execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

TaskRunner将Task的执行状态汇报给Driver后,Driver会转给org.apache.spark.scheduler.TaskSchedulerImpl#statusUpdate。而在这里不同的状态有不同的处理:

1.    如果类型是TaskState.FINISHED,那么调用org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask进行处理。

2.    如果类型是TaskState.FAILED或者TaskState.KILLED或者TaskState.LOST,调用org.apache.spark.scheduler.TaskResultGetter#enqueueFailedTask进行处理。对于TaskState.LOST,还需要将其所在的Executor标记为failed, 并且根据更新后的Executor重新调度。

enqueueSuccessfulTask的逻辑也比较简单,就是如果是IndirectTaskResult,那么需要通过blockid来获取结果:sparkEnv.blockManager.getRemoteBytes(blockId);如果是DirectTaskResult,那么结果就无需远程获取了。然后调用

1.    org.apache.spark.scheduler.TaskSchedulerImpl#handleSuccessfulTask

2.    org.apache.spark.scheduler.TaskSetManager#handleSuccessfulTask

3.    org.apache.spark.scheduler.DAGScheduler#taskEnded

4.    org.apache.spark.scheduler.DAGScheduler#eventProcessActor

5.    org.apache.spark.scheduler.DAGScheduler#handleTaskCompletion

进行处理。核心逻辑都在第5个调用栈。如果task是ResultTask,处理逻辑比较简单,停止job,更新一些状态,发送一些event即可。

    if (!job.finished(rt.outputId)){
        job.finished(rt.outputId) =true
        job.numFinished += 1
        // If the whole job hasfinished, remove it
        if (job.numFinished ==job.numPartitions) {
          markStageAsFinished(stage)
         cleanupStateForJobAndIndependentStages(job)
          listenerBus.post(SparkListenerJobEnd(job.jobId,JobSucceeded))
        }

        // taskSucceeded runs someuser code that might throw an exception.
        // Make sure we areresilient against that.
        try {
         job.listener.taskSucceeded(rt.outputId, event.result)
        } catch {
          case e: Exception =>
            // TODO: Perhaps we wantto mark the stage as failed?
           job.listener.jobFailed(new SparkDriverExecutionException(e))
        }
    }

如果task是ShuffleMapTask,那么它需要将结果通过某种机制告诉下游的Stage,以便于其可以作为下游Stage的输入。这个机制是怎么实现的?

实际上,对于ShuffleMapTask来说,其结果实际上是org.apache.spark.scheduler.MapStatus;其序列化后存入了DirectTaskResult或者IndirectTaskResult中。而DAGScheduler#handleTaskCompletion通过下面的方式来获取这个结果:

val status =event.result.asInstanceOf[MapStatus]

通过将这个status注册到org.apache.spark.MapOutputTrackerMaster,就实现了

    mapOutputTracker.registerMapOutputs(
                 stage.shuffleDep.get.shuffleId,
                  stage.outputLocs.map(list=> if (list.isEmpty) null else list.head).toArray,
                  changeEpoch = true)
时间: 2024-10-03 13:39:47

Spark技术内幕: Shuffle详解(三)的相关文章

Spark技术内幕:Client,Master和Worker 通信源码解析

Spark的Cluster Manager可以有几种部署模式: Standlone Mesos YARN EC2 Local 在向集群提交计算任务后,系统的运算模型就是Driver Program定义的SparkContext向APP Master提交,有APP Master进行计算资源的调度并最终完成计算.具体阐述可以阅读<Spark:大数据的电花火石!>. 那么Standalone模式下,Client,Master和Worker是如何进行通信,注册并开启服务的呢? 1. node之间的IP

spark2.x由浅入深深到底系列六之RDD java api详解三

学习任何spark知识点之前请先正确理解spark,可以参考:正确理解spark 本文详细介绍了spark key-value类型的rdd java api 一.key-value类型的RDD的创建方式 1.sparkContext.parallelizePairs JavaPairRDD<String, Integer> javaPairRDD =         sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3

Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源码实现

如果Spark的部署方式选择Standalone,一个采用Master/Slaves的典型架构,那么Master是有SPOF(单点故障,Single Point of Failure).Spark可以选用ZooKeeper来实现HA. ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master但是只有一个是Active的,其他的都是Standby,当Active的Master出现故障时,另外的一个Standby Master会被选举出来.由于

Spark技术内幕: Task向Executor提交的源代码解析

在上文<Spark技术内幕:Stage划分及提交源代码分析>中,我们分析了Stage的生成和提交.可是Stage的提交,仅仅是DAGScheduler完毕了对DAG的划分,生成了一个计算拓扑,即须要依照顺序计算的Stage,Stage中包括了能够以partition为单位并行计算的Task.我们并没有分析Stage中得Task是怎样生成而且终于提交到Executor中去的. 这就是本文的主题. 从org.apache.spark.scheduler.DAGScheduler#submitMis

前端技术之_CSS详解第一天

前端技术之_CSS详解第一天 一html部分 略.... 二.列表 列表有3种 2.1 无序列表 无序列表,用来表示一个列表的语义,并且每个项目和每个项目之间,是不分先后的. ul就是英语unordered list,“无序列表”的意思. li 就是英语list item , “列表项”的意思. 你会发现,这是我们学习的第一个“组标签”,就是要么不写,要么就要写一组. <ul><li>北京</li><li>上海</li><li>广州&

华为设备二层交换技术——MSTP协议详解

前面提到的STP协议以及Cisco的私有协议PVST+都属于单生成树(SST)协议,也就是对于支持多VLAN的设备只能运行单一的生成树.可以参考博文:Cisco设备二层交换技术--STP协议详解 MSTP是IEEE 802.1s中提出的一种STP和VLAN结合使用的新协议,它既继承了RSTP端口快速迁移的优点,又解决了RSTP中不同VLAN必须运行在同一棵生成树上的问题.接下来我们详细了解一下MSTP协议. MSTP协议是一个公有的生成树协议,在实际生产环境中得到了广泛的应用. 一.MSTP概述

php学习之道:WSDL详解(三)

通过声明方式定义绑定(binding)属性 如果你在服务中采用SOAP binding,你可以使用JAX-WS来指定一定数量的属性binding.这些属性指定对应你在WSDL中指定的属性.某些设置,比如参数类型,可以约束你实现的方法,这些设置也影响声明的效用. @SOAPBinding声明,定义在javax.jws.soap.SOAPBinding接口中.它提供发布时的SOAP Binding细节.如果@SOAPBinding没有被指定,则用缺省的doc/literal SOAPBinding.

Spark技术内幕:Master的故障恢复

Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源码实现  详细阐述了使用ZK实现的Master的HA,那么Master是如何快速故障恢复的呢? 处于Standby状态的Master在接收到org.apache.spark.deploy.master.ZooKeeperLeaderElectionAgent发送的ElectedLeader消息后,就开始通过ZK中保存的Application,Driver和Worker的元数据信息进行故障恢复了,它

UINavigationController详解三(转)ToolBar

原文出自:http://blog.csdn.net/totogo2010/article/details/7682641,特别感谢. 1.显示Toolbar  在RootViewController.m的- (void)viewDidLoad方法中添加代码,这样Toobar就显示出来了. [cpp] view plaincopy [self.navigationController  setToolbarHidden:NO animated:YES]; 2.在ToolBar上添加UIBarBut