Spark Customization Part 7: Reading the Spark Streaming Source - JobScheduler Internals and Deeper Thoughts

Topics in this installment:

1. JobScheduler internals

2. Deeper thoughts on JobScheduler

Abstract: JobScheduler is the core of all scheduling in Spark Streaming; its role is comparable to that of the DAGScheduler at the scheduling center of Spark Core.

1. JobScheduler Internals

Q: Where is the JobScheduler created?

A: The JobScheduler is created when the StreamingContext is instantiated, as line 183 of the StreamingContext source shows:

      private[streaming] val scheduler = new JobScheduler(this)

Q: Why does Spark Streaming need two threads?
A: The two threads specified via setMaster mean that the program needs at least two threads at runtime: one thread loops continuously to receive data, and the remaining thread(s), whose number we choose, process the jobs. This is reflected in StreamingContext's start() method (a small runnable sketch follows the listing below):

def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            validate()

            // Start the streaming scheduler in a new thread, so that thread local properties
            // like call sites and job groups can be reset without affecting those of the
            // current thread.
            // This is a thread started internally by Spark Streaming to drive the scheduling
            // of the whole application.
            ThreadUtils.runInNewThread("streaming-start") {
              sparkContext.setCallSite(startSite.get)
              sparkContext.clearJobGroup()
              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
              scheduler.start()
            }
            state = StreamingContextState.ACTIVE
          } catch {
            case NonFatal(e) =>
              logError("Error starting the context, marking it as stopped", e)
              scheduler.stop(false)
              state = StreamingContextState.STOPPED
              throw e
          }
          StreamingContext.setActiveContext(this)
        }
        shutdownHookRef = ShutdownHookManager.addShutdownHook(
          StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
        // Registering Streaming Metrics at the start of the StreamingContext
        assert(env.metricsSystem != null)
        env.metricsSystem.registerSource(streamingSource)
        uiTab.foreach(_.attach())
        logInfo("StreamingContext started")
      case ACTIVE =>
        logWarning("StreamingContext has already been started")
      case STOPPED =>
        throw new IllegalStateException("StreamingContext has already been stopped")
    }
  }
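
To make the two-thread requirement concrete, here is a minimal runnable sketch (my own illustration; the host, port and app name are placeholders, not from this article). With local[1] the only thread would be permanently occupied by the socket receiver and no batch would ever be processed:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TwoThreadsExample {
  def main(args: Array[String]): Unit = {
    // "local[2]": one thread ends up driving the receiver, the other processes the batch jobs.
    val conf = new SparkConf().setMaster("local[2]").setAppName("TwoThreadsExample")
    val ssc = new StreamingContext(conf, Seconds(5))

    // A receiver-based input stream: it permanently occupies one thread.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()

    ssc.start()            // this is where the JobScheduler (and everything below it) starts
    ssc.awaitTermination()
  }
}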

Stepping into the JobScheduler source:

/** The JobScheduler owns the logical-level Jobs and has them executed at the physical level on Spark.
 * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
 * the jobs and runs them using a thread pool.
 */
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {

  // Incoming Jobs are continually stored, keyed by batch time, in JobSet collections
  private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]

  // Degree of parallelism for job execution, default 1. To change it, set this value in
  // spark-defaults.conf or in the application. Why raise it? An application with several
  // output operations produces several jobs within the same batchDuration; those jobs do
  // not need to wait for one another, so raising this value lets them run concurrently.
  // Jobs of different batches can likewise run concurrently on the threads of the pool.
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)

  // Turning logical-level Jobs into physical-level jobs is done by the threads of this
  // newDaemonFixedThreadPool
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

  // Instantiate the JobGenerator
  private val jobGenerator = new JobGenerator(this)
  val clock = jobGenerator.clock
  val listenerBus = new StreamingListenerBus()
  // The three fields below are instantiated only when the JobScheduler starts
  // These two are created only when scheduler starts.
  // eventLoop not being null means the scheduler has been started and not stopped
  var receiverTracker: ReceiverTracker = null
  // A tracker to track all the input stream information as well as processed record number
  var inputInfoTracker: InputInfoTracker = null

  private var eventLoop: EventLoop[JobSchedulerEvent] = null

  def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start(ssc.sparkContext)
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)
    receiverTracker.start()
    jobGenerator.start()
    logInfo("Started JobScheduler")
  }
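
As a hedged illustration of the numConcurrentJobs knob above (my own sketch, not from the article): setting spark.streaming.concurrentJobs before the context is created is what ends up sizing the streaming-job-executor pool shown in the constructor.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ConcurrentJobsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("ConcurrentJobsDemo")
      // Default is 1; with 2, two jobs (e.g. two output operations of the same batch,
      // or jobs of different batches) can occupy the streaming-job-executor pool at once.
      .set("spark.streaming.concurrentJobs", "2")

    val ssc = new StreamingContext(conf, Seconds(5))
    // ... define DStreams with more than one output operation, then:
    // ssc.start(); ssc.awaitTermination()
  }
}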

2. Deeper Thoughts on JobScheduler

Let's start from the application-level output method print() and work backwards through how a Job is generated:

1. Clicking into the application's print() call takes us to DStream.print():

/**
 * Print the first ten elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(): Unit = ssc.withScope {
  print(10)
}

2. Clicking into the print(num) overload called above:

/**
 * Print the first num elements of each RDD generated in this DStream. This is an output
 * operator, so this DStream will be registered as an output stream and there materialized.
 */
def print(num: Int): Unit = ssc.withScope {
  def foreachFunc: (RDD[T], Time) => Unit = {
    (rdd: RDD[T], time: Time) => {
      val firstNum = rdd.take(num + 1)
      // scalastyle:off println
      println("-------------------------------------------")
      println("Time: " + time)
      println("-------------------------------------------")
      firstNum.take(num).foreach(println)
      if (firstNum.length > num) println("...")
      println()
      // scalastyle:on println
    }
  }
  foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}

From the code above we can see that, when Spark Streaming finally executes, it is still performing logical-level operations on RDDs!
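
For reference, here is a minimal user-level sketch of foreachRDD (my own example; the stream and logic are made up). It has the same shape as print(num): a plain RDD action such as take() is executed once per batch RDD:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ForeachRDDSketch")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)

    // Same shape as print(num): take() is an RDD action executed on every batch RDD.
    lines.foreachRDD { (rdd, time) =>
      val first = rdd.take(10)
      println(s"Batch at $time, showing ${first.length} record(s):")
      first.foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}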

3. Clicking into foreachRDD takes us to the foreachRDD method:

/**
 * Apply a function to each RDD in this DStream. This is an output operator, so
 * 'this' DStream will be registered as an output stream and therefore materialized.
 * @param foreachFunc foreachRDD function
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *                           in the `foreachFunc` to be displayed in the UI. If `false`, then
 *                           only the scopes and callsites of `foreachRDD` will override those
 *                           of the RDDs on the display.
 */
private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}

4. Clicking into ForEachDStream brings us to the ForEachDStream class, where we find its generateJob method:

/**
 * An internal DStream used to represent output operations like DStream.foreachRDD.
 * @param parent        Parent DStream
 * @param foreachFunc   Function to apply on each RDD generated by the parent DStream
 * @param displayInnerRDDOps Whether the detailed callsites and scopes of the RDDs generated
 *                           by `foreachFunc` will be displayed in the UI; only the scope and
 *                           callsite of `DStream.foreachRDD` will be displayed.
 */
private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None
  // Generates a Job for each batch interval
  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        // rdd is the RDD generated for this batch time; being an output operation, it is the
        // last RDD in the lineage. Next we only need to find where ForEachDStream.generateJob
        // is called to see where the job is finally produced.
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
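
As a simplified plain-Scala sketch of this idea (my own illustration, not the real Job class): a logical Job is essentially a batch time paired with a deferred closure over the output logic, and nothing runs until that closure is invoked later.

// Plain-Scala sketch of a "logical job": a batch time plus a closure that runs later.
final case class SimpleJob(time: Long, run: () => Unit)

object LogicalJobSketch {
  // Mirrors the shape of generateJob: Some(data) => wrap the output logic, None => no job.
  def generateJob(batchData: Option[Seq[String]], time: Long): Option[SimpleJob] =
    batchData.map { data =>
      SimpleJob(time, () => data.take(10).foreach(println))
    }

  def main(args: Array[String]): Unit = {
    val job = generateJob(Some(Seq("a", "b", "c")), 1000L)
    job.foreach(_.run())   // only now does the "output operation" actually execute
  }
}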

5. In the previous installment we derived the following flow:

StreamingContext.start() --> JobScheduler.start() --> ReceiverTracker.start() --> JobGenerator.start() --> EventLoop --> processEvent() --> generateJobs() --> jobScheduler.receiverTracker.allocateBlocksToBatch(time) --> graph.generateJobs(time)

The final step, graph.generateJobs, is a method of DStreamGraph; stepping into it:

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    // each outputStream here is a ForEachDStream
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
private val outputStreams = new ArrayBuffer[DStream[_]]()

Looking at the subclass hierarchy of DStream together with ForEachDStream's generateJob above, we find that ForEachDStream is the only DStream subclass that overrides generateJob. The conclusion: real Jobs are produced by ForEachDStream.generateJob. At this point a Job is still purely logical; it is turned into a physical invocation in JobGenerator's generateJobs method:
/** Generate jobs and perform checkpoint for the given `time`.  */
  private def generateJobs(time: Time) {
    // Set the SparkEnv in this thread, so that job generation code can access the environment
    // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
    // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
    SparkEnv.set(ssc.env)
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

Stepping into jobScheduler.submitJobSet:

// Turning logical-level Jobs into physical-level jobs is done by the threads of the newDaemonFixedThreadPool
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }
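
To see the "logical to physical" hand-off in isolation, here is a standalone plain-Scala sketch of the same pattern (it uses java.util.concurrent directly rather than Spark's ThreadUtils, and the SimpleJob name is my own): each job is wrapped in a Runnable, as JobHandler does, and handed to a fixed-size pool.

import java.util.concurrent.{Executors, TimeUnit}

object SubmitJobSetSketch {
  final case class SimpleJob(time: Long, run: () => Unit)

  def main(args: Array[String]): Unit = {
    val numConcurrentJobs = 2  // plays the role of spark.streaming.concurrentJobs
    val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)

    val jobSet = Seq(
      SimpleJob(1000L, () => println("running job of batch 1000")),
      SimpleJob(1000L, () => println("running job of batch 1000 (second output op)"))
    )

    // Same shape as jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    jobSet.foreach { job =>
      jobExecutor.execute(new Runnable {
        override def run(): Unit = job.run()
      })
    }

    jobExecutor.shutdown()
    jobExecutor.awaitTermination(10, TimeUnit.SECONDS)
  }
}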

With that, the generation and execution of a job is clear. To summarize:

From the previous installment we know that the JobScheduler contains two core components, the JobGenerator and the ReceiverTracker, responsible for generating Jobs and for receiving source data respectively.

Once the ReceiverTracker starts, the Receivers running on the executors start and begin receiving data; the ReceiverTracker records the meta information of the data the Receivers receive.

Once the JobGenerator starts, every BatchDuration it asks the DStreamGraph to build the RDD graph and generate the Jobs.

The thread pool inside the JobScheduler then submits the encapsulated JobSet object (batch time, Jobs, and the data sources' meta information). Each Job wraps the business logic, which triggers the action on the last RDD,

and the DAGScheduler then schedules that Job for real execution on the Spark cluster.

Special thanks to Mr. Wang Jialin for his distinctive teaching.

Mr. Wang Jialin's contact card:

The No. 1 Spark expert in China

Sina Weibo: http://weibo.com/ilovepains

WeChat official account: DT_Spark

Blog: http://blog.sina.com.cn/ilovepains

QQ: 1740415547

YY classroom: live lectures every day at 20:00 on channel 68917580
