本期内容:
1 Job动态生成
2 深度思考
一切不能进行实时流处理的数据都是无效的数据。在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的诸如SQL,MLlib等强大框架,它必将一统天下。
Spark Streaming运行时与其说是Spark Core上的一个流式处理框架,不如说是Spark Core上的一个最复杂的应用程序。如果可以掌握Spark streaming这个复杂的应用程序,那么其他的再复杂的应用程序都不在话下了。这里选择Spark Streaming作为版本定制的切入点也是大势所趋。
在Spark Streaming里,总体负责动态作业调度的具体类是JobScheduler
private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging
在JobScheduler中有两个重要的成员:JobGenerator和ReceiverTracker,而在JobGenerator中有两个至关重要的成员就是RecurringTimer和EventLoop。
当JobGenerator启动的时候会调用startFirstTime方法,当然如果不是第一次启动就会restart。
if (ssc.isCheckpointPresent) { restart() } else { startFirstTime() }
在 这个方法中:会启动DStream和定时器
private def startFirstTime() { val startTime = new Time(timer.getStartTime()) graph.start(startTime - graph.batchDuration) timer.start(startTime.milliseconds) logInfo("Started JobGenerator at " + startTime) }
定时器负责在每一个时间间隔batchInterval,在EventLoop循环中发送一次消息。在EventLoop接收到消息后就会启动run方法在消息队列总来执行
override def run(): Unit = { try { while (!stopped.get) { val event = eventQueue.take() try { onReceive(event) } catch { case NonFatal(e) => { try { onError(e) } catch { case NonFatal(e) => logError("Unexpected error in " + name, e) } } } } } catch { case ie: InterruptedException => // exit even if eventQueue is not empty case NonFatal(e) => logError("Unexpected error in " + name, e) } }
此时在消息中会执行generateJobs方法来不断地生成job,至此,job完成了生成的过程。
private def generateJobs(time: Time) { // Set the SparkEnv in this thread, so that job generation code can access the environment // Example: BlockRDDs are created in this thread, and it needs to access BlockManager // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed. SparkEnv.set(ssc.env) Try { jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch graph.generateJobs(time) // generate jobs using allocated block } match { case Success(jobs) => val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) }
在job生成过程中主要包含了以下4个步骤
- 要求ReceiverTracker将目前已收到的数据进行一次allocate,即将上次batch切分后的数据切分到到本次新的batch里
- 要求DStreamGraph复制出一套新的 RDD DAG 的实例。整个DStreamGraph.generateJobs(time)遍历结束的返回值是Seq[Job]
- 将第2步生成的本 batch 的 RDD DAG,和第1步获取到的 meta 信息,一同提交给JobScheduler异步执行这里我们提交的是将 (a) time (b) Seq[job] (c) 块数据的meta信息。这三者包装为一个JobSet,然后调用JobScheduler.submitJobSet(JobSet)提交给JobScheduler。这里的向JobScheduler提交过程与JobScheduler接下来在jobExecutor里执行过程是异步分离的,因此本步将非常快即可返回。
- 只要提交结束(不管是否已开始异步执行),就马上对整个系统的当前运行状态做一个checkpoint这里做checkpoint也只是异步提交一个DoCheckpoint消息请求,不用等 checkpoint 真正写完成即可返回这里也简单描述一下 checkpoint 包含的内容,包括已经提交了的、但尚未运行结束的JobSet等实际运行时信息。
备注:
资料来源于:DT_大数据梦工厂(Spark发行版本定制)
更多私密内容,请关注微信公众号:DT_Spark
如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580