5.Spark Streaming流计算框架的运行流程源码分析2

1 spark streaming 程序代码实例

代码如下:

[html] view plain copy

  1. object OnlineTheTop3ItemForEachCategory2DB {
  2. def main(args: Array[String]){
  3. val conf = new SparkConf() //创建SparkConf对象
  4. //设置应用程序的名称,在程序运行的监控界面可以看到名称
  5. conf.setAppName("OnlineTheTop3ItemForEachCategory2DB")
  6. conf.setMaster("spark://Master:7077") //此时,程序在Spark集群
  7. //设置batchDuration时间间隔来控制Job生成的频率并且创建Spark Streaming执行的入口
  8. val ssc = new StreamingContext(conf, Seconds(5))
  9. ssc.checkpoint("/root/Documents/SparkApps/checkpoint")
  10. val soketDStream = ssc.socketTextStream("Master", 9999)
  11. /// 业务处理逻辑 .....
  12. ssc.start()
  13. ssc.awaitTermination()
  14. }
  15. }

2 Spark Streaming的运行源码分析

2.1 创建StreamingContext

 

我们将基于以上实例例,粗略地分析一下Spark源码,提示一些有针对性的内容,以了解其运行的主要流程。

1)代码没有直接使用SparkContext,而是使用StreamingContext。

我们来看看StreamingContext 的源码片段:

[html] view plain copy

  1. /**
  2. * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
  3. * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
  4. * @param batchDuration the time interval at which streaming data will be divided into batches
  5. */
  6. def this(conf: SparkConf, batchDuration: Duration) = {
  7. this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
  8. }

没错,createNewSparkContext就是创建SparkContext:

[html] view plain copy

  1. private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  2. new SparkContext(conf)
  3. }

这说明Spark Streaming也是Spark上的一个应用程序。

2)案例最开始的地方肯定要通过数据流创建一个InputDStream。

[html] view plain copy

  1. val socketDstram = ssc.socketTextStream("Master", 9999)

socketTextStream方法定义如下:

[html] view plain copy

  1. /**
  2. * Create a input stream from TCP source hostname:port. Data is received using
  3. * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
  4. * lines.
  5. * @param hostname      Hostname to connect to for receiving data
  6. * @param port          Port to connect to for receiving data
  7. * @param storageLevel  Storage level to use for storing the received objects
  8. *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
  9. */
  10. def socketTextStream(
  11. hostname: String,
  12. port: Int,
  13. storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  14. ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  15. socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
  16. }

3)可看到代码最后面调用socketStream。

socketStream定义如下:

[html] view plain copy

  1. /**
  2. * Create a input stream from TCP source hostname:port. Data is received using
  3. * a TCP socket and the receive bytes it interepreted as object using the given
  4. * converter.
  5. * @param hostname      Hostname to connect to for receiving data
  6. * @param port          Port to connect to for receiving data
  7. * @param converter     Function to convert the byte stream to objects
  8. * @param storageLevel  Storage level to use for storing the received objects
  9. * @tparam T            Type of the objects received (after converting bytes to objects)
  10. */
  11. def socketStream[T: ClassTag](
  12. hostname: String,
  13. port: Int,
  14. converter: (InputStream) => Iterator[T],
  15. storageLevel: StorageLevel
  16. ): ReceiverInputDStream[T] = {
  17. new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
  18. }

4)实际上生成SocketInputDStream。

SocketInputDStream类如下:

[html] view plain copy

  1. private[streaming]
  2. class SocketInputDStream[T: ClassTag](
  3. ssc_ : StreamingContext,
  4. host: String,
  5. port: Int,
  6. bytesToObjects: InputStream => Iterator[T],
  7. storageLevel: StorageLevel
  8. ) extends ReceiverInputDStream[T](ssc_) {
  9. def getReceiver(): Receiver[T] = {
  10. new SocketReceiver(host, port, bytesToObjects, storageLevel)
  11. }
  12. }

SocketInputDStream继承ReceiverInputDStream。

其中实现getReceiver方法,返回SocketReceiver对象。
总结一下SocketInputDStream的继承关系:

SocketInputDStream -> ReceiverInputDStream -> InputDStream -> DStream。  

5)DStream是生成RDD的模板,是逻辑级别,当达到Interval的时候这些模板会被batch data实例化成为RDD和DAG。

DStream的generatedRDDs:

[html] view plain copy

  1. // RDDs generated, marked as private[streaming] so that testsuites can access it
  2. @transient
  3. private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

DStream的getOrCompute:

[html] view plain copy

  1. /**
  2. * Get the RDD corresponding to the given time; either retrieve it from cache
  3. * or compute-and-cache it.
  4. */
  5. private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  6. // If RDD was already generated, then retrieve it from HashMap,
  7. // or else compute the RDD
  8. generatedRDDs.get(time).orElse {
  9. // Compute the RDD if time is valid (e.g. correct time in a sliding window)
  10. // of RDD generation, else generate nothing.
  11. if (isTimeValid(time)) {
  12. val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
  13. // Disable checks for existing output directories in jobs launched by the streaming
  14. // scheduler, since we may need to write output to an existing directory during checkpoint
  15. // recovery; see SPARK-4835 for more details. We need to have this call here because
  16. // compute() might cause Spark jobs to be launched.
  17. PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
  18. compute(time)
  19. }
  20. }
  21. rddOption.foreach { case newRDD =>
  22. // Register the generated RDD for caching and checkpointing
  23. if (storageLevel != StorageLevel.NONE) {
  24. newRDD.persist(storageLevel)
  25. logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
  26. }
  27. if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
  28. newRDD.checkpoint()
  29. logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
  30. }
  31. generatedRDDs.put(time, newRDD)
  32. }
  33. rddOption
  34. } else {
  35. None
  36. }
  37. }
  38. }

主要是生成RDD,再将生成的RDD放在HashMap中。具体生成RDD过程以后剖析。

目前大致讲了DStream和RDD这些核心概念在Spark Streaming中的使用。

2.2 启动StreamingContext

代码实例中的ssc.start() 方法启动StreamingContext,主要的逻辑发生在这个start方法中:

*  在StreamingContext调用start方法的内部其实是会启动JobScheduler的Start方法,进行消息循环,

*  在JobScheduler的start内部会构造JobGenerator和ReceiverTacker,并且调用JobGenerator和

*  ReceiverTacker的start方法:

*

*  1,JobGenerator启动后会不断的根据batchDuration生成一个个的Job

*  其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD的DAG

*  而已,从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,

*  在JobScheduler中通过线程池的方式找到一个单独的线程来提交Job到集群运行(其实是在线程中

*  基于RDD的Action触发真正的作业的运行)

*

*  2,ReceiverTracker启动后首先在Spark Cluster中启动Receiver(其实是在Executor中先启动

*  ReceiverSupervisor),在Receiver收到数据后会通过ReceiverSupervisor存储到Executor并且把

*  数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker内部会通过

*  ReceivedBlockTracker来管理接受到的元数据信息.

体现Spark Streaming应用运行流程的关键类如下图所示。

下面开启神奇的 源码分析之旅,过程痛苦,痛苦之后是大彻大悟的畅快...........

 

 

1)先看看ScreamingContext的start()。

start()方法启动StreamContext,由于Spark应用程序不能有多个SparkContext对象实例,所以Spark Streaming框架在启动时对状态进行判断。代码如下:

[html] view plain copy

  1. /**
  2. * Start the execution of the streams.
  3. *
  4. * @throws IllegalStateException if the StreamingContext is already stopped.
  5. */
  6. def start(): Unit = synchronized {
  7. state match {
  8. case INITIALIZED =>
  9. startSite.set(DStream.getCreationSite())
  10. StreamingContext.ACTIVATION_LOCK.synchronized {
  11. StreamingContext.assertNoOtherContextIsActive()
  12. try {
  13. validate()
  14. // Start the streaming scheduler in a new thread, so that thread local properties
  15. // like call sites and job groups can be reset without affecting those of the
  16. // current thread.
  17. //线程本地存储,线程有自己的私有属性,设置这些线程的时候不会影响其他线程,
  18. ThreadUtils.runInNewThread("streaming-start") {
  19. sparkContext.setCallSite(startSite.get)
  20. sparkContext.clearJobGroup()
  21. sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
  22. //启动JobScheduler
  23. scheduler.start()
  24. }
  25. state = StreamingContextState.ACTIVE
  26. } catch {
  27. case NonFatal(e) =>
  28. logError("Error starting the context, marking it as stopped", e)
  29. scheduler.stop(false)
  30. state = StreamingContextState.STOPPED
  31. throw e
  32. }
  33. StreamingContext.setActiveContext(this)
  34. }
  35. shutdownHookRef = ShutdownHookManager.addShutdownHook(
  36. StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
  37. // Registering Streaming Metrics at the start of the StreamingContext
  38. assert(env.metricsSystem != null)
  39. env.metricsSystem.registerSource(streamingSource)
  40. uiTab.foreach(_.attach())
  41. logInfo("StreamingContext started")
  42. case ACTIVE =>
  43. logWarning("StreamingContext has already been started")
  44. case STOPPED =>
  45. throw new IllegalStateException("StreamingContext has already been stopped")
  46. }
  47. }

初始状态时,会启动JobScheduler。

2)接着来看下JobScheduler的启动过程start()。

其中启动了EventLoop、StreamListenerBus、ReceiverTracker和jobGenerator等多项工作。

[html] view plain copy

  1. def start(): Unit = synchronized {
  2. if (eventLoop != null) return // scheduler has already been started
  3. logDebug("Starting JobScheduler")
  4. eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
  5. override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
  6. override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  7. }
  8. // 启动消息循环处理线程。用于处理JobScheduler的各种事件。
  9. eventLoop.start()
  10. // attach rate controllers of input streams to receive batch completion updates
  11. for {
  12. inputDStream <- ssc.graph.getInputStreams
  13. // rateController可以控制输入速度
  14. rateController <- inputDStream.rateController
  15. } ssc.addStreamingListener(rateController)
  16. // 启动监听器。用于更新Spark UI中StreamTab的内容。
  17. listenerBus.start(ssc.sparkContext)
  18. receiverTracker = new ReceiverTracker(ssc)
  19. // 生成InputInfoTracker。用于管理所有的输入的流,以及他们输入的数据统计。这些信息将通过 StreamingListener监听。
  20. inputInfoTracker = new InputInfoTracker(ssc)
  21. // 启动ReceiverTracker。用于处理数据接收、数据缓存、Block生成。
  22. receiverTracker.start()
  23. // 启动JobGenerator。用于DStreamGraph初始化、DStream与RDD的转换、生成Job、提交执行等工作。
  24. jobGenerator.start()
  25. logInfo("Started JobScheduler")
  26. }

3)JobScheduler中的消息处理函数processEvent。

处理三类消息:Job已开始,Job已完成,错误报告。

[html] view plain copy

  1. private def processEvent(event: JobSchedulerEvent) {
  2. try {
  3. event match {
  4. case JobStarted(job, startTime) => handleJobStart(job, startTime)
  5. case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
  6. case ErrorReported(m, e) => handleError(m, e)
  7. }
  8. } catch {
  9. case e: Throwable =>
  10. reportError("Error in job scheduler", e)
  11. }
  12. }

4)我们再粗略地分析一下JobScheduler.start()中启动的工作。

4.1)先看JobScheduler.start()启动的第一项工作EventLoop。

EventLoop用于处理JobScheduler的各种事件。

EventLoop中有事件队列:

[html] view plain copy

  1. private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

还有一个线程处理队列中的事件:

[html] view plain copy

  1. private val eventThread = new Thread(name) {
  2. setDaemon(true)
  3. override def run(): Unit = {
  4. try {
  5. while (!stopped.get) {
  6. val event = eventQueue.take()
  7. try {
  8. onReceive(event)
  9. } catch {
  10. case NonFatal(e) => {
  11. try {
  12. onError(e)
  13. } catch {
  14. case NonFatal(e) => logError("Unexpected error in " + name, e)
  15. }
  16. }
  17. }
  18. }
  19. } catch {
  20. case ie: InterruptedException => // exit even if eventQueue is not empty
  21. case NonFatal(e) => logError("Unexpected error in " + name, e)
  22. }
  23. }
  24. }

这个线程中的onReceive、onError,在JobScheduler中的EventLoop实例化时已定义。

4.2)JobScheduler.start()启动的第二项工作StreamListenerBus。

- 用于异步传递StreamingListenerEvents到注册的StreamingListeners。

- 用于更新Spark UI中StreamTab的内容。

4.3)看JobScheduler.start()启动的第三项工作ReceiverTracker。

ReceiverTracker用于管理所有的输入的流,以及他们输入的数据统计。

这些信息将通过 StreamingListener监听。

ReceiverTracker的start()中,会内部实例化ReceiverTrackerEndpoint这个Rpc消息通信体。

 1 def start(): Unit = synchronized {
 2   if (isTrackerStarted) {
 3     throw new SparkException("ReceiverTracker already started")
 4   }
 5
 6   if (!receiverInputStreams.isEmpty) {
 7     endpoint = ssc.env.rpcEnv.setupEndpoint(
 8       "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
 9     if (!skipReceiverLaunch) launchReceivers()
10     logInfo("ReceiverTracker started")
11     trackerState = Started
12   }
13 }

在ReceiverTracker启动的过程中会调用其launchReceivers方法:

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })
  runDummySparkJob()
  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}
 

其中调用了runDummySparkJob方法来启动Spark Streaming的框架第一个Job,其中collect这个action操作会触发Spark Job的执行。这个方法是为了确保每个Slave都注册上,避免所有Receiver都在一个节点,使后面的计算能负载均衡。

/**
 * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
 * receivers to be scheduled on the same node.
 *
 * TODO Should poll the executor number and wait for executors according to
 * "spark.scheduler.minRegisteredResourcesRatio" and
 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
 */
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}
ReceiverTracker.launchReceivers()还调用了endpoint.send(StartAllReceivers(receivers))方法,Rpc消息通信体发送StartAllReceivers消息。
ReceiverTrackerEndpoint它自己接收到消息后,先根据调度策略获得Recevier在哪个Executor上运行,然后在调用startReceiver(receiver, executors)方法,来启动Receiver。
override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
 

在startReceiver方法中,ssc.sparkContext.submitJob提交Job的时候传入startReceiverFunc这个方法,因为startReceiverFunc该方法是在Executor上执行的。而在startReceiverFunc方法中实例化ReceiverSupervisorImpl对象,该对象是对Receiver进行管理和监控。这个Job是Spark Streaming框架为我们启动的第二个Job,且一直运行。因为supervisor.awaitTermination()该方法会阻塞等待退出。

/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {

    // ........... 此处省略1万字 (无关代码) , 呵呵哒 .........

  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        //实例化Receiver监控者
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It‘s restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }

  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD,
   startReceiverFunc, //提交Job时候传入startReceiverFunc这个方法,因为startReceiverFunc该方法是在Executor上执行的
  Seq(0), (_, _) => Unit, ())

  // 一直重启 receiver job直到 ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}
 

接下来看下ReceiverSupervisorImpl的启动过程,先启动所有注册上的BlockGenerator对象,然后向ReceiverTrackerEndpoint发送RegisterReceiver消息,再调用receiver的onStart方法。

/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}

其中的onStart():启动所有注册上的BlockGenerator对象

override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}

其中的startReceiver()方法中调用onReceiverStart()然后再调用receiver的onStart方法。

/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}

在onReceiverStart()中向ReceiverTrackerEndpoint发送RegisterReceiver消息

override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askWithRetry[Boolean](msg)
}

其中在Driver运行的ReceiverTrackerEndpoint对象接收到RegisterReceiver消息后,将streamId, typ, host, executorId, receiverEndpoint封装为ReceiverTrackingInfo保存到内存对象receiverTrackingInfos这个HashMap中。

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Remote messages
  case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
    val successful =
      registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
    context.reply(successful)
 

registerReceiver方法源码:

/** Register a receiver */
private def registerReceiver(
    streamId: Int,
    typ: String,
    host: String,
    executorId: String,
    receiverEndpoint: RpcEndpointRef,
    senderAddress: RpcAddress
  ): Boolean = {
  if (!receiverInputStreamIds.contains(streamId)) {
    throw new SparkException("Register received for unexpected id " + streamId)
  }

    // ........... 此处省略1万字 (无关代码) , 呵呵哒 .........

  if (!isAcceptable) {
    // Refuse it since it‘s scheduled to a wrong executor
    false
  } else {
    val name = s"${typ}-${streamId}"
    val receiverTrackingInfo = ReceiverTrackingInfo(
      streamId,
      ReceiverState.ACTIVE,
      scheduledLocations = None,
      runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
      name = Some(name),
      endpoint = Some(receiverEndpoint))
    receiverTrackingInfos.put(streamId, receiverTrackingInfo)
    listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
    logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
    true
  }
}

Receiver的启动,以ssc.socketTextStream("localhost", 9999)为例,创建的是SocketReceiver对象。内部启动一个线程来连接Socket Server,读取socket数据并存储。

private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
        // ........... 此处省略1万字 (无关代码) , 呵呵哒 .........
  }
}

4.4)接下来看JobScheduler.start()中启动的第四项工作JobGenerator。

JobGenerator有成员RecurringTimer,用于启动消息系统和定时器。按照batchInterval时间间隔定期发送GenerateJobs消息。

//根据创建StreamContext时传入的batchInterval,定时发送GenerateJobs消息
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

JobGenerator的start()方法:
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }

  // 启动消息循环处理线程
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    // 开启定时生成Job的定时器
    startFirstTime()
  }
}

JobGenerator.start()中的startFirstTime()的定义:

/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

JobGenerator.start()中的processEvent()的定义:

[html] view plain copy

  1. /** Processes all events */
  2. private def processEvent(event: JobGeneratorEvent) {
  3. logDebug("Got event " + event)
  4. event match {
  5. case GenerateJobs(time) => generateJobs(time)
  6. case ClearMetadata(time) => clearMetadata(time)
  7. case DoCheckpoint(time, clearCheckpointDataLater) =>
  8. doCheckpoint(time, clearCheckpointDataLater)
  9. case ClearCheckpointData(time) => clearCheckpointData(time)
  10. }
  11. }

其中generateJobs的定义:

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {

    // 根据特定的时间获取具体的数据
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    //调用DStreamGraph的generateJobs生成Job
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

DStreamGraph的generateJobs方法,调用输出流的generateJob方法来生成Jobs集合。

// 输出流:具体Action的输出操作
private val outputStreams = new ArrayBuffer[DStream[_]]()

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}

来看下DStream的generateJob方法,调用getOrCompute方法来获取当Interval的时候,DStreamGraph会被BatchData实例化成为RDD,如果有RDD则封装jobFunc方法,里面包含context.sparkContext.runJob(rdd, emptyFunc),然后返回封装后的Job。

[html] view plain copy

  1. /**
  2. * Generate a SparkStreaming job for the given time. This is an internal method that
  3. * should not be called directly. This default implementation creates a job
  4. * that materializes the corresponding RDD. Subclasses of DStream may override this
  5. * to generate their own jobs.
  6. */
  7. private[streaming] def generateJob(time: Time): Option[Job] = {
  8. getOrCompute(time) match {
  9. case Some(rdd) => {
  10. val jobFunc = () => {
  11. val emptyFunc = { (iterator: Iterator[T]) => {} }
  12. context.sparkContext.runJob(rdd, emptyFunc)
  13. }
  14. Some(new Job(time, jobFunc))
  15. }
  16. case None => None
  17. }
  18. }

接下来看JobScheduler的submitJobSet方法,向线程池中提交JobHandler。而JobHandler实现了Runnable 接口,最终调用了job.run()这个方法。看一下Job类的定义,其中run方法调用的func为构造Job时传入的jobFunc,其包含了context.sparkContext.runJob(rdd, emptyFunc)操作,最终导致Job的提交。

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

JobHandler实现了Runnable 接口,最终调用了job.run()这个方法:

private class JobHandler(job: Job) extends Runnable with Logging {
    import JobScheduler._

    def run() {
      try {

         //  *********** 此处省略无关代码 *******************

        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it‘s possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
      }
    }
  }
}

Job的代码片段:

[html] view plain copy

  1. private[streaming]
  2. class Job(val time: Time, func: () => _) {
  3. private var _id: String = _
  4. private var _outputOpId: Int = _
  5. private var isSet = false
  6. private var _result: Try[_] = null
  7. private var _callSite: CallSite = null
  8. private var _startTime: Option[Long] = None
  9. private var _endTime: Option[Long] = None
  10. def run() {
  11. _result = Try(func())
  12. }

以上是主要源码的分析,累死宝宝了,......慢慢的成就感

时间: 2024-10-05 04:58:32

5.Spark Streaming流计算框架的运行流程源码分析2的相关文章

Spark Streaming实时计算框架介绍

http://www.cnblogs.com/Leo_wl/p/3530464.html 随着大数据的发展,人们对大数据的处理要求也越来越高,原有的批处理框架MapReduce适合离线计算,却无法满足实时性要求较高的业务,如实时推荐.用户行为分析等. Spark Streaming是建立在Spark上的实时计算框架,通过它提供的丰富的API.基于内存的高速执行引擎,用户可以结合流式.批处理和交互试查询应用.本文将详细介绍Spark Streaming实时计算框架的原理与特点.适用场景. Spar

(版本定制)第5课:基于案例分析Spark Streaming流计算框架的运行源码

本期内容: 1.在线动态计算分类最热门商品案例回顾与演示 2.基于案例分析Spark Streaming的运行源码 第一部分案例: package com.dt.spark.sparkstreaming import com.robinspark.utils.ConnectionPoolimport org.apache.spark.SparkConfimport org.apache.spark.sql.Rowimport org.apache.spark.sql.hive.HiveConte

基于案例一节课贯通Spark Streaming流计算框架的运行源码

 在线动态计算分类最热门商品案例回顾与演示 基于案例贯通Spark Streaming的运行源码 使用Spark Streaming + Spark SQL来在线动态计算电商中不同类别中最热门的商品排名,例如手机这个类别下面最热门的三款手机. 是用mysql数据库作为元数据库,使用Hive作为存储引擎,使用Spark SQL作为查询引擎. 其中链接数据库代码如下: package com.dt.spark.com.dt.spark.streaming; import java.sql.Con

Spark定制班第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1 在线动态计算分类最热门商品案例回顾与演示 2 基于案例贯通Spark Streaming的运行源码 1 在线动态计算分类最热门商品案例回顾与演示 我们用Spark Streaming+Spark SQL来实现分类最热门商品的在线动态计算.代码如下: package com.dt.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.sp

基于案例贯通 Spark Streaming 流计算框架的运行源码

本期内容 : Spark Streaming+Spark SQL案例展示 基于案例贯穿Spark Streaming的运行源码 一. 案例代码阐述 : 在线动态计算电商中不同类别中最热门的商品排名,例如:手机类别中最热门的三种手机.电视类别中最热门的三种电视等. 1.案例运行代码 : import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.hive.HiveCont

spark版本定制五:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1.在线动态计算分类最热门商品案例回顾与演示 2.基于案例贯通Spark Streaming的运行源码 一.在线动态计算分类最热门商品案例回顾与演示 案例回顾: package com.dt.spark.sparkstreaming import com.robinspark.utils.ConnectionPool import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.

第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1 在线动态计算分类最热门商品案例回顾与演示 2 基于案例贯通Spark Streaming的运行源码 一.案例代码 在线动态计算电商中不同类别中最热门的商品排名,例如:手机类别中最热门的三种手机.电视类别中最热门的三种电视等 package com.dt.spark.sparkstreaming import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.

Spark 定制版:005~贯通Spark Streaming流计算框架的运行源码

本讲内容: a. 在线动态计算分类最热门商品案例回顾与演示 b. 基于案例贯通Spark Streaming的运行源码 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解. 上节回顾 上节课主要从事务视角为大家探索Spark Streaming架构机制:Spark Streaming程序分成而部分,一部分是Driver,另外一部分是Executor.通过对Driver和Executor解析,洞察怎么才能完成完整的语义.事务一致性,并保证数据的零丢失,Exa

Spark发行版笔记5:贯通Spark Streaming流计算框架的运行源码

本章节内容: 一.在线动态计算分类最热门商品案例回顾 二.基于案例贯通Spark Streaming的运行源码 先看代码(源码场景:用户.用户的商品.商品的点击量排名,按商品.其点击量排名前三): package com.dt.spark.sparkstreaming import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.hive.HiveContext impo