[Apache Spark源代码阅读]天堂之门——SparkContext解析

略微了解Spark源代码的人应该都知道SparkContext,作为整个Project的程序入口,其重要性不言而喻,很多大牛也在源代码分析的文章中对其做了非常多相关的深入分析和解读。这里,结合自己前段时间的阅读体会,与大家共同讨论学习一下Spark的入口对象—天堂之门—SparkContex。

SparkContex位于项目的源代码路径\spark-master\core\src\main\scala\org\apache\spark\SparkContext.scala中,源文件包括SparkContextClasss声明和其伴生对象SparkContextObject。而之所以将SparkContext称为整个程序的入口,原因在于,无论我们是从本地还是HDFS读取文件,总要首先创建一个SparkContext对象,然后基于这个SC对象,展开兴许的RDD对象创建、转换等操作。

在创建SparkContex对象的过程中,进行了一系列的初始化操作,主要包含下面内容:

  1. 加载配置文件SparkConf
  2. 创建SparkEnv
  3. 创建TaskScheduler
  4. 创建DAGScheduler

1、 加载配置文件SparkConf

在SparkConf初始化时,会将相关的配置參数传递给SparkContex,包含master、appName、sparkHome、jars、environment等信息,这里的构造函数有多中表达形式,但最归初始化的结果都是殊途同归,SparkContex获取了全部相关的本地配置和执行时配置信息。

def this(master: String, appName: String, conf: SparkConf) =
    this(SparkContext.updatedConf(conf, master, appName))

def this(
      master: String,
      appName: String,
      sparkHome: String = null,
      jars: Seq[String] = Nil,
      environment: Map[String, String] = Map(),
      preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
  {
    this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
    this.preferredNodeLocationData = preferredNodeLocationData
  }

2、创建SparkEnv

SparkEnv是一个很重要的变量,其内包括了很多Spark执行时的重要组件(变量),包括 MapOutputTracker、ShuffleFetcher、BlockManager等,这里是通过SparkEnv类的伴生对象SparkEnv Object内的Create方法实现的。

private[spark] val env = SparkEnv.create(
    conf,
    "<driver>",
    conf.get("spark.driver.host"),
    conf.get("spark.driver.port").toInt,
    isDriver = true,
    isLocal = isLocal,
    listenerBus = listenerBus)
  SparkEnv.set(env)

3、创建TaskScheduler和DAGScheduler

以下这段代码很重要,它初始化了SparkContex里两个很关键的变量,TaskScheduler和DAGScheduler。

private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
  @volatile private[spark] var dagScheduler: DAGScheduler = _
  try {
    dagScheduler = new DAGScheduler(this)
  } catch {
    case e: Exception => throw
      new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
  }

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  taskScheduler.start()

首先,TaskScheduler是依据Spark的执行模式进行初始化的,详细代码在SparkContext中的createTaskScheduler方法中。以Standalone模式为例,它会将sc传递给TaskSchedulerImpl,并在返回Scheduler对象之前,创建SparkDeploySchedulerBackend,并将其初始化,最后返回Scheduler对象。

case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        scheduler

创建TaskScheduler对象后,再将TaskScheduler对象传參至DAGScheduler,用来创建DAGScheduler对象,

def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
    this(
      sc,
      taskScheduler,
      sc.listenerBus,
      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
      sc.env.blockManager.master,
      sc.env)
  }

之后,再调用其start()方法将其启动,当中包含SchedulerBackend的启动。

override def start() {
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      import sc.env.actorSystem.dispatcher
      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
            SPECULATION_INTERVAL milliseconds) {
        Utils.tryOrExit { checkSpeculatableTasks() }
      }
    }
  }

除此之外,SparkContex还包含一些重要的函数方法,比如

1、runjob

runjob是spark中全部任务提交的入口,诸如rdd中的一些常见操作和变换,都会调用SparkContex的runjob方法,提交任务。

def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit) {
    if (dagScheduler == null) {
      throw new SparkException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite)
    val start = System.nanoTime
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
      resultHandler, localProperties.get)
    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
    rdd.doCheckpoint()
  }

2、textFile

从HDFS路径读取单个数据文件,首先创建HadoopRDD,通过map操作,返回RDD对象。

3、wholeTextFiles

从HDFS某个目录读取多个文件。

4、parallelize

读取本地文件,并转换为RDD。

时间: 2024-08-14 03:34:44

[Apache Spark源代码阅读]天堂之门——SparkContext解析的相关文章

[Apache Spark源码阅读]天堂之门——SparkContext解析

稍微了解Spark源码的人应该都知道SparkContext,作为整个Project的程序入口,其重要性不言而喻,许多大牛也在源码分析的文章中对其做了很多相关的深入分析和解读.这里,结合自己前段时间的阅读体会,与大家共同讨论学习一下Spark的入口对象—天堂之门—SparkContex. SparkContex位于项目的源码路径\spark-master\core\src\main\scala\org\apache\spark\SparkContext.scala中,源文件包含Classs Sp

在Apache Spark中使用UDF

用户自定义函数(UDF)是大多数SQL环境的一个关键特性,其主要用于扩展系统的内置功能.UDF允许开发人员通过抽象其低级语言实现在更高级语言(如SQL)中应用的新函数.Apache Spark也不例外,其为UDF与Spark SQL工作流集成提供了各种选项. 在本篇博文中,我们将回顾Python.Java和Scala上的Apache Spark UDF和UDAF(用户自定义的聚合函数)实现的简单示例.我们还将讨论重要的UDF API功能和集成点,包括各发行版本之间的当前可用性.总而言之,我们将介

Apache Spark源码走读之11 -- sql的解析与执行

欢迎转载,转载请注明出处,徽沪一郎. 概要 在即将发布的spark 1.0中有一个新增的功能,即对sql的支持,也就是说可以用sql来对数据进行查询,这对于DBA来说无疑是一大福音,因为以前的知识继续生效,而无须去学什么scala或其它script. 一般来说任意一个sql子系统都需要有parser,optimizer,execution三大功能模块,在spark中这些又都是如何实现的呢,这些实现又有哪些亮点和问题?带着这些疑问,本文准备做一些比较深入的分析. SQL模块分析有几大难点,分别为

Spark天堂之门(SparkContext)解密(DT大数据梦工厂)

内容: 1.Spark天堂之门: 2.SparkContext使用案例鉴赏: 3.SparkContext内幕: 4.SparkContext源码解密: SparkContext是编写任意Spark程序的第一个对象,用SparkConf为传入的参数 ==========Spark天堂之门:SparkContext !!!============ 1.Spark程序在运行的时候分为Driver和Executors: 2.Spark程序编写是基于SparkContext的,具体来说包含两个方面: 1

ERROR actor.OneForOneStrategy: org.apache.spark.SparkContext

今天在用Spark把Kafka的数据往ES写的时候,代码一直报错,错误信息如下: 15/10/20 17:28:56 ERROR actor.OneForOneStrategy: org.apache.spark.SparkContext java.io.NotSerializableException: org.apache.spark.SparkContext at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java

Spark--&gt;combineByKey【请阅读Apache spark官网文档】

这篇文章,很有必要看,写的不错.但是看过后,不要忘记查看Apache spark官网.因为这篇文章理解还是和源码.官网文档 不一致.有一点错误![cnblogs的代码编辑器 不支持Scala,所以 语言的关键字 没有高亮显示] 在数据分析中,处理Key,Value的Pair数据是极为常见的场景,例如我们可以针对这样的数据进行分组.聚合或者将两个包含Pair数据的RDD根据key进行join.从函数的抽象层面看,这些操作具有共同的特征,都是将类型为RDD[(K,V)]的数据处理为RDD[(K,C)

全面解析Apache Spark中的决策树

Apache Spark中的决策树 决策树是在顺序决策问题进行分类,预测和促进决策的有效方法.决策树由两部分组成: 决策(Desion) 结果(Outcome) 决策树包含三种类型的节点: 根节点(Root node):包含所有数据的树的顶层节点. 分割节点(Splitting node):将数据分配给子组(subgroup)的节点. 终端节点(Terminal node):最终决定(即结果). 分割节点(Splitting node),仅就离散数学中的树的概念而言,就是指分支节点. 为了抵达终

《Apache Spark源码剖析》

Spark Contributor,Databricks工程师连城,华为大数据平台开发部部长陈亮,网易杭州研究院副院长汪源,TalkingData首席数据科学家张夏天联袂力荐1.本书全面.系统地介绍了Spark源码,深入浅出,细致入微2.提供给读者一系列分析源码的实用技巧,并给出一个合理的阅读顺序3.始终抓住资源分配.消息传递.容错处理等基本问题,抽丝拨茧4.一步步寻找答案,所有问题迎刃而解,使读者知其然更知其所以然 内容简介 书籍计算机书籍 <Apache Spark源码剖析>以Spark

Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN

Spark Streaming 编程指南 概述 一个入门示例 基础概念 依赖 初始化 StreamingContext Discretized Streams (DStreams)(离散化流) Input DStreams 和 Receivers(接收器) DStreams 上的 Transformations(转换) DStreams 上的输出操作 DataFrame 和 SQL 操作 MLlib 操作 缓存 / 持久性 Checkpointing Accumulators, Broadcas