Spark Distribution Notes 10: Spark Streaming Source Code Walkthrough — A Thorough Study of Continuous Data Receiving and Its Full Lifecycle

Main topics of this section:

1. The data receiving architecture and its design pattern

2. A source-code walkthrough of data receiving

Spark Streaming receives data continuously; here we consider Spark applications that run a Receiver.

The Receiver and the Driver run in different processes, so after receiving data the Receiver must keep reporting back to the Driver.

Because the Driver is responsible for scheduling, if the received data is not reported to the Driver, the Driver will not account for it in the scheduling system (e.g. data IDs and Block partitions).

Thinking about how Spark Streaming receives data:

A loop keeps receiving data; the received data must be stored, and once it is stored the metadata must be reported to the Driver. Receiving data and storing data should not be handled by the same object.

From a design-pattern point of view, Spark Streaming's data receiving follows an MVC architecture:

V (View): the Driver

M (Model): the Receiver

C (Controller): the ReceiverSupervisor

The reasoning:

The Receiver is the component that actually receives data; for example, it can obtain data from socketTextStream.

The ReceiverSupervisor is the controller that stores the data: the Receiver is started by the ReceiverSupervisor, and conversely, once the Receiver has received data it stores that data through the ReceiverSupervisor.

The metadata of the stored data is then reported to the Driver side.

V is the Driver: it works with the metadata, which acts as pointers to the data; using those pointers it operates on the actual data that lives on other machines and presents the processing results.

In short:

The full lifecycle of data receiving in Spark Streaming can be viewed as an MVC pattern: the ReceiverSupervisor acts as the Controller (C), the Receiver as the Model (M), and the Driver as the View (V). A minimal user-facing program that ends up driving this machinery is shown below.
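To make the roles concrete, here is a minimal sketch of a user program using only the public API (the host localhost and port 9999 are placeholders): calling socketTextStream on the driver merely creates a description of the receiver; the actual SocketReceiver is started later on a worker.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SocketWordCount").setMaster("local[2]")
    // Batch interval of 1 second; each batch is built from the blocks the receiver
    // produced during that second.
    val ssc = new StreamingContext(conf, Seconds(1))

    // socketTextStream returns a ReceiverInputDStream whose getReceiver()
    // yields a SocketReceiver; that receiver is later shipped to an executor.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}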

Source code analysis:

1. The Receiver class:

/**
 * :: DeveloperApi ::
 * Abstract class of a receiver that can be run on worker nodes to receive external data. A
 * custom receiver can be defined by defining the functions `onStart()` and `onStop()`. `onStart()`
 * should define the setup steps necessary to start receiving data,
 * and `onStop()` should define the cleanup steps necessary to stop receiving data.
 * Exceptions while receiving can be handled either by restarting the receiver with `restart(...)`
 * or stopped completely by `stop(...)` or
 *
 * A custom receiver in Scala would look like this.
 *
 * {{{
 *  class MyReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String](storageLevel) {
 *      def onStart() {
 *          // Setup stuff (start threads, open sockets, etc.) to start receiving data.
 *          // Must start new thread to receive data, as onStart() must be non-blocking.
 *
 *          // Call store(...) in those threads to store received data into Spark's memory.
 *
 *          // Call stop(...), restart(...) or reportError(...) on any thread based on how
 *          // different errors needs to be handled.
 *
 *          // See corresponding method documentation for more details
 *      }
 *
 *      def onStop() {
 *          // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
 *      }
 *  }
 * }}}
 *
 * A custom receiver in Java would look like this.
 *
 * {{{
 * class MyReceiver extends Receiver<String> {
 *     public MyReceiver(StorageLevel storageLevel) {
 *         super(storageLevel);
 *     }
 *
 *     public void onStart() {
 *          // Setup stuff (start threads, open sockets, etc.) to start receiving data.
 *          // Must start new thread to receive data, as onStart() must be non-blocking.
 *
 *          // Call store(...) in those threads to store received data into Spark's memory.
 *
 *          // Call stop(...), restart(...) or reportError(...) on any thread based on how
 *          // different errors needs to be handled.
 *
 *          // See corresponding method documentation for more details
 *     }
 *
 *     public void onStop() {
 *          // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
 *     }
 * }
 * }}}
 */
@DeveloperApi
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {

2. The ReceiverSupervisor class:
/**
 * Abstract class that is responsible for supervising a Receiver in the worker.
 * It provides all the necessary interfaces for handling the data received by the receiver.
 */
private[streaming] abstract class ReceiverSupervisor(
    receiver: Receiver[_],
    conf: SparkConf
  ) extends Logging {

The ReceiverTracker submits Jobs one by one; each Job has a single task, and inside each task a ReceiverSupervisor is used to start one Receiver. Let's look at ReceiverTracker's start method:

/**
 * Manages the receivers: starting, running and restarting them.
 * It records all the registered input streams (all input sources) in a member field,
 * and launches one receiver for every receiver input stream.
 * This class manages the execution of the receivers of ReceiverInputDStreams. Instance of
 * this class must be created after all input streams have been added and StreamingContext.start()
 * has been called because it needs the final set of input streams at the time of instantiation.
 * Runs on the driver side.
 * @param skipReceiverLaunch Do not launch the receiver. This is useful for testing.
 */
private[streaming]
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }
  private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    ssc.isCheckpointPresent,
    Option(ssc.checkpointDir)
  )
  private val listenerBus = ssc.scheduler.listenerBus

  /** Enumeration to identify current state of the ReceiverTracker */
  object TrackerState extends Enumeration {
    type TrackerState = Value
    val Initialized, Started, Stopping, Stopped = Value
  }
  import TrackerState._

  /** State of the tracker. Protected by "trackerStateLock" */
  @volatile private var trackerState = Initialized

  // endpoint is created when generator starts.
  // This not being null means the tracker has been started and not stopped
  private var endpoint: RpcEndpointRef = null

  private val schedulingPolicy = new ReceiverSchedulingPolicy()

  // Track the active receiver job number. When a receiver job exits ultimately, countDown will
  // be called.
  private val receiverJobExitLatch = new CountDownLatch(receiverInputStreams.size)

  /**
   * Track all receivers' information. The key is the receiver id, the value is the receiver info.
   * It's only accessed in ReceiverTrackerEndpoint.
   */
  private val receiverTrackingInfos = new HashMap[Int, ReceiverTrackingInfo]

  /**
   * Store all preferred locations for all receivers. We need this information to schedule
   * receivers. It's only accessed in ReceiverTrackerEndpoint.
   */
  private val receiverPreferredLocations = new HashMap[Int, Option[String]]

  /** Start the endpoint and receiver execution thread. */
  def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }

    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }

The elements of an RDD must be serializable so that the RDD can be serialized and shipped to the Executor side. Receiver therefore implements the Serializable interface, and any custom Receiver must implement Serializable as well:

@DeveloperApi
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {

The ReceiverSupervisor handles the data received by the Receiver, stores it, and reports to the Driver; the Receiver receives data one record at a time.
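To make the shipping of receivers concrete, here is a simplified, self-contained sketch of the underlying idea (not the actual ReceiverTracker code): the driver wraps each serializable receiver-like object in a single-partition RDD and runs one job per receiver, whose only task starts it on an executor. DummyReceiver and startOnExecutor are hypothetical names used only for illustration.

import org.apache.spark.{SparkConf, SparkContext}

// A stand-in for a real Receiver: it must be Serializable because it travels
// inside an RDD from the driver to an executor. (Hypothetical class for illustration.)
class DummyReceiver(val streamId: Int) extends Serializable {
  def startOnExecutor(): Unit = {
    // In real Spark Streaming this is where a ReceiverSupervisor would be
    // created and the receiver's onStart() would be called.
    println(s"Receiver $streamId started on an executor")
  }
}

object LaunchSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LaunchSketch").setMaster("local[2]"))
    val receivers = Seq(new DummyReceiver(0), new DummyReceiver(1))

    // One single-partition RDD (hence one job with one task) per receiver,
    // mirroring the "one job per receiver" design described above.
    receivers.foreach { r =>
      sc.makeRDD(Seq(r), numSlices = 1).foreachPartition { it =>
        it.foreach(_.startOnExecutor())
      }
    }
    sc.stop()
  }
}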

The function that runs on that RDD wraps the individual Receivers. Which Receiver actually has to be started depends on the input source you chose, i.e. on the receiver of the corresponding input DStream — for example socketTextStream.

What we hold on the driver is essentially a reference handle (e.g. to a SocketReceiver): the Receiver obtained there is only a description, and the receiver that actually receives the data is produced by getReceiver(), as shown below:

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")  endpoint.send(StartAllReceivers(receivers))}
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }
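For reference, here is a simplified sketch of what such a receive() loop typically does, following the documented custom-receiver pattern rather than copying SocketReceiver itself: connect, read records in a loop, hand each one to store(), and restart on error. The host and port are constructor parameters supplied by the caller.

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A simplified custom receiver following the public Receiver contract.
class SimpleSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  def onStart(): Unit = {
    // onStart() must not block, so the loop runs in its own thread.
    new Thread("Simple Socket Receiver") {
      setDaemon(true)
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // The receiving thread exits on its own once isStopped() is true or the socket closes.
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)          // hand each record to the ReceiverSupervisor for storage
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException => restart(s"Could not connect to $host:$port", e)
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}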

If the receiver RDD is empty, a default RDD is created. The main work is handling the data the Receiver receives: the received data is handed to the ReceiverSupervisor for storage, and the metadata is reported to the ReceiverTracker. The Receiver receives data record by record; at an abstract level it is a while loop that fetches one record at a time. The received records are merged into a buffer and placed into a block queue; when ReceiverSupervisorImpl starts, it calls the start method of its BlockGenerator objects.

override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}
/**
 * Generates batches of objects received by a
 * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
 * named blocks at regular intervals. This class starts two threads,
 * one to periodically start a new batch and prepare the previous batch of as a block,
 * the other to push the blocks into the block manager.
 *
 * Note: Do not create BlockGenerator instances directly inside receivers. Use
 * `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
 */
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging {

  private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])

  /**
   * The BlockGenerator can be in 5 possible states, in the order as follows.
   *
   *  - Initialized: Nothing has been started
   *  - Active: start() has been called, and it is generating blocks on added data.
   *  - StoppedAddingData: stop() has been called, the adding of data has been stopped,
   *                       but blocks are still being generated and pushed.
   *  - StoppedGeneratingBlocks: Generating of blocks has been stopped, but
   *                             they are still being pushed.
   *  - StoppedAll: Everything has stopped, and the BlockGenerator object can be GCed.
   */
  private object GeneratorState extends Enumeration {
    type GeneratorState = Value
    val Initialized, Active, StoppedAddingData, StoppedGeneratingBlocks, StoppedAll = Value
  }
  import GeneratorState._

  private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
  require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")

  private val blockIntervalTimer =
    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }

  @volatile private var currentBuffer = new ArrayBuffer[Any]
  @volatile private var state = Initialized

  /** Start block generating and pushing threads. */
  def start(): Unit = synchronized {
    if (state == Initialized) {
      state = Active
      blockIntervalTimer.start()
      blockPushingThread.start()
      logInfo("Started BlockGenerator")
    } else {
      throw new SparkException(
        s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
    }
  }

What is the BlockGenerator class for? As the source comment above explains, it merges the data received by a Receiver into Blocks and writes them to the BlockManager. Internally it has two threads: one periodically starts a new batch of objects and wraps the previous batch into a Block; the other pushes the Blocks into the BlockManager for storage. A self-contained sketch of this two-thread pattern is given below.
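The following toy model (plain Scala, not Spark's BlockGenerator) illustrates the same two-thread pattern: a block-making thread swaps the current buffer into a block at a fixed interval and enqueues it, while a block-pushing thread drains the queue and "stores" each block.

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object BlockGeneratorSketch {
  private case class Block(id: Long, records: Seq[Any])

  private val blockIntervalMs = 200L                       // analogous to spark.streaming.blockInterval
  private val blocksForPushing = new ArrayBlockingQueue[Block](10)
  @volatile private var currentBuffer = new ArrayBuffer[Any]
  @volatile private var stopped = false

  def addData(record: Any): Unit = synchronized { currentBuffer += record }

  // Thread 1: every blockIntervalMs, turn the current buffer into a block and enqueue it.
  private val blockMakingThread = new Thread("block-maker") {
    override def run(): Unit = while (!stopped) {
      Thread.sleep(blockIntervalMs)
      val newBlock = BlockGeneratorSketch.synchronized {
        val records = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        if (records.nonEmpty) Some(Block(System.currentTimeMillis(), records)) else None
      }
      newBlock.foreach(b => blocksForPushing.put(b))
    }
  }

  // Thread 2: drain the queue and "store" each block (a stand-in for the BlockManager).
  private val blockPushingThread = new Thread("block-pusher") {
    override def run(): Unit = while (!stopped || !blocksForPushing.isEmpty) {
      val block = blocksForPushing.poll(100, TimeUnit.MILLISECONDS)
      if (block != null) println(s"storing block ${block.id} with ${block.records.size} records")
    }
  }

  def main(args: Array[String]): Unit = {
    blockMakingThread.start(); blockPushingThread.start()
    (1 to 1000).foreach { i => addData(s"record-$i"); Thread.sleep(2) }
    stopped = true
    blockMakingThread.join(); blockPushingThread.join()
  }
}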
override def createBlockGenerator(
    blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
  // Cleanup BlockGenerators that have already been stopped
  registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }

  val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
  registeredBlockGenerators += newBlockGenerator
  newBlockGenerator
}

The BlockGenerator class extends RateLimiter, which means that while we cannot limit the speed at which data arrives, we can limit the speed at which it is stored, and that in turn limits the rate at which the stream flows. A configuration sketch follows.
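As an illustration (assuming the Spark 1.x-era configuration keys spark.streaming.receiver.maxRate and spark.streaming.backpressure.enabled; the numeric value is a placeholder), the storage rate can be capped explicitly or managed by backpressure:

import org.apache.spark.SparkConf

object RateLimitConfig {
  val conf = new SparkConf()
    .setAppName("RateLimitedStreaming")
    // Cap on records per second accepted by each receiver.
    .set("spark.streaming.receiver.maxRate", "10000")
    // Let Spark adapt the receiving rate to the observed batch processing delay.
    .set("spark.streaming.backpressure.enabled", "true")
}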

The BlockGenerator class has a timer (by default it merges the received data into a block every 200 ms) and a thread (which writes the blocks to the BlockManager). One Block is produced every 200 ms, so with a 1-second batch interval 5 partitions are generated per batch. If the interval is too small, each data slice contains too little data, so each Task processes very little and performance suffers; practical experience suggests not going below 50 ms, as the sketch below illustrates.
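A rough tuning sketch (assuming the default key spark.streaming.blockInterval and the 50 ms rule of thumb mentioned above): the number of partitions per batch for a single receiver is approximately the batch interval divided by the block interval.

import org.apache.spark.SparkConf

object BlockIntervalTuning {
  // One block (hence one partition of each batch RDD) every 50 ms.
  val conf = new SparkConf()
    .setAppName("BlockIntervalTuning")
    .set("spark.streaming.blockInterval", "50ms")

  def main(args: Array[String]): Unit = {
    val batchIntervalMs = 1000L   // the batch duration chosen when creating the StreamingContext
    val blockIntervalMs = 50L     // the value set above
    // Approximate number of partitions (tasks) per batch for a single receiver:
    println(s"~${batchIntervalMs / blockIntervalMs} partitions per batch")  // ~20
  }
}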


Thanks to teacher Wang Jialin for sharing this knowledge.

Teacher Wang Jialin's contact card:

The No. 1 Spark expert in China

Sina Weibo: http://weibo.com/ilovepains

WeChat official account: DT_Spark

Blog: http://blog.sina.com.cn/ilovepains

Mobile: 18610086859

QQ: 1740415547

Email: [email protected]

YY classroom: live lectures every day at 20:00, channel 68917580
