6.Spark streaming技术内幕 : Job动态生成原理与源码解析

原创文章,转载请注明:转载自 周岳飞博客(http://www.cnblogs.com/zhouyf/)

Spark streaming 程序的运行过程是将DStream的操作转化成RDD的操作,Spark Streaming 和 Spark Core 的关系如下图(图片来自spark官网)

Spark Streaming 会按照程序设定的时间间隔不断动态生成Job来处理输入数据,这里的Job生成是指将Spark Streaming 的程序翻译成Spark内核的RDD操作,翻译的过程并不会触发Job的运行,Spark Streaming 会将翻译的处理逻辑封装在Job对象中,最后会将Job提交到集群上运行。这就是Spark Streaming 运行的基本过程。下面详细介绍Job动态生成和提交过程。

首先,当SparkStreaming的start方法调用后,整个Spark Streaming 程序开始运行,按照指定的时间间隔生成Job并提交给集群运行,在生成Job的工程中主要核心对象有

1.JobScheduler

2.JobGenerator

3.DStreamGraph

4.DStream

其中, JobScheduler 负责启动JobGenerator生成Job,并提交生成的Job到集群运行,这里的Job不是在spark core 中提到的job,它只是作业运行的代码模板,是逻辑级别的,可以类比java线程中的Runnable接口实现,不是真正运行的作业, 它封装了由DStream转化而来的RDD操作.JobGenerator负责定时调用DStreamingGraph的generateJob方法生成Job和清理Dstream的元数据, DStreamGraph持有构成DStream图的所有DStream对象,并调用DStream的generateJob方法生成具体Job对象.DStream生成最终的Job交给JobScheduler 调度执行。整体过程如下图所示:

原创文章,转载请注明:转载自 周岳飞博客(http://www.cnblogs.com/zhouyf/)

下面结合源码分析每一步过程 (源码中黄色背景部分为核心逻辑代码,例如 : scheduler.start() ) :

首先,StreamingContext起动时调用start方法

  1. try {
  2. validate()
  3. // Start the streaming scheduler in a new thread, so that thread local properties
  4. // like call sites and job groups can be reset without affecting those of the
  5. // current thread.
  6. ThreadUtils.runInNewThread("streaming-start") {
  7. sparkContext.setCallSite(startSite.get)
  8. sparkContext.clearJobGroup()
  9. sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
  10. savedProperties.set(SerializationUtils.clone(
  11. sparkContext.localProperties.get()).asInstanceOf[Properties])
  12. scheduler.start()
  13. }
  14. state = StreamingContextState.ACTIVE
  15. } catch {
  16. case NonFatal(e) =>
  17. logError("Error starting the context, marking it as stopped", e)
  18. scheduler.stop(false)
  19. state = StreamingContextState.STOPPED
  20. throw e
  21. }

其中调用了scheduler的start方法,此处的scheduler 就是 org.apache.spark.streaming.scheduler.JobScheduler 对象,

StreamingContext持有org.apache.spark.streaming.scheduler.JobScheduler对象的引用。

下面看一下JobScheduler的start方法:

  1. eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
  2. override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
  3. override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  4. }
  5. eventLoop.start()
  6. // attach rate controllers of input streams to receive batch completion updates
  7. for {
  8. inputDStream <- ssc.graph.getInputStreams
  9. rateController <- inputDStream.rateController
  10. } ssc.addStreamingListener(rateController)
  11. listenerBus.start()
  12. receiverTracker = new ReceiverTracker(ssc)
  13. inputInfoTracker = new InputInfoTracker(ssc)
  14. executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
  15. ssc.sparkContext,
  16. receiverTracker,
  17. ssc.conf,
  18. ssc.graph.batchDuration.milliseconds,
  19. clock)
  20. executorAllocationManager.foreach(ssc.addStreamingListener)
  21. receiverTracker.start()
  22. jobGenerator.start()
  23. executorAllocationManager.foreach(_.start())
  24. logInfo("Started JobScheduler")

可以看到JobScheduler调用了jobGeneratorstart方法和eventLoop的start方法,eventLoop用来接收JobSchedulerEvent消息,并交给processEvent函数进行处理

代码如下:

  1. private def processEvent(event: JobSchedulerEvent) {
  2. try {
  3. event match {
  4. case JobStarted(job, startTime) => handleJobStart(job, startTime)
  5. case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
  6. case ErrorReported(m, e) => handleError(m, e)
  7. }
  8. } catch {
  9. case e: Throwable =>
  10. reportError("Error in job scheduler", e)
  11. }
  12. }

可以看到JobScheduler中的eventLoop只处理JobStarted,JobCompleted和ErrorReported 三类消息,这三类消息的处理不是Job动态生成的核心逻辑代码先略过,(注意:后面JobGenerator中也有个eventLoop不要和这里的eventLoop混淆。)

JobGenerator的start方法首先new了一个EventLoop对象eventLoop,并复写onReceive(),将收到的JobGeneratorEvent 消息交给 processEvent 方法处理.源码如下:

  1. /** Start generation of jobs */ def start(): Unit = synchronized { if (eventLoop != null) return // generator has already been started // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock. // See SPARK-10125 checkpointWriter eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) override protected def onError(e: Throwable): Unit = { jobScheduler.reportError("Error in job generator", e) } } eventLoop.start() if (ssc.isCheckpointPresent) { restart() } else { startFirstTime() } }

JobGenerator创建了eventLoop对象之后调用该对象的start方法,启动监听进程,准备接收JobGeneratorEvent类型消息交给processEvent函数处理,然后调用了startFirstTime方法,该方法启动DStreamGraph和定时器,定时器启动后根据程序设定的时间间隔给eventLoop对象发送GenerateJobs消息,如下图:

原创文章,转载请注明:转载自 周岳飞博客(http://zhou-yuefei.iteye.com/)

eventLoop对象收到 GenerateJobs 消息交个processEvent方法处理,processEvent收到该消息,调用generateJobs方法处理,源码如下:

  1. /** Generate jobs and perform checkpoint for the given `time`. */
  2. private def generateJobs(time: Time) {
  3. // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  4. // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  5. ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  6. Try {
  7. jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
  8. graph.generateJobs(time) // generate jobs using allocated block
  9. } match {
  10. case Success(jobs) =>
  11. val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
  12. jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
  13. case Failure(e) =>
  14. jobScheduler.reportError("Error generating jobs for time " + time, e)
  15. }
  16. eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  17. }

JobGenerator中的generateJobs方法主要关注两行代码,首先调用graph的generateJobs方法,给方法返回Success(jobs) 或者 Failure(e),其中的jobs就是该方法返回的Job对象集合,如果Job创建成功,再调用JobScheduler的submitJobSet方法将job提交给集群执行。

首先分析Job对象的产生,DStreamGraph 的start方法源码:

  1. def generateJobs(time: Time): Seq[Job] = {
  2. logDebug("Generating jobs for time " + time)
  3. val jobs = this.synchronized {
  4. outputStreams.flatMap { outputStream =>
  5. val jobOption = outputStream.generateJob(time)
  6. jobOption.foreach(_.setCallSite(outputStream.creationSite))
  7. jobOption
  8. }
  9. }
  10. logDebug("Generated " + jobs.length + " jobs for time " + time)
  11. jobs
  12. }

DStreamGraph 的start方法源码调用了outputStream对象的generateJob方法,ForeachDStream重写了该方法:

  1. override def generateJob(time: Time): Option[Job] = {
  2. parent.getOrCompute(time) match {
  3. case Some(rdd) =>
  4. val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
  5. foreachFunc(rdd, time)
  6. }
  7. Some(new Job(time, jobFunc))
  8. case None => None
  9. }
  10. }

ForeachDStream的generateJob 将用户编写的DStream处理函数封装在jobFunc中,并将其传入Job对象,至此Job的生成。

接下来分析Job提交过程,JobScheduler负责Job的提交,核心代码在submitJobSet方法中:

  1. def submitJobSet(jobSet: JobSet) {
  2. if (jobSet.jobs.isEmpty) {
  3. logInfo("No jobs added for time " + jobSet.time)
  4. } else {
  5. listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
  6. jobSets.put(jobSet.time, jobSet)
  7. jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
  8. logInfo("Added jobs for time " + jobSet.time)
  9. }
  10. }

其中jobExecutor对象是一个线程池,JobHandler实现了Runnable接口,在JobHandler 的run方法中会调用传入的job对象的run方法。

疑问:Job的run方法执行是如何触发RDD的Action操作从而出发job的真正运行的呢?我们下次再具体分析,请随时关注博客更新!

原创文章,转载请注明:转载自 周岳飞博客(http://www.cnblogs.com/zhouyf/)

From WizNote

Attachment List

时间: 2024-10-25 05:16:42

6.Spark streaming技术内幕 : Job动态生成原理与源码解析的相关文章

Spark技术内幕: Task向Executor提交的源码解析

从org.apache.spark.scheduler.DAGScheduler#submitMissingTasks开始,分析Stage是如何生成TaskSet的. 如果一个Stage的所有的parent stage都已经计算完成或者存在于cache中,那么他会调用submitMissingTasks来提交该Stage所包含的Tasks. org.apache.spark.scheduler.DAGScheduler#submitMissingTasks的计算流程如下: 首先得到RDD中需要计

Java并发编程与技术内幕:ArrayBlockingQueue、LinkedBlockingQueue及SynchronousQueue源码解析

林炳文Evankaka原创作品.转载请注明出处http://blog.csdn.net/evankaka 摘要:本文主要讲了Java中BlockingQueue的源码 一.BlockingQueue介绍与常用方法 BlockingQueue是一个阻塞队列.在高并发场景是用得非常多的,在线程池中.如果运行线程数目大于核心线程数目时,也会尝试把新加入的线程放到一个BlockingQueue中去.队列的特性就是先进先出很容易理解,在java里头它的实现类主要有下图的几种,其中最常用到的是ArrayBl

Spark技术内幕:Stage划分及提交源码分析

当触发一个RDD的action后,以count为例,调用关系如下: org.apache.spark.rdd.RDD#count org.apache.spark.SparkContext#runJob org.apache.spark.scheduler.DAGScheduler#runJob org.apache.spark.scheduler.DAGScheduler#submitJob org.apache.spark.scheduler.DAGSchedulerEventProcess

JDK1.8 动态代理机制及源码解析

动态代理 a) jdk 动态代理 Proxy, 核心思想:通过实现被代理类的所有接口,生成一个字节码文件后构造一个代理对象,通过持有反射构造被代理类的一个实例,再通过invoke反射调用被代理类实例的方法,来实现代理. 缺点:被代理类必须实现一个或多个接口 参考链接:http://rejoy.iteye.com/blog/1627405 源码解析:见第四部分 cglib 动态代理 核心思想:通过生成子类字节码实现,代理类为每个委托方法都生成两个方法,以add方法为例,一个是重写的add方法,一个

Spark集群任务提交流程----2.1.0源码解析

Spark的应用程序是通过spark-submit提交到Spark集群上运行的,那么spark-submit到底提交了什么,集群是怎样调度运行的,下面一一详解. 0. spark-submit提交任务 0.1 启动脚本解析 分析spark-submit脚本源码可知最终该命令执行./bin/spark-class的Java类脚本,./bin/spark-class脚本启动的类是org.apache.spark.launcher.Main,在spark-submit模式下该类会启动SparkSubm

Scala 深入浅出实战经典 第65讲:Scala中隐式转换内幕揭秘、最佳实践及其在Spark中的应用源码解析

王家林亲授<DT大数据梦工厂>大数据实战视频 Scala 深入浅出实战经典(1-87讲)完整视频.PPT.代码下载:百度云盘:http://pan.baidu.com/s/1c0noOt6 腾讯微云:http://url.cn/TnGbdC 360云盘:http://yunpan.cn/cQ4c2UALDjSKy 访问密码 45e2土豆:http://www.tudou.com/programs/view/NGgUD5FBQaA/优酷:http://v.youku.com/v_show/id_

[Spark內核] 第42课:Spark Broadcast内幕解密:Broadcast运行机制彻底解密、Broadcast源码解析、Broadcast最佳实践

本课主题 Broadcast 运行原理图 Broadcast 源码解析 Broadcast 运行原理图 Broadcast 就是将数据从一个节点发送到其他的节点上; 例如 Driver 上有一张表,而 Executor 中的每个并行执行的Task (100万个Task) 都要查询这张表的话,那我们通过 Broadcast 的方式就只需要往每个Executor 把这张表发送一次就行了,Executor 中的每个运行的 Task 查询这张唯一的表,而不是每次执行的时候都从 Driver 中获得这张表

微信小程序动态生成保存二维码

起源:最近小程序需要涉及到一些推广方面的功能,所以要写一个动态生成二维码用户进行下载分享,写完之后受益良多,特此来分享一下: 一.微信小程序动态生成保存二维码 wxml: <canvas style="width: 350rpx;height: 350rpx;background:#f1f1f1;" canvas-id="mycanvas"/> js: // pages/qrcode/qrcode.js var QR = require("..

Spark 1.6 RPC内幕解密:运行机制、源码详解、Netty与Akka等(DT大数据梦工厂)

内容: 1.Spark 1.6 RPC解析: 2.RPCEnv源码解析: 3.RPCEndpoint等源码解析: 以前和现在的RPC都是采用Akka,以前和现在的不同就在于RPCEnv,现在就是基于RPCEnv去做RPC通信的 ==========Spark 1.6 RPC解析============ 1.Spark 1.6推出了以RPCEnv.RPCEndpoint.RPCEndpointRef为核心的新型架构下的RPC通信方式,就目前的实现而言,其底层依旧是Akka: 2.Akka是基于Sc