Contents of this post:
1. Inside the JobScheduler implementation
2. Deeper thoughts on JobScheduler
JobScheduler is the scheduling core of Spark Streaming; its role corresponds to that of the DAGScheduler at the heart of Spark Core's scheduling, so it is very important!
Every batch duration, JobGenerator dynamically generates a JobSet and submits it to JobScheduler. What does JobScheduler do once it receives the JobSet?
Generating the Jobs
/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
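To see the control flow above in isolation, here is a minimal, self-contained sketch of the same generate-then-submit pattern. The Job, JobSet, generateJobs and submitJobSet names below are toy stand-ins, not Spark's actual classes; only the Try { ... } match { Success / Failure } structure mirrors the real JobGenerator.generateJobs.

import scala.util.{Try, Success, Failure}

object GenerateJobsSketch {
  // Toy stand-ins for Spark's Job and JobSet; names are illustrative only.
  case class Job(time: Long, func: () => Unit)
  case class JobSet(time: Long, jobs: Seq[Job])

  // Pretend DStreamGraph: one Job per registered output operation.
  def generateJobs(time: Long): Seq[Job] =
    Seq(Job(time, () => println(s"output operation for batch $time")))

  def submitJobSet(jobSet: JobSet): Unit =
    println(s"submitted ${jobSet.jobs.size} job(s) for time ${jobSet.time}")

  def main(args: Array[String]): Unit = {
    val time = 1000L
    // Same Try / Success / Failure shape as JobGenerator.generateJobs:
    // a generation failure is reported instead of crashing the generator loop.
    Try {
      generateJobs(time)
    } match {
      case Success(jobs) => submitJobSet(JobSet(time, jobs))
      case Failure(e)    => println(s"Error generating jobs for time $time: $e")
    }
  }
}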
Processing the generated JobSet
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
For every job in the JobSet, submitJobSet creates a new JobHandler and hands it to jobExecutor to run. The key line is job => jobExecutor.execute(new JobHandler(job)): each job is wrapped in a JobHandler and executed on the jobExecutor thread pool, as sketched below.
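In Spark Streaming the size of this pool is controlled by spark.streaming.concurrentJobs (default 1), so the jobs of one batch normally run one after another. Below is a minimal sketch of the same handoff using a plain Java fixed thread pool; the JobHandler here is a toy Runnable, not Spark's class, and the pool size is hard-coded for illustration.

import java.util.concurrent.Executors

object JobExecutorSketch {
  // In JobScheduler the pool size comes from "spark.streaming.concurrentJobs"
  // (default 1); here it is hard-coded for the sketch.
  val numConcurrentJobs = 1
  val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)

  // Toy stand-in for JobHandler: a Runnable wrapping a single job body.
  class JobHandler(body: () => Unit) extends Runnable {
    override def run(): Unit = body()
  }

  def main(args: Array[String]): Unit = {
    val jobs = Seq(() => println("job 1"), () => println("job 2"))
    // Mirrors jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    jobExecutor.shutdown()
  }
}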
Now look at JobHandler's main logic for processing a Job:
var _eventLoop = eventLoop
if (_eventLoop != null) {
  _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
  // Disable checks for existing output directories in jobs launched by the streaming
  // scheduler, since we may need to write output to an existing directory during checkpoint
  // recovery; see SPARK-4835 for more details.
  PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
    job.run()
  }
  _eventLoop = eventLoop
  if (_eventLoop != null) {
    _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
  }
}
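One detail worth noting: disableOutputSpecValidation in Spark's PairRDDFunctions is (to my understanding) a scala.util.DynamicVariable, so withValue(true) { job.run() } disables the output-directory check only for the code running inside that block and restores the old value afterwards. The sketch below illustrates just the DynamicVariable.withValue scoping pattern; the names are illustrative, not Spark's internals.

import scala.util.DynamicVariable

object DynamicFlagSketch {
  // A flag with a default value; withValue overrides it only inside the block
  // (and only for the current thread), then restores the previous value.
  val disableOutputSpecValidation = new DynamicVariable[Boolean](false)

  def saveOutput(): Unit =
    println(s"output spec validation disabled? ${disableOutputSpecValidation.value}")

  def main(args: Array[String]): Unit = {
    saveOutput()                                                  // false outside the block
    disableOutputSpecValidation.withValue(true) { saveOutput() }  // true inside, as around job.run()
    saveOutput()                                                  // back to false afterwards
  }
}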
In other words, apart from recording some status and posting events, the most important thing JobHandler does is call job.run()! This ties back to our earlier analysis of how DStreams generate RDD instances: ForEachDStream.generateJob(time) only defines the Job's execution logic, i.e. Job.func, while it is here in JobHandler that Job.run() is actually invoked, which triggers the real execution of Job.func!
def run() {
  _result = Try(func())
}
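To make this "define now, run later" relationship concrete, here is a minimal sketch with a toy Job class (not Spark's) whose func stays a plain closure until run() captures its outcome in a Try, just like the one-liner above.

import scala.util.Try

object JobRunSketch {
  // Toy stand-in for streaming's Job: func is only a description until run() is called.
  class Job(val time: Long, func: () => Unit) {
    private var _result: Try[Unit] = _
    def run(): Unit = { _result = Try(func()) }
    def result: Try[Unit] = _result
  }

  def main(args: Array[String]): Unit = {
    // Analogous to ForEachDStream.generateJob: the output action is only wrapped in a closure here...
    val job = new Job(1000L, () => println("running the foreachRDD action"))
    // ...nothing has executed yet; JobHandler later triggers it via run().
    job.run()
    println(job.result) // Success(()) if the action did not throw
  }
}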
Reference blog: http://lqding.blog.51cto.com/9123978/1773391
Notes:
Source: DT_大数据梦工厂 (DT Big Data Dream Factory, Spark release customization series)
For more exclusive content, follow the WeChat official account: DT_Spark
If you are interested in big data and Spark, you can attend the permanently free public Spark class taught by Wang Jialin every evening at 20:00; YY room number: 68917580