(版本定制)第7课:Spark Streaming源码解读之JobScheduler内幕实现和深度思考

本期内容:

1、JobScheduler内幕实现

2、JobScheduler深度思考

JobScheduler是Spark Streaming的调度核心,地位相当于Spark Core上调度中心的DAG Scheduler,非常重要!

JobGenerator每隔Batch Duration时间会动态的生成JobSet提交给JobScheduler,JobScheduler接收到JobSet后,如何处理呢?

产生Job

/** Generate jobs and perform checkpoint for the given `time`.  */private def generateJobs(time: Time) {// Set the SparkEnv in this thread, so that job generation code can access the environment  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.SparkEnv.set(ssc.env)Try {    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batchgraph.generateJobs(time) // generate jobs using allocated block} match {case Success(jobs) =>val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))case Failure(e) =>      jobScheduler.reportError("Error generating jobs for time " + time, e)  }eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))}

处理产生的JobSet

def submitJobSet(jobSet: JobSet) {if (jobSet.jobs.isEmpty) {    logInfo("No jobs added for time " + jobSet.time)  } else {listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))jobSets.put(jobSet.time, jobSet)jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))    logInfo("Added jobs for time " + jobSet.time)  }}

这里会为每个job生成一个新的JobHandler,交给jobExecutor运行。

这里最重要的处理逻辑是 job => jobExecutor.execute(new JobHandler(job)),也就是将每个 job 都在 jobExecutor 线程池中、用 new JobHandler 来处理

先来看JobHandler针对Job的主要处理逻辑:

var _eventLoop = eventLoopif (_eventLoop != null) {  _eventLoop.post(JobStarted(job, clock.getTimeMillis()))// Disable checks for existing output directories in jobs launched by the streaming  // scheduler, since we may need to write output to an existing directory during checkpoint  // recovery; see SPARK-4835 for more details.PairRDDFunctions.disableOutputSpecValidation.withValue(true) {job.run()  }  _eventLoop = eventLoopif (_eventLoop != null) {    _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))  }

也就是说,JobHandler除了做一些状态记录外,最主要的就是调用job.run()!这里就与我们在 DStream 生成 RDD 实例详解 里分析的对应起来了, 在ForEachDStream.generateJob(time)时,是定义了Job的运行逻辑,即定义了Job.func。而在JobHandler这里,是真正调用了Job.run()、将触发Job.func的真正执行!

def run() {
_result = Try(func())
}

参考博客:http://lqding.blog.51cto.com/9123978/1773391

备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

时间: 2024-11-07 04:57:46

(版本定制)第7课:Spark Streaming源码解读之JobScheduler内幕实现和深度思考的相关文章

Spark版本定制七:Spark Streaming源码解读之JobScheduler内幕实现和深度思考

本期内容: 1,JobScheduler内幕实现 2,JobScheduler深度思考 摘要:JobScheduler是Spark Streaming整个调度的核心,其地位相当于Spark Core上的调度中心中的DAGScheduler!           一.JobScheduler内幕实现 问:JobScheduler是在什么地方生成的? 答:JobScheduler是在StreamingContext实例化时产生的,从StreamingContext的源码第183行中可以看出:    

Spark 定制版:007~Spark Streaming源码解读之JobScheduler内幕实现和深度思考

本讲内容: a. JobScheduler内幕实现 b. JobScheduler深度思考 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解. 上节回顾 上节课,我们以JobGenerator类为重心,为大家左右延伸,解密Job之动态生成:并总结出了Job之动态生成的三大核心: a. JobGenerator: 负责Job生成 b. JobSheduler:负责Job调度 c. ReceiverTracker:获取元数据 如Job动态生成图: 开讲 由上

(版本定制)第6课:Spark Streaming源码解读之Job动态生成和深度思考

本期内容: 1.Spark Streaming Job生成深度思考 2.Spark Streaming Job生成源码解析 本节课主要是针对Job如何产生进行阐述 在Spark Streaming里,总体负责动态作业调度的具体类是JobScheduler: /** * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate * the jobs and runs them using a

spark版本定制六:Spark Streaming源码解读之Job动态生成和深度思考

本期内容: 1.Spark Streaming Job生成深度思考 2.Spark Streaming Job生成源码解析 特别感谢王家林老师的独具一格的讲解: 王家林老师名片: 中国Spark第一人 新浪微博:http://weibo.com/ilovepains 微信公众号:DT_Spark 博客:http://blog.sina.com.cn/ilovepains QQ:1740415547 YY课堂:每天20:00现场授课频道68917580

第6课:Spark Streaming源码解读之Job动态生成和深度思考

上一节我们从总体上讲解了Spark Streaming job的运行机制.本节我们针对job如何生成进行详细的阐述,请看下图: 在Spark Streaming里,总体负责动态作业调度的具体类是JobScheduler: /**  * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate  * the jobs and runs them using a thread pool.   

Spark Streaming源码解读之Job动态生成和深度思考

本博文主要包含以下内容: 1. Spark Streaming Job 生成深度思考 2 .Spark Streaming Job 生成源码解析 一 :Spark Streaming Job 生成深度思考 输入的DStream有很多来源Kafka.Socket.Flume,输出的DStream其实是逻辑级别的Action,是Spark Streaming框架提出的,其底层翻译成为物理级别的Action,是RDD的Action,中间是处理过程是transformations,状态转换也就是业务处理

Spark 定制版:009~Spark Streaming源码解读之Receiver在Driver的精妙实现全生命周期彻底研究和思考

本讲内容: a. Receiver启动的方式设想 b. Receiver启动源码彻底分析 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解. 上节回顾 上一讲中,我们给大家具体分析了RDD的物理生成和逻辑生成过程,彻底明白DStream和RDD之间的关系,及其内部其他有关类的具体依赖等信息: a. DStream是RDD的模板,其内部generatedRDDs 保存了每个BatchDuration时间生成的RDD对象实例.DStream的依赖构成了RDD

第15课:Spark Streaming源码解读之No Receivers彻底思考

本期内容: Direct Access Kafka 前面有几期我们讲了带Receiver的Spark Streaming 应用的相关源码解读.但是现在开发Spark Streaming的应用越来越多的采用No Receivers(Direct Approach)的方式,No Receiver的方式的优势: 1. 更强的控制自由度 2. 语义一致性 其实No Receivers的方式更符合我们读取数据,操作数据的思路的.因为Spark 本身是一个计算框架,他底层会有数据来源,如果没有Receive

15、Spark Streaming源码解读之No Receivers彻底思考

在前几期文章里讲了带Receiver的Spark Streaming 应用的相关源码解读,但是现在开发Spark Streaming的应用越来越多的采用No Receivers(Direct Approach)的方式,No Receiver的方式的优势: 1. 更强的控制自由度 2. 语义一致性 其实No Receivers的方式更符合我们读取数据,操作数据的思路的.因为Spark 本身是一个计算框架,他底层会有数据来源,如果没有Receivers,我们直接操作数据来源,这其实是一种更自然的方式