第6课:Spark Streaming源码解读之Job动态生成和深度思考

上一节我们从总体上讲解了Spark Streaming job的运行机制。本节我们针对job如何生成进行详细的阐述,请看下图:

在Spark Streaming里,总体负责动态作业调度的具体类是JobScheduler:

/**
 * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
 * the jobs and runs them using a thread pool.
  * 这个类调度jobs在Spark上运行,它使用JobGenerator产生jobs,并且使用线程池来运行jobs
 */
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging

JobScheduler有两个非常重要的成员:

  • JobGenerator
  • ReceiverTracker

JobScheduler 将每个batch的RDD DAG的具体生成工作委托给JobGenerator,将源头数据输入的记录工作委托给ReceiverTracker 。

在JobGenerator中有两个至关重要的成员就是RecurringTimer和EventLoop;RecurringTimer它控制了job的触发。每到batchInterval时间,就往EventLoop的队列中放入一个消息。而EventLoop则不断的查看消息队列,一旦有消息就处理;

在Spark Streaming应用程序中都会调用

ssc.start() //ssc 代表StreamingContext

这将隐含的导致一系列的模块的启动:

ssc.start()

--> scheduler.start()

--> jobGenerator.start()

我们来具体的看看JobGenerator.start()的代码:

def start(): Unit = synchronized {
  ... 
  eventLoop.start()  //启动RPC处理线程

  if (ssc.isCheckpointPresent) {
    restart()       // 如果不是第一次启动,就从Checkpoint中恢复
  } else {
    startFirstTime() //第一次启动
  }
}

在startFirstTime中将DStreamGraph、定时器启动

private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

定时器RecurringTimer启动后,使用线程每到一个新的batchInterval,就会向EventLoop中发生一个消息。

private def triggerActionForNextInterval(): Unit = {
  clock.waitTillTime(nextTime)
  callback(nextTime)
  prevTime = nextTime
  nextTime += period
  logDebug("Callback for " + name + " called at time " + prevTime)
}

这里的callback函数就是RecurringTimer初始化时传入的匿名函数:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

当EventLoop收到消息后:

override def run(): Unit = {
  try {
    while (!stopped.get) {
      val event = eventQueue.take()
      try {
        onReceive(event)
      } catch {
        case NonFatal(e) => {
          try {
            onError(e)
          } catch {
            case NonFatal(e) => logError("Unexpected error in " + name, e)
          }
        }
      }
    }
  } catch {
    case ie: InterruptedException => // exit even if eventQueue is not empty
    case NonFatal(e) => logError("Unexpected error in " + name, e)
  }
}

不断的去处理事件:

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

这里调用generateJobs方法:

private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

这段代码异常精悍,包含了JobGenerator主要工作4个步骤

  1. 要求ReceiverTracker将目前已收到的数据进行一次allocate,即将上次batch切分后的数据切分到到本次新的batch里
  2. 要求DStreamGraph复制出一套新的 RDD DAG 的实例。整个DStreamGraph.generateJobs(time)遍历结束的返回值是Seq[Job]
  3. 将第2步生成的本 batch 的 RDD DAG,和第1步获取到的 meta 信息,一同提交给JobScheduler异步执行这里我们提交的是将 (a) time (b) Seq[job] (c) 块数据的meta信息。这三者包装为一个JobSet,然后调用JobScheduler.submitJobSet(JobSet)提交给JobScheduler。这里的向JobScheduler提交过程与JobScheduler接下来在jobExecutor里执行过程是异步分离的,因此本步将非常快即可返回。
  4. 只要提交结束(不管是否已开始异步执行),就马上对整个系统的当前运行状态做一个checkpoint这里做checkpoint也只是异步提交一个DoCheckpoint消息请求,不用等 checkpoint 真正写完成即可返回这里也简单描述一下 checkpoint 包含的内容,包括已经提交了的、但尚未运行结束的JobSet等实际运行时信息。

备注:

1、DT大数据梦工厂微信公众号DT_Spark 
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains

时间: 2024-08-03 08:32:51

第6课:Spark Streaming源码解读之Job动态生成和深度思考的相关文章

(版本定制)第6课:Spark Streaming源码解读之Job动态生成和深度思考

本期内容: 1.Spark Streaming Job生成深度思考 2.Spark Streaming Job生成源码解析 本节课主要是针对Job如何产生进行阐述 在Spark Streaming里,总体负责动态作业调度的具体类是JobScheduler: /** * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate * the jobs and runs them using a

Spark Streaming源码解读之Job动态生成和深度思考

本博文主要包含以下内容: 1. Spark Streaming Job 生成深度思考 2 .Spark Streaming Job 生成源码解析 一 :Spark Streaming Job 生成深度思考 输入的DStream有很多来源Kafka.Socket.Flume,输出的DStream其实是逻辑级别的Action,是Spark Streaming框架提出的,其底层翻译成为物理级别的Action,是RDD的Action,中间是处理过程是transformations,状态转换也就是业务处理

spark版本定制六:Spark Streaming源码解读之Job动态生成和深度思考

本期内容: 1.Spark Streaming Job生成深度思考 2.Spark Streaming Job生成源码解析 特别感谢王家林老师的独具一格的讲解: 王家林老师名片: 中国Spark第一人 新浪微博:http://weibo.com/ilovepains 微信公众号:DT_Spark 博客:http://blog.sina.com.cn/ilovepains QQ:1740415547 YY课堂:每天20:00现场授课频道68917580

(版本定制)第7课:Spark Streaming源码解读之JobScheduler内幕实现和深度思考

本期内容: 1.JobScheduler内幕实现 2.JobScheduler深度思考 JobScheduler是Spark Streaming的调度核心,地位相当于Spark Core上调度中心的DAG Scheduler,非常重要! JobGenerator每隔Batch Duration时间会动态的生成JobSet提交给JobScheduler,JobScheduler接收到JobSet后,如何处理呢? 产生Job /** Generate jobs and perform checkpo

Spark 定制版:007~Spark Streaming源码解读之JobScheduler内幕实现和深度思考

本讲内容: a. JobScheduler内幕实现 b. JobScheduler深度思考 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解. 上节回顾 上节课,我们以JobGenerator类为重心,为大家左右延伸,解密Job之动态生成:并总结出了Job之动态生成的三大核心: a. JobGenerator: 负责Job生成 b. JobSheduler:负责Job调度 c. ReceiverTracker:获取元数据 如Job动态生成图: 开讲 由上

Spark版本定制七:Spark Streaming源码解读之JobScheduler内幕实现和深度思考

本期内容: 1,JobScheduler内幕实现 2,JobScheduler深度思考 摘要:JobScheduler是Spark Streaming整个调度的核心,其地位相当于Spark Core上的调度中心中的DAGScheduler!           一.JobScheduler内幕实现 问:JobScheduler是在什么地方生成的? 答:JobScheduler是在StreamingContext实例化时产生的,从StreamingContext的源码第183行中可以看出:    

Spark 定制版:009~Spark Streaming源码解读之Receiver在Driver的精妙实现全生命周期彻底研究和思考

本讲内容: a. Receiver启动的方式设想 b. Receiver启动源码彻底分析 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解. 上节回顾 上一讲中,我们给大家具体分析了RDD的物理生成和逻辑生成过程,彻底明白DStream和RDD之间的关系,及其内部其他有关类的具体依赖等信息: a. DStream是RDD的模板,其内部generatedRDDs 保存了每个BatchDuration时间生成的RDD对象实例.DStream的依赖构成了RDD

第15课:Spark Streaming源码解读之No Receivers彻底思考

本期内容: Direct Access Kafka 前面有几期我们讲了带Receiver的Spark Streaming 应用的相关源码解读.但是现在开发Spark Streaming的应用越来越多的采用No Receivers(Direct Approach)的方式,No Receiver的方式的优势: 1. 更强的控制自由度 2. 语义一致性 其实No Receivers的方式更符合我们读取数据,操作数据的思路的.因为Spark 本身是一个计算框架,他底层会有数据来源,如果没有Receive

15、Spark Streaming源码解读之No Receivers彻底思考

在前几期文章里讲了带Receiver的Spark Streaming 应用的相关源码解读,但是现在开发Spark Streaming的应用越来越多的采用No Receivers(Direct Approach)的方式,No Receiver的方式的优势: 1. 更强的控制自由度 2. 语义一致性 其实No Receivers的方式更符合我们读取数据,操作数据的思路的.因为Spark 本身是一个计算框架,他底层会有数据来源,如果没有Receivers,我们直接操作数据来源,这其实是一种更自然的方式