定制班第6课

内容:

1,Spark Streaming Job生成深度思考

2,Spark Streaming Job生成源码解析

一、Spark Streaming Job生成深度思考

做大数据例如Hadoop,Spark等,如果不是流处理的话,一般会有定时任务。例如10分钟触发一次,1个小时触发一次,这就是做流处理的感觉,一切不是流处理,或者与流处理无关的数据都将是没有价值的数据,以前做批处理的时候其实也是隐形的在做流处理。

JobGenerator构造的时候有一个核心的参数是jobScheduler,jobScheduler是整个作业的生成和提交给集群的核心,JobGenerator会基于DStream生成Job。这里面的Job就相当于Java中线程要处理的Runnable里面的业务逻辑封装。Spark的Job就是运行的一个作业。

Spark Streaming除了基于定时操作以外参数Job,还可以通过各种聚合操作,或者基于状态的操作。

val ssc = new StreamingContext(conf,Seconds(5)),Spark Streaming的触发器是以时间为单位的,storm是以事件为触发器,也就是基于一个又一个record。Spark Streaming基于时间,这个时间是Batch Duractions,如代码所示,每5秒钟JobGenerator都会产生Job,此时的Job是逻辑级别的,也就是说有这个Job,并且说这个Job具体该怎么去做,此时并没有执行。具体执行的话是交给底层的RDD的action去触发,此时的action也是逻辑级别的。底层物理级别的,Spark
Streaming他是基于DStream构建的依赖关系导致的Job是逻辑级别的,底层是基于RDD的逻辑级别的。从逻辑级别翻译成物理级别,最后一个操作肯定是RDD的action,但是并不想一翻译立马就触发job。这个时候怎么办? action触发作业,这个时候作为Runnable接口封装,他会定义一个方法,这个方法里面是基于DStream的依赖关系生成的RDD。翻译的时候是将DStream的依赖关系翻译成RDD的依赖关系,由于DStream的依赖关系最后一个是action级别的,翻译成RDD的时候,RDD的最后一个操作也应该是action级别的,如果翻译的时候直接执行的话,就直接生成了Job,就没有所谓的队列,所以会将翻译的事件放到一个函数中或者一个方法中,因此,如果这个函数没有指定的action触发作业是执行不了的。

Spark Streaming根据时间不断的去管理我们的生成的作业,所以这个时候我们每个作业又有action级别的操作,这个action操作是对DStream进行逻辑级别的操作,他生成每个Job放到队列的时候,他一定会被翻译为RDD的操作,那基于RDD操作的最后一个一定是action级别的,如果翻译的话直接就是触发action的话整个Spark Streaming的Job就不受管理了。因此我们既要保证他的翻译,又要保证对他的管理,把DStream之间的依赖关系转变为RDD之间的依赖关系,最后一个DStream使得action的操作,翻译成一个RDD之间的action操作,整个翻译后的内容他是一块内容,他这一块内容是放在一个函数体中的,这个函数体,他会函数的定义,这个函数由于他只是定义还没有执行,所以他里面的RDD的action不会执行,不会触发Job,当我们的JobScheduler要调度Job的时候,转过来在线程池中拿出一条线程执行刚才的封装的方法。

二、Spark Streaming Job生成源码解析

Spark 作业动态生成三大核心:JobGenerator: 负责Job生成。 JobSheduler:负责Job调度。 ReceiverTracker: 获取元数据。 JobScheduler的start方法被调用的时候,会启动JobGenerator的start方法。

/** Start generation of jobs */
def start(): Unit = synchronized {
//eventLoop是消息循环体,因为不断的生成Job
  if(eventLoop != null) return // generator has already been started

  //Call checkpointWriter here to initialize it before eventLoop uses it to avoid adeadlock.
  //See SPARK-10125
 checkpointWriter
//匿名内部类
 eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
   override protected def onReceive(event: JobGeneratorEvent): Unit =processEvent(event)

   override protected def onError(e: Throwable): Unit = {
     jobScheduler.reportError("Error in job generator", e)
    }
  }
//调用start方法。
 eventLoop.start()

  if(ssc.isCheckpointPresent) {
   restart()
  }else {
   startFirstTime()
  }
}

EvenLoop: 的start方法被调用,首先会调用onstart方法。然后就启动线程。

/**
 * Anevent loop to receive events from the caller and process all events in theevent thread. It
 *will start an exclusive event thread to process all events.
 *
 *Note: The event queue will grow indefinitely. So subclasses should make sure`onReceive` can
 *handle events in time to avoid the potential OOM.
 */
private[spark] abstract classEventLoop[E](name: String) extends Logging {

 private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

 private val stopped = new AtomicBoolean(false)
//开启后台线程。
 private val eventThread = new Thread(name) {
   setDaemon(true)

   override def run(): Unit = {
     try {
//不断的从BlockQueue中拿消息。
       while (!stopped.get) {
//线程的start方法调用就会不断的循环队列,而我们将消息放到eventQueue中。
         val event = eventQueue.take()
         try {
//
           onReceive(event)
         } catch {
           case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) =>logError("Unexpected error in " + name, e)
              }
           }
         }
       }
     } catch {
       case ie: InterruptedException => // exit even if eventQueue is notempty
       case NonFatal(e) => logError("Unexpected error in " + name,e)
     }
    }

  }

  defstart(): Unit = {
   if (stopped.get) {
     throw new IllegalStateException(name + " has already beenstopped")
    }
   // Call onStart before starting the event thread to make sure it happensbefore onReceive

   onStart()
   eventThread.start()
  }

onReceive:不断的从消息队列中获得消息,一旦获得消息就会处理。 不要在onReceive中添加阻塞的消息,如果这样的话会不断的阻塞消息。 消息循环器一般都不会处理具体的业务逻辑,一般消息循环器发现消息以后都会将消息路由给其他的线程去处理。

/**
 *Invoked in the event thread when polling events from the event queue.
 *
 *Note: Should avoid calling blocking actions in `onReceive`, or the event threadwill be blocked
 *and cannot process events in time. If you want to call some blocking actions, runthem in
 *another thread.
 */
protected def onReceive(event: E): Unit

消息队列接收到事件后具体处理如下:

/** Processes all events */
private def processEvent(event:JobGeneratorEvent) {
 logDebug("Got event " + event)
 event match {
   case GenerateJobs(time) => generateJobs(time)
   case ClearMetadata(time) => clearMetadata(time)
   case DoCheckpoint(time, clearCheckpointDataLater) =>
     doCheckpoint(time, clearCheckpointDataLater)
   case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

基于Batch Duractions生成Job,并完成checkpoint。Job生成的5个步骤:

/** Generate jobs and perform checkpointfor the given `time`.  */
private def generateJobs(time: Time) {
  //Set the SparkEnv in this thread, so that job generation code can access theenvironment
  //Example: BlockRDDs are created in this thread, and it needs to accessBlockManager
  //Update: This is probably redundant after threadlocal stuff in SparkEnv has beenremoved.
 SparkEnv.set(ssc.env)
  Try{
//第一步:获取当前时间段里面的数据。根据分配的时间来分配具体要处理的数据。
   jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocatereceived blocks to batch
//第二步:生成Job,获取RDD的DAG依赖关系。在此基于DStream生成了RDD实例。
   graph.generateJobs(time) // generate jobs using allocated block
  }match {
   case Success(jobs) =>
//第三步:获取streamIdToInputInfos的信息。BacthDuractions要处理的数据,以及我们要处理的业务逻辑。
     val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
//第四步:将生成的Job交给jobScheduler
     jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
   case Failure(e) =>
     jobScheduler.reportError("Error generating jobs for time " +time, e)
  }
//第五步:进行checkpoint
 eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
此时的outputStream是整个DStream中的最后一个DStream,也就是foreachDStream。

def generateJobs(time: Time): Seq[Job] = {
 logDebug("Generating jobs for time " + time)
  valjobs = this.synchronized {
   outputStreams.flatMap { outputStream =>
//根据最后一个DStream,然后根据时间生成Job.
     val jobOption = outputStream.generateJob(time)
     jobOption.foreach(_.setCallSite(outputStream.creationSite))
     jobOption
    }
  }
 logDebug("Generated " + jobs.length + " jobs for time" + time)
 jobs
}

此时的JobFunc就是我们前面提到的用函数封装了Job。generateJob基于给定的时间生成Spark Streaming 的Job,这个方法会基于我们的DStream的操作物化成了RDD,由此可以看出,DStream是逻辑级别的,RDD是物理级别的。

/**
 *Generate a SparkStreaming job for the given time. This is an internal methodthat
 *should not be called directly. This default implementation creates a job
 *that materializes the corresponding RDD. Subclasses of DStream may overridethis
 * togenerate their own jobs.
 */
private[streaming] def generateJob(time:Time): Option[Job] = {
 getOrCompute(time) match {
    caseSome(rdd) => {
     val jobFunc = () => {
       val emptyFunc = { (iterator: Iterator[T]) => {} }
//rdd => 就是RDD的依赖关系
       context.sparkContext.runJob(rdd, emptyFunc)
     }
//此时的
     Some(new Job(time, jobFunc))
    }
   case None => None
  }
}

Job这个类就代表了Spark业务逻辑,可能包含很多SparkJobs。

/**
 *Class representing a Spark computation. It may contain multiple Spark jobs.
 */
private[streaming]
class Job(val time: Time, func: () => _){
 private var _id: String = _
 private var _outputOpId: Int = _
 private var isSet = false
 private var _result: Try[_] = null
 private var _callSite: CallSite = null
 private var _startTime: Option[Long] = None
 private var _endTime: Option[Long] = None

  defrun() {
//调用func函数,此时这个func就是我们前面generateJob中的func
   _result = Try(func())
  }

此时put函数中的RDD是最后一个RDD,虽然触发Job是基于时间,但是也是基于DStream的action的。

/**
 *Get the RDD corresponding to the given time; either retrieve it from cache
 * orcompute-and-cache it.
 */
private[streaming] final defgetOrCompute(time: Time): Option[RDD[T]] = {
  //If RDD was already generated, then retrieve it from HashMap,
  //or else compute the RDD
//基于时间生成RDD
 generatedRDDs.get(time).orElse {
   // Compute the RDD if time is valid (e.g. correct time in a slidingwindow)
   // of RDD generation, else generate nothing.
   if (isTimeValid(time)) {

     val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps =false) {
       // Disable checks for existing output directories in jobs launched bythe streaming
       // scheduler, since we may need to write output to an existing directoryduring checkpoint
       // recovery; see SPARK-4835 for more details. We need to have this callhere because
       // compute() might cause Spark jobs to be launched.
       PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
//
         compute(time)
       }
     }
//然后对generated RDD进行checkpoint
     rddOption.foreach { case newRDD =>
       // Register the generated RDD for caching and checkpointing
       if (storageLevel != StorageLevel.NONE) {
         newRDD.persist(storageLevel)
         logDebug(s"Persisting RDD ${newRDD.id} for time $time to$storageLevel")
       }
       if (checkpointDuration != null && (time -zeroTime).isMultipleOf(checkpointDuration)) {
         newRDD.checkpoint()
         logInfo(s"Marking RDD ${newRDD.id} for time $time forcheckpointing")
       }
//以时间为Key,RDD为Value,此时的RDD为最后一个RDD
       generatedRDDs.put(time, newRDD)
     }
     rddOption
    }else {
     None
    }
  }
}

回到JobGenerator中的start方法。

  if(ssc.isCheckpointPresent) {
//如果不是第一次启动的话,就需要从checkpoint中恢复。
   restart()
  }else {
//否则的话,就是第一次启动。
   startFirstTime()
  }
}

StartFirstTime的源码如下:

/** Starts the generator for the first time*/
private def startFirstTime() {
  valstartTime = new Time(timer.getStartTime())
//告诉DStreamGraph第一个Batch启动时间。
 graph.start(startTime - graph.batchDuration)
//timer启动,整个job不断生成就开始了。
 timer.start(startTime.milliseconds)
 logInfo("Started JobGenerator at " + startTime)
}

这里的timer是RecurringTimer。RecurringTimer的start方法会启动内置线程thread。

private val timer = newRecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
 longTime => eventLoop.post(GenerateJobs(new Time(longTime))),"JobGenerator")

Timer.start源码如下:

/**
 *Start at the given start time.
 */
def start(startTime: Long): Long =synchronized {
 nextTime = startTime //每次调用的
 thread.start()
 logInfo("Started timer for " + name + " at time " +nextTime)
 nextTime
}
调用thread启动后台进程。

private val thread = newThread("RecurringTimer - " + name) {
 setDaemon(true)
 override def run() { loop }
}

loop源码如下:

/**
   *Repeatedly call the callback every interval.
   */
 private def loop() {
   try {
     while (!stopped) {
       triggerActionForNextInterval()
     }
     triggerActionForNextInterval()
    }catch {
     case e: InterruptedException =>
    }
  }
}

tiggerActionForNextInterval源码如下:

private def triggerActionForNextInterval():Unit = {
 clock.waitTillTime(nextTime)
 callback(nextTime)
 prevTime = nextTime
  +=period
 logDebug("Callback for " + name + " called at time "+ prevTime)
}
此时的callBack是RecurringTimer传入的。下面就去找callBack是谁传入的,这个时候就应该找RecurringTimer什么时候实例化的。

private[streaming]
class RecurringTimer(clock: Clock, period:Long, callback: (Long) => Unit, name: String)
 extends Logging {

 private val thread = new Thread("RecurringTimer - " + name) {
   setDaemon(true)
   override def run() { loop }
  }
在jobGenerator中,匿名函数会随着时间不断的推移反复被调用。

private val timer = newRecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
//匿名函数,复制给callback。
 longTime => eventLoop.post(GenerateJobs(new Time(longTime))),"JobGenerator")

而此时的eventLoop就是JobGenerator的start方法中eventLoop.eventLoop是一个消息循环体当收到generateJobs,就会将消息放到线程池中去执行。至此,就知道了基于时间怎么生成作业的流程就贯通了。 Jobs: 此时的jobs就是jobs的业务逻辑,就类似于RDD之间的依赖关系,保存最后一个job,然后根据依赖关系进行回溯。 streamIdToInputInfos:基于Batch Duractions以及要处理的业务逻辑,然后就生成了JobSet。

jobScheduler.submitJobSet(JobSet(time,jobs, streamIdToInputInfos))

此时的JobSet就包含了数据以及对数据处理的业务逻辑。

/** Class representing a set of Jobs
  *belong to the same batch.
  */
private[streaming]
case class JobSet(
   time: Time,
   jobs: Seq[Job],
   streamIdToInputInfo: Map[Int, StreamInputInfo] = Map.empty) {

 private val incompleteJobs = new HashSet[Job]()
 private val submissionTime = System.currentTimeMillis() // when thisjobset was submitted
 private var processingStartTime = -1L // when the first job of thisjobset started processing
 private var processingEndTime = -1L // when the last job of this jobsetfinished processing

 jobs.zipWithIndex.foreach { case (job, i) => job.setOutputOpId(i) }
 incompleteJobs ++= jobs

  defhandleJobStart(job: Job) {
   if (processingStartTime < 0) processingStartTime =System.currentTimeMillis()
  }
submitJobSet:

def submitJobSet(jobSet: JobSet) {
  if(jobSet.jobs.isEmpty) {
   logInfo("No jobs added for time " + jobSet.time)
  }else {
   listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
//
   jobSets.put(jobSet.time, jobSet)
//jobHandler
   jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
   logInfo("Added jobs for time " + jobSet.time)
  }
}

JobHandle是一个Runnable接口,Job就是我们业务逻辑,代表的就是一系列RDD的依赖关系,job.run方法就导致了func函数的调用。

 private class JobHandler(job: Job) extends Runnable with Logging {
   import JobScheduler._

   def run() {
     try {
       val formattedTime = UIUtils.formatBatchTime(
         job.time.milliseconds, ssc.graph.batchDuration.milliseconds,showYYYYMMSS = false)
       val batchUrl =s"/streaming/batch/?id=${job.time.milliseconds}"
       val batchLinkText = s"[output operation ${job.outputOpId}, batchtime ${formattedTime}]"

       ssc.sc.setJobDescription(
         s"""Streaming job from[$batchLinkText]($batchUrl)""")
       ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY,job.time.milliseconds.toString)
       ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY,job.outputOpId.toString)

       // We need to assign `eventLoop` to a temp variable. Otherwise, because
       // `JobScheduler.stop(false)` may set `eventLoop` to null when thismethod is running, then
       // it's possible that when `post` is called, `eventLoop` happens tonull.
       var _eventLoop = eventLoop
       if (_eventLoop != null) {
         _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
         // Disable checks for existing output directories in jobs launched bythe streaming
         // scheduler, since we may need to write output to an existing directoryduring checkpoint
         // recovery; see SPARK-4835 for more details.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true){
//
           job.run()
         }
         _eventLoop = eventLoop
         if (_eventLoop != null) {
           _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
         }
       } else {
         // JobScheduler has been stopped.
       }
     } finally {
       ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
       ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
     }
    }
  }
}

此时的func就是基于DStream的业务逻辑。也就是RDD之间依赖的业务逻辑。

def run() {
 _result = Try(func())
}

-------------------------------EOF---------------------------------------------------

说明:文章以DT大数据定制班为基础,并结合其他同学的优秀博文总结而成。

时间: 2024-10-15 11:50:34

定制班第6课的相关文章

Spark定制班第4课:Spark Streaming的Exactly-One的事务处理和不重复输出彻底掌握

本篇文章主要从二个方面展开: 本期内容 1 Exactly Once 2 输出不重复 1 Exactly Once 事务: 银行转帐为例,A用户转笔账给B用户,如果B用户没收到账,或者收到多笔账,都是破坏事务的一致性.事务处理就是,能够处理且只会处理一次,即A只转一次,B只收一次. 从事务视角解密SparkStreaming架构: SparkStreaming应用程序启动,会分配资源,除非整个集群硬件资源崩溃,一般情况下都不会有问题.SparkStreaming程序分成而部分,一部分是Drive

定制班第1课:通过案例对SparkStreaming 透彻理解三板斧之一:解密SparkStreaming另类实验及SparkStreaming本质解析

从今天起,我们踏上了新的Spark学习旅途.我们的目标是要像Spark官方机构那样有能力去定制Spark版本. 我们最开始将从Spark Streaming着手. 为何从Spark Streaming切入Spark版本定制?Spark的子框架已有若干,为何选择Spark Streaming?让我们细细道来. Spark最开始只有Spark Core,没有目前的这些子框架.我们通过对一个框架的彻底研究,肯定可以精通Spark力量的源泉和所有问题的解决之道. 我们再看看目前的这些子框架.Spark

Spark定制班第2课:通过案例对Spark Streaming透彻理解三板斧之二:解密Spark Streaming运行机制和架构

本期内容: 1 解密Spark Streaming运行机制 2 解密Spark Streaming架构 1 解密Spark Streaming运行机制 我们看看上节课仍没有停下来的Spark Streaming程序运行留下的信息. 这个程序仍然在不断地循环运行.即使没有接收到新数据,日志中也不断循环显示着JobScheduler.BlockManager.MapPartitionsRDD.ShuffledRDD等等信息.这些都是Spark Core相关的信息.其循环的依据,也就是时间这个维度.

Spark定制班第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1 在线动态计算分类最热门商品案例回顾与演示 2 基于案例贯通Spark Streaming的运行源码 1 在线动态计算分类最热门商品案例回顾与演示 我们用Spark Streaming+Spark SQL来实现分类最热门商品的在线动态计算.代码如下: package com.dt.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.sp

老男孩linux实战培训初级班第二次课前考试题

################################################################ 本文内容摘录于老男孩linux实战运维培训中心课前考试题(答案部分) 如有转载,请务必保留本文链接及版权信息. 欢迎广到运维兄弟一起交流linux/unix网站运维技术! 网站运维交流群:114580181 45039636 37081784  老男孩 QQ:31333741  mail:[email protected] ======================

微软实战训练营(X)重点班第(1)课:SOA必备知识之ASP.NET Web Service开发实战

微软实战训练营 上海交大(A)实验班.(X)重点班 内部课程资料 链接:http://pan.baidu.com/s/1jGsTjq2 密码:0wmf <微软实战训练营(X)重点班第(1)课:SOA必备知识之ASP.NET Web Service开发实战>微软实战训练营 上海交大(A)实验班.(X)重点班 .(E)英语口语班http://54peixun.com/MSTrainingCamp/index.html 微软实战训练营(X)重点班第(1)课:SOA必备知识之ASP.NET Web S

七月算法12月机器学习在线班---第二十次课笔记---深度学习--RNN

七月算法12月机器学习在线班---第二十次课笔记---深度学习--RNN 七月算法(julyedu.com)12月机器学习在线班学习笔记http://www.julyedu.com 循环神经网络 复习之前的知识点: 全连接前向网络: 学习出来的是函数 卷积网络:卷积操作,部分链接,共享操作,逐层提取原始图像的特征(语音,NLP) 学习出来的特征 局部相关性 浅层宽网络很难做成神经网络 ? 1.1状态和模型 1, ID数据 ·分类问题 ·回归问题 ·特征表达 2, 大部分数据都不满足ID ·大部分

spark版本定制课程-第1课

1.学习本课程可以自己动手改进spark,或者给spark增加功能.增加某些官方没有提供的功能,通过本课程希望早就一些顶级spark专家,根据整个社会的需要对spark进行扩展或者定制.2.通过前三课就可以对spark streaming透彻理解3.为什么要对spark streaming为切入点对spark进行定制? #spark最开始并没有streaming等其他框架,最开始就是很原始的spark core,要做自己源码定制版本,以streaming作为切入点,透过对此框架的研究,就可以掌握

3月机器学习在线班第六课笔记--信息熵与最大熵模型

原文:https://www.zybuluo.com/frank-shaw/note/108124 信息熵 信息是个很抽象的概念.人们常常说信息很多,或者信息较少,但却很难说清楚信息到底有多少.比如一本五十万字的中文书到底有多少信息量.直到1948年,香农提出了“信息熵”的概念,才解决了对信息的量化度量问题.(百度百科) 香农定义的信息熵的计算公式如下: H(X)=−∑p(xi)log(p(xi))    (i=1,2,…,n) 其中X 表示的是随机变量,随机变量的取值为(x1,x2,…,xn)