第五课:Spark Streamingf 源码运行架构图

第五课:Spark Streamingf 源码运行架构图

作者:大数据技术研发人员:谢彪

一、运行架构图

二、SparkStreming 运行流程

(一)、在StreamingContext调用start方法的内部其实是会启动JobScheduler的Start方法,进行消息循环,在JobScheduler 的start内部会构造JobGenerator和ReceiverTacker,并且调用JobGenerator和ReceiverTacker的start方法:

1、JobGenerator启动后会不断的根据batchDuration生成一个个的Job.

2、ReceiverTracker启动后首先在Spark Cluster中启动Receiver(其实是在Executor中先启动ReceiverSupervisor),在Receiver收到数据后会通过ReceiverSupervisor存储到Executor并且把数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker内部会通过ReceivedBlockTracker来管理接受到的元数据信息

每个BatchInterval会产生一个具体的Job,其实这里的Job不是Spark Core中所指的Job,它只是基于DStreamGraph而生成的RDD 的DAG而已,从Java角度讲,相当于Runnable接口实例,此时要想运行Job需要提交给JobScheduler,在JobScheduler中通过线程池的方式找到一个单独的线程来提交Job到集群运行(其实是在线程中基于RDD的Action触发真正的作业的运行),为什么使用线程池呢?

1、作业不断生成,所以为了提升效率,我们需要线程池;这和在Executor中通过线程池执行Task有异曲同工之妙;

2、有可能设置了Job的FAIR公平调度的方式,这个时候也需要多线程的支持。

运行源码流程解读:

1.创建StreamingContext

StreamingContext是Spark Streaming是运行基础,也是负责管理和其运行的重要组件,其代码如下:

可以看到,StreamingContext内部包涵了一个SparkContext,这个可以告诉我们Streaming就是Spark Core上的一个应用程序。

2.创建SocketInputDStream

首先进入StreamingContext的初始化

其次进入SocketTextStream类,可知它继承一个SocketInputDstream

而SocketInputDStream又继承>DStream。

在 SocketInputDStream之中有一个关键的内部类SocketReceiver,

执行调用流程:

-       1、 ->启动线程调用SocketReceiver.onStart()

-       2、->SocketReceiver.receive()

receive方法是实现具体的逻辑代码(根据不同类型的receiver有不同的实现,建立Socket连接,然后一直循环store)

2.2 DStream,本身是RDD的模板,主要提供生成新RDD的功能

2.2.1 RDD的存储:

可以看到RDD是使用HashMap存储起来,DStream的本质就是存储的一系列的数据流。

2.2.2 RDD的生成:

getOrCompute方法完成了RDD的生成,是SparkStreaming的核心部分。

3.StreamingContext启动(start)

/**
 * Start the execution of the streams.
 *
 * @throws IllegalStateException if the StreamingContext is already stopped.
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start()
          }

进入Scheduler.start:

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")

从以上可知:在JobScheduler 的start内部会构造JobGenerator和ReceiverTacker,并且调用JobGenerator和ReceiverTacker的start方法:

1、JobGenerator启动后会不断的根据batchDuration生成一个个的Job.

2、ReceiverTracker启动后首先在Spark Cluster中启动Receiver(其实是在Executor中先启动ReceiverSupervisor),在Receiver收到数据后会通过ReceiverSupervisor存储到Executor并且把数据的Metadata信息发送给Driver中的ReceiverTracker,在ReceiverTracker内部会通过ReceivedBlockTracker来管理接受到的元数据信息

具体的内部源码运行在以后中分解!

作者:大数据技术研发人员:谢彪

  • 资料来源于:DT_大数据梦工厂(Spark发行版本定制) 
  • DT大数据梦工厂微信公众号:DT_Spark 
  • 新浪微博:http://www.weibo.com/ilovepains
  • 王家林老师每晚20:00免费大数据实战

YY直播:68917580

时间: 2024-10-08 02:38:12

第五课:Spark Streamingf 源码运行架构图的相关文章

(升级版)Spark从入门到精通(Scala编程、案例实战、高级特性、Spark内核源码剖析、Hadoop高端)

本课程主要讲解目前大数据领域最热门.最火爆.最有前景的技术——Spark.在本课程中,会从浅入深,基于大量案例实战,深度剖析和讲解Spark,并且会包含完全从企业真实复杂业务需求中抽取出的案例实战.课程会涵盖Scala编程详解.Spark核心编程.Spark SQL和Spark Streaming.Spark内核以及源码剖析.性能调优.企业级案例实战等部分.完全从零起步,让学员可以一站式精通Spark企业级大数据开发,提升自己的职场竞争力,实现更好的升职或者跳槽,或者从j2ee等传统软件开发工程

Spark SQL源码分析之核心流程

自从去年Spark Submit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的贡献者从几人到了几十人,而且发展速度异常迅猛,究其原因,个人认为有以下2点: 1.整合:将SQL类型的查询语言整合到 Spark 的核心RDD概念里.这样可以应用于多种任务,流处理,批处理,包括机器学习里都可以引入Sql. 2.效率:因为Shark受到hive的编程模型限制,无法再继续优化来适应Spark模型里. 前一段时间测试过Shark,并且对Spark

Spark 定制版:009~Spark Streaming源码解读之Receiver在Driver的精妙实现全生命周期彻底研究和思考

本讲内容: a. Receiver启动的方式设想 b. Receiver启动源码彻底分析 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解. 上节回顾 上一讲中,我们给大家具体分析了RDD的物理生成和逻辑生成过程,彻底明白DStream和RDD之间的关系,及其内部其他有关类的具体依赖等信息: a. DStream是RDD的模板,其内部generatedRDDs 保存了每个BatchDuration时间生成的RDD对象实例.DStream的依赖构成了RDD

第一篇:Spark SQL源码分析之核心流程

/** Spark SQL源码分析系列文章*/ 自从去年Spark Submit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的贡献者从几人到了几十人,而且发展速度异常迅猛,究其原因,个人认为有以下2点: 1.整合:将SQL类型的查询语言整合到 Spark 的核心RDD概念里.这样可以应用于多种任务,流处理,批处理,包括机器学习里都可以引入Sql.    2.效率:因为Shark受到hive的编程模型限制,无法再继续优化来适应Spark

【Spark SQL 源码分析系列文章】

从决定写Spark SQL源码分析的文章,到现在一个月的时间里,陆陆续续差不多快完成了,这里也做一个整合和索引,方便大家阅读,这里给出阅读顺序 :) 第一篇 Spark SQL源码分析之核心流程 第二篇 Spark SQL Catalyst源码分析之SqlParser 第三篇 Spark SQL Catalyst源码分析之Analyzer 第四篇 Spark SQL Catalyst源码分析之TreeNode Library 第五篇 Spark SQL Catalyst源码分析之Optimize

第十一篇:Spark SQL 源码分析之 External DataSource外部数据源

上周Spark1.2刚发布,周末在家没事,把这个特性给了解一下,顺便分析下源码,看一看这个特性是如何设计及实现的. /** Spark SQL源码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)示例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了External

Spark SQL 源码分析系列文章

从决定写Spark SQL源码分析的文章,到现在一个月的时间里,陆陆续续差不多快完成了,这里也做一个整合和索引,方便大家阅读,这里给出阅读顺序 :) 第一篇 Spark SQL源码分析之核心流程 第二篇 Spark SQL Catalyst源码分析之SqlParser 第三篇 Spark SQL Catalyst源码分析之Analyzer 第四篇 Spark SQL Catalyst源码分析之TreeNode Library 第五篇 Spark SQL Catalyst源码分析之Optimize

spark core源码分析6 Spark job的提交

本节主要讲解SparkContext的逻辑 首先看一个spark自带的最简单的例子: object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = ma

Spark SQL 源码分析之 In-Memory Columnar Storage 之 cache table

/** Spark SQL源码分析系列文章*/ Spark SQL 可以将数据缓存到内存中,我们可以见到的通过调用cache table tableName即可将一张表缓存到内存中,来极大的提高查询效率. 这就涉及到内存中的数据的存储形式,我们知道基于关系型的数据可以存储为基于行存储结构 或 者基于列存储结构,或者基于行和列的混合存储,即Row Based Storage.Column Based Storage. PAX Storage. Spark SQL 的内存数据是如何组织的? Spar