解密SparkStreaming运行机制和架构进阶之Job和容错(第三篇)

本期要点:

1、探讨Spark Streaming Job架构和运行机制

2、探讨Spark Streaming 容错机制

关于SparkStreaming我们在前面的博客中其实有所探讨,SparkStreaming是运行在SparkCode之前的一个子框架,下面我们通过一个简单例子来逐一探讨SparkStreaming运行机制和架构

  1. SparkStreaming运行机制和架构
//新浪微博:http://weibo.com/ilovepains/

SparkConf conf = new SparkConf().setMaster("spark://Master:7077").setAppName("WordCountOnline");
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
        JavaReceiverInputDStream lines = jsc.socketTextStream("Master", 9999);

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { 

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wordsCount.print();

        jsc.start();
        jsc.awaitTermination();
        jsc.close();

这是一个SparkStreaming单词记数的例子

在SparkStreaming程序中是StreamingContext是SparkStreaming应用程序所有功能的起始点和程序调度的核心,我们来看一下StreamingContext初始化的部分源码:

//StreamingContext.scala 183行
private[streaming] val scheduler = new JobScheduler(this)

我们可以看到在构建StreamingContext的时候,StreamingContext初始化了JobScheduler,而在JobScheduler中又初始化了JobGenerator,同时定义了receiverTracker变量,如下

//JobScheduler.scala 50行
 private val jobGenerator = new JobGenerator(this)
  val clock = jobGenerator.clock
  val listenerBus = new StreamingListenerBus()

  // These two are created only when scheduler starts.
  // eventLoop not being null means the scheduler has been started and not stopped
  var receiverTracker: ReceiverTracker = null

下面我们来看jsc.socketTextStream(“Master”, 9999)创建DStream背后的部分源码:

// StreamingContext.scala 327行
 def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
    socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
  }
  // StreamingContext.scala  345行
 def socketStream[T: ClassTag](
      hostname: String,
      port: Int,
      converter: (InputStream) => Iterator[T],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[T] = {
    new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
  }

从上面我们可以看到StreamingContext对socketStream方法进行了方法重载,最终调用的是SocketInputDStream,那我们接着来看一下SocketInputDStream

private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

在SocketInputDStream中定了接受数据的getReceiver方法,当然咋们看到的这些都处于方法定义或者对象初始化的阶段,还没真正开始执行

那现在我们接着来看jsc.start()开始启动程序执行方法

def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            validate()

            // Start the streaming scheduler in a new thread, so that thread local properties
            // like call sites and job groups can be reset without affecting those of the
            // current thread.
            ThreadUtils.runInNewThread("streaming-start") {
              sparkContext.setCallSite(startSite.get)
              sparkContext.clearJobGroup()
              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
              scheduler.start()
            }
            state = StreamingContextState.ACTIVE
          } catch {
            case NonFatal(e) =>
              logError("Error starting the context, marking it as stopped", e)
              scheduler.stop(false)
              state = StreamingContextState.STOPPED
              throw e
          }
          StreamingContext.setActiveContext(this)
        }
        shutdownHookRef = ShutdownHookManager.addShutdownHook(
          StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
        // Registering Streaming Metrics at the start of the StreamingContext
        assert(env.metricsSystem != null)
        env.metricsSystem.registerSource(streamingSource)
        uiTab.foreach(_.attach())
        logInfo("StreamingContext started")
      case ACTIVE =>
        logWarning("StreamingContext has already been started")
      case STOPPED =>
        throw new IllegalStateException("StreamingContext has already been stopped")
    }
  }

我们可以开到jsc.start(),其实做了很多工作,但我们重点关注一下:scheduler.start()

  def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start(ssc.sparkContext)
    //JobScheduler.scala 80行
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)
    receiverTracker.start()
    //JobScheduler.scala 83行
    jobGenerator.start()
    logInfo("Started JobScheduler")
  }

我现在可以看到在JobScheduler的start方法中receiverTracker得到了初始化,并且调用了其start方法

//ReceiverTracker.scala 149行
def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }

    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }

//ReceiverTracker.scala 413行
private def launchReceivers(): Unit = {
    val receivers = receiverInputStreams.map(nis => {
      val rcvr = nis.getReceiver()
      rcvr.setReceiverId(nis.id)
      rcvr
    })

    runDummySparkJob()

    logInfo("Starting " + receivers.length + " receivers")
    endpoint.send(StartAllReceivers(receivers))
  }

至此我们可以看到,在StreamingContext执行start方法时会调用JobScheduler的start方法,而在JobScheduler的start方法中会初始化ReceiverTracker并执行其start方法,ReceiverTracker执行start方法时最终是通过rpc通信的方式通知Worker中的excutor进程开始不断接受数据,并将元数据信息汇报给driver

下面我们接着回到JobScheduler.scala 83行,看jobGenerator.start()方法:

//JobGenerator.scala 79行,
def start(): Unit = synchronized {
    if (eventLoop != null) return // generator has already been started

    // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
    // See SPARK-10125
    checkpointWriter

    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    eventLoop.start()

    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      startFirstTime()
    }
  }

到这块已经完成了SparkStreaming启动ReceiverTracker接受数据并且通过JobGenerator Job生成器产生Job,运行在cluster之上

当然我们在程序当中可以看到源码当中其实有很多线程池的使用,笔者认为其中最大的好处在于可以减少创建新线程的时间消耗而又可以达到对线程的高度复用(类似于数据库的连接池是一个道理)

  1. Spark Streaming 容错机制:

Spark Streaming底层实际上就是RDD的集合,基于这种特性,它的容错机制主要就是两种:一是checkpoint,二是基于lineage(血统)的容错。当然如果lineage链条过于复杂和冗长,这时候就需要做checkpoint

 由于RDD的依赖关系,如果stage之间都是窄依赖,此时一般基于lineage容错,方便高效。在stage之间如果是宽依赖,而宽依赖一般会产生shuffle操作,这时候我们就需要考虑checkpoint了

时间: 2024-08-07 15:00:28

解密SparkStreaming运行机制和架构进阶之Job和容错(第三篇)的相关文章

第3课:SparkStreaming 透彻理解三板斧之三:解密SparkStreaming运行机制和架构进阶之Job和容错

本期内容: 解密Spark Streaming Job架构和运行机制 解密Spark Streaming容错架构和运行机制 理解SparkStreaming的Job的整个架构和运行机制对于精通SparkStreaming是至关重要的.我们知道对于一般的Spark应用程序来说,是RDD的action操作触发了Job的运行.那对于SparkStreaming来说,Job是怎么样运行的呢?我们在编写SparkStreaming程序的时候,设置了BatchDuration,Job每隔BatchDurat

spark版本定制:SparkStreaming 透彻理解三板斧之三:解密SparkStreaming运行机制和架构进阶之Job和容错

本期内容: 1.解密Spark Streaming Job架构和运行机制 2.解密Spark Streaming 容错架构和运行机制 一.解密Spark Streaming Job架构和运行机制 通过代码洞察Job的执行过程: object OnlineForeachRDD2DB { def main(args: Array[String]){ /* * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息 */ val conf = new SparkCon

第3课:通过案例对SparkStreaming 透彻理解三板斧之三:解密SparkStreaming运行机制和架构进阶之Job和容错

理解Spark Streaming的Job的整个架构和运行机制对于精通Spark Streaming是至关重要的. 一 首先我们运行以下的程序,然后通过这个程序的运行过程进一步加深理解Spark Streaming流处理的Job的执行的过程,代码如下: object OnlineForeachRDD2DB { def main(args: Array[String]){ /* * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息, * 例如说通过setMa

Spark版本定制:通过案例对SparkStreaming透彻理解三板斧之二:解密SparkStreaming运行机制和架构

本期内容: 1.解密Spark Streaming运行机制 2.解密Spark Streaming架构 上期回顾: 1.技术界的寻龙点穴,每个领域都有自己的龙脉,Spark就是大数据界的龙脉,Spark Streaming就是Spark的龙血: 2.采用了降维(把时间Batch Interval放大)的方式,进行案例演示实战,得到的结论是:特定的时间内是RDD在执行具体的Job: 一.解密Spark Streaming运行机制和架构 运行机制概念:       Spark Streaming运行

第2课:通过案例对SparkStreaming 透彻理解三板斧之二:解密SparkStreaming运行机制和架构

本篇博文将从以下几点组织文章: 1. 解密Spark Streaming运行机制 2. 解密Spark Streaming架构 一:解密Spark Streaming运行机制 1. DAG生成模板 :DStreamGraph a) Spark Streaming中不断的有数据流进来,他会把数据积攒起来,积攒的依据是以Batch Interval的方式进行积攒的,例如1秒钟,但是这1秒钟里面会有很多的数据例如event,event就构成了一个数据的集合,而RDD处理的时候,是基于固定不变的集合产生

解密SparkStreaming运行机制和架构分析

 解密Spark Streaming Job架构和运行机制 解密Spark Streaming容错架构和运行机制 作业的生成肯定是一个动态的生成 private[streaming]valgraph: DStreamGraph = { if(isCheckpointPresent) { cp_.graph.setContext(this) cp_.graph.restoreCheckpointData() cp_.graph }else{ require(batchDur_ != null,

第2课:SparkStreaming 透彻理解三板斧之二:解密SparkStreaming运行机制和架构

本期内容: 解密Spark Streaming 运行机制 解密Spark Streaming 框架 Spark Streaming是Spark的一个子框架,但是它更像是运行在Spark Core上的一个应用程序.Spark Streaming在启动时运行了很多的job,并且job之间相互配合. Job围绕了两个方面: 1. 对输入数据流进行计算的Job 2. 框架自身运行需要的Job,例如Receiver启动. Spark Streaming本身就是一个非常复杂的应用程序,如果你对SparkSt

通过案例对 spark streaming 透彻理解三板斧之三:spark streaming运行机制与架构

本期内容: 1. Spark Streaming Job架构与运行机制 2. Spark Streaming 容错架构与运行机制 事实上时间是不存在的,是由人的感官系统感觉时间的存在而已,是一种虚幻的存在,任何时候宇宙中的事情一直在发生着的. Spark Streaming好比时间,一直遵循其运行机制和架构在不停的在运行,无论你写多或者少的应用程序都跳不出这个范围. 一.   通过案例透视Job执行过程的Spark Streaming机制解析,案例代码如下: import org.apache.

Spark定制班第2课:通过案例对Spark Streaming透彻理解三板斧之二:解密Spark Streaming运行机制和架构

本期内容: 1 解密Spark Streaming运行机制 2 解密Spark Streaming架构 1 解密Spark Streaming运行机制 我们看看上节课仍没有停下来的Spark Streaming程序运行留下的信息. 这个程序仍然在不断地循环运行.即使没有接收到新数据,日志中也不断循环显示着JobScheduler.BlockManager.MapPartitionsRDD.ShuffledRDD等等信息.这些都是Spark Core相关的信息.其循环的依据,也就是时间这个维度.