Spark 定制版:007~Spark Streaming源码解读之JobScheduler内幕实现和深度思考

本讲内容:

a. JobScheduler内幕实现

b. JobScheduler深度思考

注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解。

上节回顾

上节课,我们以JobGenerator类为重心,为大家左右延伸,解密Job之动态生成;并总结出了Job之动态生成的三大核心:

a. JobGenerator: 负责Job生成

b. JobSheduler:负责Job调度

c. ReceiverTracker:获取元数据

如Job动态生成图:

开讲

由上节课我们可知:

JobScheduler是SparkStreaming 所有Job调度的中心,内部有两个重要的成员:JobGenerator负责Job的生成,ReceiverTracker负责记录输入的数据源信息。

JobScheduler的启动会导致ReceiverTracker和JobGenerator的启动。ReceiverTracker的启动导致运行在Executor端的Receiver启动并且接收数据,ReceiverTracker会记录Receiver接收到的数据meta信息。

JobGenerator的启动导致每隔BatchDuration,就调用DStreamGraph生成RDD Graph,并生成Job。

JobScheduler中的线程池来提交封装的JobSet对象(时间值,Job,数据源的meta)。Job中封装了业务逻辑,导致最后一个RDD的action被触发,被DAGScheduler真正调度在Spark集群上执行该Job。

因此可以说JobScheduler是整个Spark Streming的调度的核心,其地位相当于Spark Core中的DAGScheduler。因此,我们务必彻底掌握JobScheduler。

首先让我们来看看JobScheduler重要方法的源码跟踪步骤图:

下面我们来逐步解密JobScheduler的内幕吧。

上几节课中,我们在进行Spark Streaming开发的时候,会对DStream进行各种transform和action级别的操作,这些操作就构成DStream graph,也就是DStream 之间的依赖关系,随着时间的流逝,DStream graph会根据batchintaval时间间隔,产生RDD的DAG,然后进行job的执行。DStream的DStream graph是逻辑级别的,RDD的DAG是物理执行级别的。DStream是空间维度的层面,空间维度加上时间构成时空维度。

至此,JobScheduler是将逻辑级别的job物理的运行在Spark Core上。JobGenerator是产生逻辑级别的Job,使用JobScheduler将Job在线程池中运行。JobScheduler是在StreamingContext中进行实例化的,并在StreamingContext的start方法中开辟一条新的线程启动的。

大括号中的代码作为一个匿名函数在新的线程中执行。Sparkstreaming运行时至少需要两条线程,其中一条用于一直循环接收数据,现在所说的至少两条线程和上边开辟一条新线程运行scheduler.start()并没有关系。Sparkstreaming运行时至少需要两条线程是用于作业处理的,上边的代码开辟新的线程是在调度层面的中,不论Sparkstreaming程序运行时指定多少线程,这里都会开辟一条新线程,之间没有一点关系。

每一条线程都有自己私有的属性,在这里给新的线程设置私有的属性,这些属性不会影响主线程中的。

源码中代码的书写模式非常值得学习,以后看源码的时候就把它当做是一个普通的应用程序,从JVM的角度看,Spark就是一个分布式的应用程序。在以后的项目中要将调度和作业处理线程分开,也便于维护和优化。

JobScheduler在实例化的时候会实例化JobGenerator和线程池。

线程池中默认是有一条线程,当然可以在spark配置文件中配置或者使用代码在sparkconf中修改默认的线程数,在一定程度上增加默认线程数可以提高执行Job的效率,这也是一个性能调优的方法(尤其是在一个程序中有多个Job时)。

ReceiverTracker、JobGenerator在JobScheduler实例化的时候实例化。

来看一下JobScheduler.start的代码:

Eventloop是在调用JobGenerator的start方法时实例化。

在EventLoop的start方法中会回调onStart方法,一般在onStart方法中会执行一些准备性的代码,在JobSchedule中虽然并没有复写onStart方法,不过Spark Streaming框架在这里显然是为了代码的可扩展性考虑的,这是开发项目时需要学习的。

DStream的action级别的操作转过来还是会调用foreachRDD这个方法,生动的说明在对DStream操作的时候其实还是对RDD的操作。

上边代码中foreachFunc这个方法是对DStream action级别的方法的进一步封装;foreachRDD方法,转过来new ForEachDstream

注释中说的:将这个函数作用于这个DStream中的每一个RDD,这是一个输出操作,因此这个DStream会被注册成Outputstream,并进行物化。

ForEachDStream中很重要的一个函数generateJob。考虑时间维度和action级别,每个Duration都基于generateJob来生成作业。foreachFunc(rdd, time)这个方法就是对Dstream最后的操作 ,new Job(time, jobFunc)只是在RDD的基础上,加上时间维度的封装而已。这里的Job只是一个普通的对象,代表了一个spark的计算,调用Job的run方法时,真正的作业就触发了。foreachFunc(rdd, time)中的rdd其实就是通过DStreamGraph中最后一个DStream来决定的。

来看一下ForEachDStream.generateJob的代码:

Job是通过ForEachDstream的generateJob来生成的,值得注意的是在DStream的子类中,只有ForEachDstream重写了generateJob方法。

现在考虑一下ForEachDStream的generateJob方法是谁调用的?当然是JobGenerator。ForEachDstream的generateJob方法是静态的逻辑级别,他如果想要真正运行起来变成物理级别的这时候就需要JobGenerator。

现在就来看看JobGenerator的代码,JobGenerator中有一个定时器timer和消息循环体eventLoop,timer会基于batchInterval,一直向eventLoop中发送generateJobs的消息,进而导致processEvent方法->generateJobs方法的执行。

generateJobs方法的代码:

graph.generateJobs(time)这个方法的代码:

其中的outputStream.generateJob(time)中的outputStream就是前面说ForEachDstream,generateJob(time)方法就是ForEachDstream中的generateJob(time)方法。

这是从时间维度调用空间维度的东西,所以时空结合就转变成物理的执行了。

JobGenerator的generateJobs方法的代码:

基于graph.generateJobs产生job后,会封装成JobSet并提交给JobScheduler,JobSet(time, jobs, streamIdToInputInfos),其中streamIdToInputInfos就是接收的数据的元数据。

JobSet代表了一个batch duration中的一批jobs。就是一个普通对象,包含了未提交的jobs,提交的时间,执行开始和结束时间等信息。

JobSet提交给JobScheduler后,会放入jobSets数据结构中,jobSets.put(jobSet.time, jobSet) ,所以JobScheduler就拥有了每个batch中的jobSet.在线程池中进行执行。

在把job放入线程池中时,采用JobHandler进行封装。JobHandler是一个Runable接口的实例。

其中主要的代码就是job.run(),前面说过job.run()调用的就是Dstream的action级别的方法。

在job.run()前后会发送JobStarted和JobCompleted的消息,JobScheduler接收到这两个消息只是记录一下时间,通知一下job要开始执行或者执行完成,并没有过多的操作。

备注:

1、DT大数据梦工厂微信公众号DT_Spark

2、Spark大神级专家:王家林

3、新浪微博: http://www.weibo.com/ilovepains

时间: 2024-08-05 04:17:20

Spark 定制版:007~Spark Streaming源码解读之JobScheduler内幕实现和深度思考的相关文章

Spark版本定制七:Spark Streaming源码解读之JobScheduler内幕实现和深度思考

本期内容: 1,JobScheduler内幕实现 2,JobScheduler深度思考 摘要:JobScheduler是Spark Streaming整个调度的核心,其地位相当于Spark Core上的调度中心中的DAGScheduler!           一.JobScheduler内幕实现 问:JobScheduler是在什么地方生成的? 答:JobScheduler是在StreamingContext实例化时产生的,从StreamingContext的源码第183行中可以看出:    

(版本定制)第7课:Spark Streaming源码解读之JobScheduler内幕实现和深度思考

本期内容: 1.JobScheduler内幕实现 2.JobScheduler深度思考 JobScheduler是Spark Streaming的调度核心,地位相当于Spark Core上调度中心的DAG Scheduler,非常重要! JobGenerator每隔Batch Duration时间会动态的生成JobSet提交给JobScheduler,JobScheduler接收到JobSet后,如何处理呢? 产生Job /** Generate jobs and perform checkpo

(版本定制)第6课:Spark Streaming源码解读之Job动态生成和深度思考

本期内容: 1.Spark Streaming Job生成深度思考 2.Spark Streaming Job生成源码解析 本节课主要是针对Job如何产生进行阐述 在Spark Streaming里,总体负责动态作业调度的具体类是JobScheduler: /** * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate * the jobs and runs them using a

Spark Streaming源码解读之Job动态生成和深度思考

本博文主要包含以下内容: 1. Spark Streaming Job 生成深度思考 2 .Spark Streaming Job 生成源码解析 一 :Spark Streaming Job 生成深度思考 输入的DStream有很多来源Kafka.Socket.Flume,输出的DStream其实是逻辑级别的Action,是Spark Streaming框架提出的,其底层翻译成为物理级别的Action,是RDD的Action,中间是处理过程是transformations,状态转换也就是业务处理

spark版本定制六:Spark Streaming源码解读之Job动态生成和深度思考

本期内容: 1.Spark Streaming Job生成深度思考 2.Spark Streaming Job生成源码解析 特别感谢王家林老师的独具一格的讲解: 王家林老师名片: 中国Spark第一人 新浪微博:http://weibo.com/ilovepains 微信公众号:DT_Spark 博客:http://blog.sina.com.cn/ilovepains QQ:1740415547 YY课堂:每天20:00现场授课频道68917580

第6课:Spark Streaming源码解读之Job动态生成和深度思考

上一节我们从总体上讲解了Spark Streaming job的运行机制.本节我们针对job如何生成进行详细的阐述,请看下图: 在Spark Streaming里,总体负责动态作业调度的具体类是JobScheduler: /**  * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate  * the jobs and runs them using a thread pool.   

Spark版本定制第7天:JobScheduler内幕实现和深度思考

本期内容: 1 JobScheduler内幕实现 2 深度思考 一切不能进行实时流处理的数据都是无效的数据.在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的诸如SQL,MLlib等强大框架,它必将一统天下. Spark Streaming运行时与其说是Spark Core上的一个流式处理框架,不如说是Spark Core上的一个最复杂的应用程序.如果可以掌握Spark streaming这个复杂的应用程序,

Spark 定制版:009~Spark Streaming源码解读之Receiver在Driver的精妙实现全生命周期彻底研究和思考

本讲内容: a. Receiver启动的方式设想 b. Receiver启动源码彻底分析 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解. 上节回顾 上一讲中,我们给大家具体分析了RDD的物理生成和逻辑生成过程,彻底明白DStream和RDD之间的关系,及其内部其他有关类的具体依赖等信息: a. DStream是RDD的模板,其内部generatedRDDs 保存了每个BatchDuration时间生成的RDD对象实例.DStream的依赖构成了RDD

15、Spark Streaming源码解读之No Receivers彻底思考

在前几期文章里讲了带Receiver的Spark Streaming 应用的相关源码解读,但是现在开发Spark Streaming的应用越来越多的采用No Receivers(Direct Approach)的方式,No Receiver的方式的优势: 1. 更强的控制自由度 2. 语义一致性 其实No Receivers的方式更符合我们读取数据,操作数据的思路的.因为Spark 本身是一个计算框架,他底层会有数据来源,如果没有Receivers,我们直接操作数据来源,这其实是一种更自然的方式