SparkContext初始化过程

SparkContext的初始化是Driver应用程序提交执行的前提,Spark Driver用于提交用户的应用程序,可以看做是Spark的客户端,了解Spark Driver的初始化,有助于理解用户应用程序在客户端的处理过程

SparkContext的初始化参数由SparkConf负责,SparkConf的构造很简单, 主要通过ConcurrentHashMap来维护各种Spark的配置属性

// 用于存储配置信息
private val settings = new ConcurrentHashMap[String, String]()

if (loadDefaults) {
  loadFromSystemProperties(false)
}

// 加载任何以 "spark." 开头的系统属性
private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
  // Load any spark.* system properties
  for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
    set(key, value, silent)
  }
  this
}

下面是SparkContext的初始化步骤

1、创建Spark执行环境

2、创建RDD清理器metadataCleaner

3、创建并初始化SparkUI

4、Hadoop相关配置及Executor环境变量的设置

5、创建任务调度 TaskScheduler

6、创建和启动DAGScheduler

7、TaskScheduler的启动

8、初始化BlockManager (BlockManager是存储体系的重要组件,用于管理块信息)

9、启动测量系统 MetricsSystem

10、创建和启动Executor分配管理器ExecutorAllocationManager

11、ContextCleanr 的创建与启动

12、Spark环境更新

13、创建DAGSchedulerSource 和 BlockMangerSource

14、将SpakrContext标记为激活

SparkContext 的主构造器参数为SparkConf

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

  // 存储了线程栈中最靠近栈顶的用户类及靠近栈底的scla或者spark核心类信息
  private val creationSite: CallSite = Utils.getCallSite()
  //用于设置SparkContext是否在一个jvm内保持单例,默认是单例
  private val allowMultipleContexts: Boolean = config.getBoolean("spark.driver.allowMultipleContexts", false)
  //用来确保实例的唯一性,并标记当前实例为正在构建
  SparkContext.markPartiallyConstructed(this, allowMultipleContexts)

属性:spark.driver.allowMultipleContexts 用来控制SparkContext是否为单例

接下来会对SparkConf进行复制,然后对各种配置信息进行赋值

private[spark] def conf: SparkConf = _conf

/**
 * Return a copy of this SparkContext‘s configuration. The configuration ‘‘cannot‘‘ be
 * changed at runtime.
 */
def getConf: SparkConf = conf.clone()

def jars: Seq[String] = _jars
def files: Seq[String] = _files
def master: String = _conf.get("spark.master")
def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client")
def appName: String = _conf.get("spark.app.name")

private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
private[spark] def eventLogDir: Option[URI] = _eventLogDir
private[spark] def eventLogCodec: Option[String] = _eventLogCodec

def isLocal: Boolean = Utils.isLocalMaster(_conf)

属性:

spark.master  运行模式(yarn, mesos, standalone)

spark.submit.deployMode 设置部署模式,默认为client模式

spark.app.name 指定应用程序的名称

spark.eventLog.enabled 是否启动event日志

配置校验

_conf.validateSettings()

if (!_conf.contains("spark.master")) {
  throw new SparkException("A master URL must be set in your configuration")
}
if (!_conf.contains("spark.app.name")) {
  throw new SparkException("An application name must be set in your configuration")
}

// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
  throw new SparkException("Detected yarn cluster mode, but isn‘t running on a cluster. " +
    "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
}

spark.master和spark.app.name是必须要指定的值,否则会抛出异常

时间: 2024-12-23 15:34:37

SparkContext初始化过程的相关文章

spark源码解读-SparkContext初始化过程

sparkcontext是spark应用程序的入口,每个spark应用都会创建sparkcontext,用于连接spark集群来执行计算任务.在sparkcontext初始化过程中会创建SparkEnv,SparkUI,TaskSchedule,DAGSchedule等多个核心类,我们会逐个分析他们. 下面我们看一下sparkcontext的初始化过程,首先判断一些参数, try { _conf = config.clone() _conf.validateSettings() if (!_co

spark 源码分析之二 -- SparkContext 的初始化过程

创建或使用现有Session 从Spark 2.0 开始,引入了 SparkSession的概念,创建或使用已有的session 代码如下: 1 val spark = SparkSession 2 .builder 3 .appName("SparkTC") 4 .getOrCreate() 首先,使用了 builder 模式来创建或使用已存在的SparkSession,org.apache.spark.sql.SparkSession.Builder#getOrCreate 代码如

Spark-源码-TaskScheduler初始化过程, ClientActor向Master发送注册任务信息过程

Spark版本 1.3 Spark源码 Spark.createTaskScheduler TaskScheduler初始化过程 1.// SparkContext中 /** * Create a task scheduler based on a given master URL. * Return a 2-tuple of the scheduler backend and the task scheduler. */ private def createTaskScheduler( sc:

java代码的初始化过程研究

刚刚在ITeye上看到一篇关于java代码初始化的文章,看到代码我试着推理了下结果,虽然是大学时代学的知识了,没想到还能做对.(看来自己大学时掌握的基础还算不错,(*^__^*) 嘻嘻--)但是博主写的不够详细具体,我想在这详细谈一下java代码的具体初始化过程. 首先要清楚,初始化分为两个过程:类初始化.对象初始化. 类初始化是指类加载器将类加载到内存时,对类成员的初始化过程,也就是有static修饰的变量.对于加载完的类,它的类变量都会赋一个默认值,即使你定义时就赋值了.比如int类型就是0

JAVA对象的初始化过程

出处:http://blog.csdn.net/andrew323/article/details/4665379 下面我们通过两个例题来说明对象的实例化过程. 例1:   编译并运行该程序会有以下输出 Static Block Employee Company:china soft Non-Static Block Employee phone:0755-51595599 Employee(String) Empoloyee() 下面我们来对结果做分析: 1 在代码34行实例化对象时, 先对给

Java对象相关元素的初始化过程

1.类的成员变量.构造函数.成员方法的初始化过程 当一个类使用new关键字来创建新的对象的时候,比如Person per = new Person();JVM根据Person()寻找匹配的类,然后找到这个类相匹配的构造方法,这里是无参构造,如果程序中没有给出任何构造方法,则JVM默认会给出一个无参构造.当创建一个对象的时候一定对调用该类的构造方法,构造方法就是为了对对象的数据进行初始化.JVM会对给这个对象分配内存空间,也就是对类的成员变量进行分配内存空间,如果类中在定义成员变量就赋值的话,就按

对Socket CAN的理解(5)——【Socket CAN控制器的初始化过程】

转载请注明出处:http://blog.csdn.net/Righthek 谢谢! 对于一般的CAN模块,进行初始化时,最关键的是以下两步: 1.  配置CAN的位时序: 2.  配置CAN的消息报文: 下面,我们来详细分析上面提到的关键两步. 一.初始化步骤: 1.  第一步,进入初始化模式,在CAN控制寄存器中,将Init位置1: 2.  第二步,在CAN控制寄存器中,将CCE位置1: 3.  第三步,等待Init位置1,此步聚为了确保已经进入初始化模式: 4.  第四步,将位时序的值写入到

IOC容器的初始化过程

1.ClassPathXmlApplicationContext类体系结构 左边的黄色部分是ApplicationContext体系继承结构,右边是BeanFactory结构体系,两个体系是典型的模板方法设计模式的使用. 从该继承体系可以看出: (1)BeanFactory是一个bean工厂的最基本定义,里面包含了一个bean工厂的几个最基本方法:getBean(),containsBean()等,是一个很纯粹的bean工厂,不关注资源.资源位置.事件等. ApplicationContext是

Java初始化过程

以下程序执行的结果是: class X{ Y y=new Y(); public X(){ System.out.print("X"); } } class Y{ public Y(){ System.out.print("Y"); } } public class Z extends X{ Y y=new Y(); public Z(){ System.out.print("Z"); } public static void main(Stri