Initializing SparkContext is a prerequisite for submitting and executing a Driver application. The Spark Driver submits the user's application and can be regarded as Spark's client, so understanding how the Driver is initialized helps to understand how a user application is processed on the client side.
SparkContext takes its initialization parameters from SparkConf. The construction of SparkConf is simple: it maintains the various Spark configuration properties in a ConcurrentHashMap.
```scala
// Stores the configuration properties
private val settings = new ConcurrentHashMap[String, String]()

if (loadDefaults) {
  loadFromSystemProperties(false)
}

// Load any system properties that start with "spark."
private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
  // Load any spark.* system properties
  for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
    set(key, value, silent)
  }
  this
}
```
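To make this concrete, here is a minimal sketch (the app name and values are illustrative, not from the Spark source) showing that "spark."-prefixed JVM system properties become defaults, and explicit set calls land in the same map:

```scala
import org.apache.spark.SparkConf

// The no-arg constructor uses loadDefaults = true, so JVM system
// properties such as -Dspark.app.name=demo are picked up as defaults.
val conf = new SparkConf()
  .set("spark.master", "local[2]")   // explicit settings go into the
  .set("spark.app.name", "ConfDemo") // same ConcurrentHashMap-backed map

// Reads go through the same map.
println(conf.get("spark.app.name")) // prints: ConfDemo
```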
Below are the initialization steps of SparkContext (a minimal driver program that triggers them is sketched after the list):
1. Create the Spark execution environment
2. Create the RDD cleaner metadataCleaner
3. Create and initialize the SparkUI
4. Set up Hadoop-related configuration and Executor environment variables
5. Create the task scheduler TaskScheduler
6. Create and start the DAGScheduler
7. Start the TaskScheduler
8. Initialize the BlockManager (a key component of the storage subsystem, which manages block information)
9. Start the metrics system MetricsSystem
10. Create and start the Executor allocation manager ExecutorAllocationManager
11. Create and start the ContextCleaner
12. Update the Spark environment
13. Create the DAGSchedulerSource and BlockManagerSource
14. Mark the SparkContext as active
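As a minimal sketch (the master URL and app name are placeholders), constructing a SparkContext executes all of the steps above inside its constructor:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Steps 1-14 above all run inside the SparkContext constructor:
// SparkEnv, SparkUI, TaskScheduler, DAGScheduler, BlockManager, etc.
val conf = new SparkConf()
  .setMaster("local[2]")  // determines which TaskScheduler is created (step 5)
  .setAppName("InitDemo")

val sc = new SparkContext(conf)
println(sc.isLocal) // true for a local master
sc.stop()
```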
The primary constructor of SparkContext takes a SparkConf as its parameter:
```scala
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

  // Records the user class closest to the top of the thread stack and the
  // Scala or Spark core class closest to the bottom
  private val creationSite: CallSite = Utils.getCallSite()

  // Whether multiple SparkContexts may exist in one JVM; by default the
  // SparkContext is a singleton
  private val allowMultipleContexts: Boolean =
    config.getBoolean("spark.driver.allowMultipleContexts", false)

  // Ensures instance uniqueness and marks the current instance as under construction
  SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
```
Property: spark.driver.allowMultipleContexts controls whether SparkContext must be a singleton within the JVM.
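A small sketch of the effect, with illustrative names (with the default value false, a second context in the same JVM is rejected inside markPartiallyConstructed):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val first = new SparkContext(
  new SparkConf().setMaster("local").setAppName("first"))

// With spark.driver.allowMultipleContexts=false (the default), the next
// line would throw: "Only one SparkContext may be running in this JVM".
// val second = new SparkContext(
//   new SparkConf().setMaster("local").setAppName("second"))

first.stop()
```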
Next, the SparkConf is copied, and the various configuration values are read and assigned:
```scala
private[spark] def conf: SparkConf = _conf

/**
 * Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
 * changed at runtime.
 */
def getConf: SparkConf = conf.clone()

def jars: Seq[String] = _jars
def files: Seq[String] = _files
def master: String = _conf.get("spark.master")
def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client")
def appName: String = _conf.get("spark.app.name")

private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
private[spark] def eventLogDir: Option[URI] = _eventLogDir
private[spark] def eventLogCodec: Option[String] = _eventLogCodec

def isLocal: Boolean = Utils.isLocalMaster(_conf)
```
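Because getConf returns a clone, mutating the returned object does not change the running context. A brief sketch (the names are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local").setAppName("CloneDemo"))

val copy = sc.getConf                 // a clone of the internal _conf
copy.set("spark.app.name", "renamed") // mutates only the copy
assert(sc.appName == "CloneDemo")     // the live configuration is unchanged
sc.stop()
```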
Properties (a usage sketch follows the list):
spark.master: the run mode (yarn, mesos, standalone)
spark.submit.deployMode: the deploy mode, defaulting to client
spark.app.name: the name of the application
spark.eventLog.enabled: whether event logging is enabled
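The following sketch shows one way these properties can be supplied on the driver (the values are placeholders), mirroring the accessors above, where deployMode falls back to "client" when unset:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.master", "yarn")
  .set("spark.submit.deployMode", "cluster")
  .set("spark.app.name", "EventLogDemo")
  .set("spark.eventLog.enabled", "true")

// Same fallback logic as SparkContext.deployMode:
val deployMode = conf.getOption("spark.submit.deployMode").getOrElse("client")
println(deployMode) // prints: cluster
```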
Configuration validation:
```scala
_conf.validateSettings()

if (!_conf.contains("spark.master")) {
  throw new SparkException("A master URL must be set in your configuration")
}
if (!_conf.contains("spark.app.name")) {
  throw new SparkException("An application name must be set in your configuration")
}

// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
  throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
    "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
}
```
spark.master and spark.app.name must both be set; otherwise an exception is thrown.
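A sketch of the failure mode (the app name is a placeholder): constructing a SparkContext whose configuration lacks spark.master fails during validation:

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkException}

val badConf = new SparkConf().setAppName("NoMaster") // spark.master missing

try {
  new SparkContext(badConf)
} catch {
  case e: SparkException =>
    println(e.getMessage) // "A master URL must be set in your configuration"
}
```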