从源码剖析一个Spark WordCount Job执行的全过程

原文地址:http://mzorro.me/post/55c85d06e40daa9d022f3cbd

WordCount可以说是分布式数据处理框架的”Hello World”,我们可以以它为例来剖析一个Spark Job的执行全过程。

我们要执行的代码为:

sc.textFile("hdfs://...").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect

只有一行,很简单也很经典的代码。这里的collect作为一个action,将触发一个Job,现在我们从源码开始剖析这个Job执行的全部过程。我这次读的源码是Spark 1.4.1的release版本。

为了方便描述,我们把上面的代码先进行一下拆分,这样可以清晰的看到每一步生成的RDD及其依赖关系,并方便下面分析时进行引用:

val hadoopRDD0 = sc.textFile("hdfs://...") // HadoopRDD[0]

val mapPartitionsRDD1 = hadoopRDD0.flatMap(_.split(" ")) // MapPartitionsRDD[2]

val mapPartitionsRDD2 = mapPartitionsRDD1.map((_, 1)) // MapPartitionsRDD[2]

val shuffledRDD3 = mapPartitionsRDD2.reduceByKey(_+_) // ShuffledRDD[3]

shuffledRDD3.collect // action

1. collect触发Job

首先,collect调用了SparkContext上的runJob方法。这个方法是一个阻塞方法,会在Job完成之前一直阻塞等待,直到Job执行完成之后返回所得的结果:

RDD.collect

def collect(): Array[T] = withScope {

val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)

Array.concat(results: _*)

}

需要注意的是这里传入了一个函数,这个函数就是这个Job的要执行的任务。后面我们可以看到,它将会被包装并序列化后发送到要执行它的executor上,并在要处理的RDD上的每个分区上被调用执行。

2. DAGScheduler提交Job

SparkContext的runJob被调用之后,这个Job的信息被递传给了SparkContext持有的一个DAGScheduler上。DAGScheduler本身维护着一个消息队列,在收到这个Job之后,将给自己的消息队列发送一个JobSubmitted消息。这个消息中包含了新生成的一个JobId, 触发action的RDD,经过清理后的闭包函数,要处理的各个分区的在RDD中的索引,以及一些其他信息。

DAGScheduler的消息队列在收到JobSubmitted消息后,将触发调用handleJobSubmitted方法。在这个方法中,首先会根据这个触发action的RDD的依赖信息计算出这个Job的所有Stage。在这个WordCount中,我们是在reduceByKey生成的shuffledRDD3(其生成的过程涉及到通用的combineByKey方法,具体可以参考这篇文章)上触发的action,所以我们的ResultStage所对应的finalRDD就是shuffledRDD3,ResultStage所要执行的就是shuffledRDD3的所有分区。shuffledRDD3有一个ShuffleDependency,指向mapPartitionsRDD2,据此ShuffleDependency会生成一个ShuffleMapStage,它是ResultStage的父Stage。

3. 根据继承关系分析Stages

在分析出所有的Stage之后,DAGScheduler会根据ResultStage创建出一个ActiveJob对象,用来表示这个活跃的Job。然后提交ResultStage,但是在真正执行这个Stage之前,先递归的判断它有没有父Stage,若有的话先提交它的父Stage,并将当前Stage加入等待队列;若没有父Stage,才会真正的开始执行这个Stage。等待队列中的Stage,会在父Stage都执行完成之后再被执行。

由此可以看出,在一个Job中,Stage之间必须按序执行,后一个Stage的执行将依赖前一个Stage的结果。一个Job只会有一个ResultStage,并且这个ResultStage一定会是整个Job的最后一个Stage,所以ResultStage执行的结束也就标志着整个Job的结束。

4. Task的创建和提交

按照之前的分析,我们的Job一共有两个Stage,一个ShuffleMapStage,一个ResultStage,并将先执行ShuffleMapStage。在执行Stage的时候,会按此Stage对应的RDD的分区数量,对应每一个分区创建一个Task。如果是ShuffleMapStage则创建ShuffleMapTask,如果是ResultStage则创建ResultTask。这些Task在后面将会被序列化后发到其他的executor上面去运行。

在这里分析一下每个Task包含哪些信息
两种Task都会包含的信息有 (1)当前Stage对应的RDD对象(轻量级) (2)当前Stage的ID (3)要处理的那个分区信息(轻量级),以及该任务可能的最优执行位置(例如,对于hdfs上的文件,HadoopRDD中会记录其每一个分区存储在集群的位置,并将这个位置通过依赖继承到其子RDD)

除此之外,ShuffleMapTask还包含了对应的ShuffleDependency的对象(这其中实际上有分区的方法,数据合并的方法等计算时所需的信息);ResultTask还包含了当前这个Job最终要执行在每个数据上的函数(在此情况下就是collect传给SparkContext的那个函数)。

在对每个要处理的分区创建出各个Task之后,DAGScheduler会将同一个Stage的各个Task合并成一个TaskSet,并将其提交给TaskScheduler。至此,调度这些Task的工作就交给了TaskScheduler来进行。

TaskScheduler在收到这个TaskSet之后,首先为其创建一个TaskSetManager,这个TaskSetManager将辅助任务的调度。然后TaskScheduler将会调用SchedulerBackend上的reviveOffers方法去申请可用的资源。

5. SchedulerBackend分配资源(executors)和发送Task

SchedulerBackend是一个接口,它在不同的部署模式下会有不同的实现(实际上TaskScheduler也是这样)。SchedulerBackend的作用是调度和控制整个集群里面的资源(我是这么理解的,这里的资源指的是可用的executors),当reviveOffers方法被调用后,它会将当前可用的所有资源信息,通过调用TaskScheduler的resourceOffers提供给TaskScheduler(实际上这个过程是通过另一个EndPoint类以消息队列的方式实现的,这样可以保证同时只会进行一个对资源的申请或释放过程)。

TaskScheduler在收到当前所有可用的资源信息后,会将这些资源信息按序提供给当前正在执行的多个TaskSet,每个TaskSet再根据这些资源信息将当前可以执行的Task序列化后包装到一个TaskDescription对象中返回(这个TaskDescription对象中也包含了这个任务将要运行在哪个executor上),最终通过TaskScheduler将所有当前的资源情况可以执行的Task对应的TaskDescription返回给SchedulerBackend。

SchedulerBackend这时才根据每个TaskDescription将executors资源真正的分配给这些Task,并记录已分配掉的资源和剩余的资源,然后将TaskDescription中序列化后的Task通过网络(Spark使用akka框架)发送给它对应的executor。

6. executor执行Task

集群中的executor在收到Task后,申请一个线程开始运行这个Task。这是整个Job中最核心的部分了,真正的计算都在这一步发生。首先将其反序列化,然后调用这个Task对象上的runTask方法。在这里对于ShuffleMapTask和ResultTask,runTask方法有着不同的实现,并将返回不同的内容。我们分别来分别分析。

对于ShuffleMapTask,runTask首先获取对应的RDD和ShuffleDependency。在这里对应的RDD是mapPartitionsRDD2,ShuffleDependency中则有着合并的计算信息。然后调用RDD的iterator方法获取一个对应分区数据的迭代器。如果当前RDD分区的数据已经在之前计算过了,则会直接去内存或磁盘中获取,否则在此时就会调用mapPartitionsRDD2的compute方法,根据其依赖去计算它的分区数据。如果ShuffleDependency中的mapSideCombine标记为true,就会将iterator方法返回的分区数据在这里(也就是map端)进行合并(此时要求ShuffleDependency中的aggregator不为空,aggregator中包含了如何将数据进行合并的信息)。然后根据ShuffleDependency中的partitioner(默认是一个HashPartitioner)计算出每条数据在其结果端(就是shuffleRDD3中)的分区,并将其写入到本地磁盘中对应的文件中去(在这里写入方法有多种实现方式,1.4.1的版本默认是用了SortShuffleManager,还有的其他实现是HashShuffleManager和UnsafeShuffleManager,具体的实现方法在此处就不详说了)。当分区的每条数据都处理完后,runTask会返回一个MapStatus,这其中包含了一个BlockManagerId(标记了这个任务被执行的位置,也就是Map后的数据存储的位置)以及每个结果分区(每个reduceId)的数据的大小信息。最后这个MapStatus将通过网络发回给driver,dirver将其记录。

ShuffleMapTask.runTask

override def runTask(context: TaskContext): MapStatus = {

// Deserialize the RDD using the broadcast variable.

val deserializeStartTime = System.currentTimeMillis()

val ser = SparkEnv.get.closureSerializer.newInstance()

val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](

ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

metrics = Some(context.taskMetrics)

var writer: ShuffleWriter[Any, Any] = null

try {

val manager = SparkEnv.get.shuffleManager

writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)

writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])

return writer.stop(success = true).get

} catch {

case e: Exception =>

try {

if (writer != null) {

writer.stop(success = false)

}

} catch {

case e: Exception =>

log.debug("Could not stop writer", e)

}

throw e

}

}

对于ResultTask,runTask首先也是获取对应的RDD和要在数据上执行的函数func。在这里对应的RDD应该是shuffleRDD3,然后调用RDD上的iterator获取这个分区的数据,并将其传入func函数中,将func函数的返回值作为runTask的返回值返回。过程看似简单,实际上在shuffleRDD3上调用iterator时就对应了shuffle的reduce端的合并。从shuffleRDD3的compute方法的实现可以看出,它的每个分区的数据都要去执行了ShuffleMapTask的executor上面获取,所以会产生大量的网络流量和磁盘IO。这个过程就是MapReduce范式中的shuffle过程,这里面还有很多的细节我并没有详述,但是这个过程十分关键,它的实现效率直接决定了分布式大数据处理的效率。

ResultTask.runTask

override def runTask(context: TaskContext): U = {

// Deserialize the RDD and the func using the broadcast variables.

val deserializeStartTime = System.currentTimeMillis()

val ser = SparkEnv.get.closureSerializer.newInstance()

val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](

ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

metrics = Some(context.taskMetrics)

func(context, rdd.iterator(partition, context))

}

7. executor返回结果

在runTask计算结束返回数据后,executor将其返回的数据进行序列化,然后根据序列化后数据的大小进行判断:如果数据大与某个值,就将其写入本地的内存或磁盘(如果内存不够),然后将数据的位置blockId和数据大小封装到一个IndirectTaskResult中,并将其序列化;如果数据不是很大,则直接将其封装入一个DirectTaskResult并进行序列化。最终将序列化后的DirectTaskResult或者IndirectTaskResult递传给executor上运行的一个ExecutorBackend上(通过statusUpdate方法)。

ExecutorBackend如上面的SchedulerBackend有着相似的功能(实际上,对于local模式,这两个类都由一个LocalBackend实现),将结果封入一个StatusUpdate消息透传给一个对应的EndPoint类,EndPoint类中收到这个消息后将该消息再通过网络发送给driver。

8. driver接收executor返回的结果并释放资源

在driver端的SchedulerBackend收到这个StatusUpdate消息之后,将结果续传给TaskScheduler,并进行资源的释放,在释放资源后再调用一次reviveOffers,这样又可以重复上面所描述的过程,将释放出来的资源安排给其他的Task来执行。

9. TaskResultGetter解析并拉取结果

TaskScheduler在收到任务结果后,将这个任务标记为结束,然后使用一个TaskResultGetter类来进行结果的解析。TaskResultGetter将结果反序列化,判断如果其是一个DirectTaskResult则直接抽取出其中的结果;如果是一个IndirectTaskResult则需要根据其中的blockId信息去对应的机器上拉取结果。最终都是将结果拉取到driver的内存中(这就是我们最好不要在大数据集上执行类似collect的方法的原因,它会将所有的数据拉入driver的内存中,造成大量的内存开销,甚至内存不足)。然后TaskResultGetter会将拉取到的结果递交给TaskScheduler,TaskScheduler再将此结果递交给DAGScheduler。

10. 处理结果并在Job完成时返回

DAGScheduler在收到Task完成的消息后,先判断这完成的是一个什么任务。如果是一个ShuffleMapTask则需要将返回的结果(MapStatus)记录到driver中,并判断如果当前的ShuffleMapStage若是已经完成,则去提交下一个Stage。如果是一个ResultTask完成了, 则将其结果递交给JobWaiter,并标记这个任务以完成。

JobWaiter是DAGScheduler在最开始submitJob的时候创建的一个对象,用于阻塞等待任务的完成,并进行结果的处理。JobWaiter在每收到一个ResultTask的结果时,都将结果在resultHandler上执行。这个resultHandler则是由SparkContext传进来的一个函数,其作用是将数据放入一个数组中,这个数组最终将作为SparkContext.runJob方法的返回值,被最开始的collect方法接收然后返回。若JobWaiter收到了每个ResultTask的结果,则表示整个Job已经完成,此时就停止阻塞等待,于是SparkContext.runJob返回一个结果的数组,并由collect接收后返回给用户程序。

至此,一个Spark的WordCount执行结束。

总结

本文从源码的角度详细分析了一个Spark Job的整个执行、调度的过程,不过很多东西还只是浅尝辄止,并未完全深入。尽管如此,经过连续好几天的分析,我还是觉得收获颇丰,对Spark的实现原理有了更加深入的理解,甚至对MapReduce的编程范式以及其shuffle过程也增加了不少理解。PS:其实从一开始我到分析结束都是没有做任何记录的,只因为一直一知半解实在不知道如何来做记录,所以只是去查阅一些资料和使劲儿的阅读源码。在我自认为分析结束后,我才开始写这篇记录,但是在写的过程中我才发现我分析的过程有一些并不是很清晰,然后重新去看,才真正弄的比较清晰了。可见写博文是很重要的过程,不仅是将学到的知识分享出来,而且对自身的知识也有很好的加固作用。

时间: 2024-10-27 11:22:13

从源码剖析一个Spark WordCount Job执行的全过程的相关文章

(升级版)Spark从入门到精通(Scala编程、案例实战、高级特性、Spark内核源码剖析、Hadoop高端)

本课程主要讲解目前大数据领域最热门.最火爆.最有前景的技术——Spark.在本课程中,会从浅入深,基于大量案例实战,深度剖析和讲解Spark,并且会包含完全从企业真实复杂业务需求中抽取出的案例实战.课程会涵盖Scala编程详解.Spark核心编程.Spark SQL和Spark Streaming.Spark内核以及源码剖析.性能调优.企业级案例实战等部分.完全从零起步,让学员可以一站式精通Spark企业级大数据开发,提升自己的职场竞争力,实现更好的升职或者跳槽,或者从j2ee等传统软件开发工程

《Apache Spark源码剖析》

Spark Contributor,Databricks工程师连城,华为大数据平台开发部部长陈亮,网易杭州研究院副院长汪源,TalkingData首席数据科学家张夏天联袂力荐1.本书全面.系统地介绍了Spark源码,深入浅出,细致入微2.提供给读者一系列分析源码的实用技巧,并给出一个合理的阅读顺序3.始终抓住资源分配.消息传递.容错处理等基本问题,抽丝拨茧4.一步步寻找答案,所有问题迎刃而解,使读者知其然更知其所以然 内容简介 书籍计算机书籍 <Apache Spark源码剖析>以Spark

一个Python开源项目-腾讯哈勃沙箱源码剖析(上)

前言 2019年来了,2020年还会远吗? 请把下一年的年终奖发一下,谢谢... 回顾逝去的2018年,最大的改变是从一名学生变成了一位工作者,不敢说自己多么的职业化,但是正在努力往那个方向走. 以前想的更多是尝试,现在需要考虑的更多是落地.学校和公司还是有很大的不一样,学到了很多东西. 2019年了,新年新气象,给大家宣布一下"七夜安全博客"今年的规划: 1. 2019年不再接任何商业广告(文末腾讯广告除外),纯粹输出安全技术干货. 2. 2019年每周至少两篇原创图文,也就是说每个

豆瓣Redis解决方案Codis源码剖析:Dashboard

豆瓣Redis解决方案Codis源码剖析:Dashboard 1.不只是Dashboard 虽然名字叫Dashboard,但它在Codis中的作用却不可小觑.它不仅仅是Dashboard管理页面,更重要的是,它负责监控和指挥各个Proxy的负载均衡(数据分布和迁移).并且,所有API都以RESTFul接口的形式对外提供,供Proxy和codis-config(Codis的命令行工具)调用.下面就来看一下数据分布和迁移的代码执行流程. Dashboard涉及到的知识点比较多,包括Martini框架

Phaser实现源码剖析

在这里首先说明一下,由于Phaser在4.3代码里是存在,但并没有被开放出来供使用,但已经被本人大致研究了,因此也一并进行剖析. Phaser是一个可以重复利用的同步栅栏,功能上与CyclicBarrier和CountDownLatch相似,不过提供更加灵活的用法.也就是说,Phaser的同步模型与它们差不多.一般运用的场景是一组线程希望同时到达某个执行点后(先到达的会被阻塞),执行一个指定任务,然后这些线程才被唤醒继续执行其它任务. Phaser一般是定义一个parties数(parties一

【Java集合源码剖析】HashMap源码剖析

转载请注明出处:http://blog.csdn.net/ns_code/article/details/36034955 HashMap简介 HashMap是基于哈希表实现的,每一个元素是一个key-value对,其内部通过单链表解决冲突问题,容量不足(超过了阀值)时,同样会自动增长. HashMap是非线程安全的,只是用于单线程环境下,多线程环境下可以采用concurrent并发包下的concurrentHashMap. HashMap 实现了Serializable接口,因此它支持序列化,

转:【Java集合源码剖析】Vector源码剖析

转载请注明出处:http://blog.csdn.net/ns_code/article/details/35793865   Vector简介 Vector也是基于数组实现的,是一个动态数组,其容量能自动增长. Vector是JDK1.0引入了,它的很多实现方法都加入了同步语句,因此是线程安全的(其实也只是相对安全,有些时候还是要加入同步语句来保证线程的安全),可以用于多线程环境. Vector没有丝线Serializable接口,因此它不支持序列化,实现了Cloneable接口,能被克隆,实

菜鸟nginx源码剖析 框架篇(一) 从main函数看nginx启动流程(转)

俗话说的好,牵牛要牵牛鼻子 驾车顶牛,处理复杂的东西,只要抓住重点,才能理清脉络,不至于深陷其中,不能自拔.对复杂的nginx而言,main函数就是“牛之鼻”,只要能理清main函数,就一定能理解其中的奥秘,下面我们就一起来研究一下nginx的main函数. 1.nginx的main函数解读 nginx启动显然是由main函数驱动的,main函数在在core/nginx.c文件中,其源代码解析如下,涉及到的数据结构在本节仅指出其作用,将在第二节中详细解释. nginx main函数的流程图如下:

HashMap(2) 源码剖析(推荐)

今天看代码,想到去年发生的HashMap发生的CPU使用率100%的事件,转载下当时看的三个比较不错的博客(非常推荐) 参考:http://coolshell.cn/articles/9606.html   http://github.thinkingbar.com/hashmap-analysis/ http://developer.51cto.com/art/201102/246431.htm 在 Java 集合类中,使用最多的容器类恐怕就是 HashMap 和 ArrayList 了,所以