SparkContext is the entry point of a Spark program, analogous to the familiar `main` function. It is responsible for connecting to the Spark cluster and for creating RDDs, accumulators, and broadcast variables.
/**
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
 *
 * @param config a Spark Config object describing the application configuration. Any settings in
 *   this config overrides the default configs as well as system properties.
 */
class SparkContext(config: SparkConf) extends Logging {
The only required argument for creating a SparkContext is a SparkConf, which is a set of key-value configuration pairs, defined as follows:
/**
 * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
 *
 * Most of the time, you would create a SparkConf object with `new SparkConf()`, which will load
 * values from any `spark.*` Java system properties set in your application as well. In this case,
 * parameters you set directly on the `SparkConf` object take priority over system properties.
 *
 * For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
 * get the same configuration no matter what the system properties are.
 *
 * All setter methods in this class support chaining. For example, you can write
 * `new SparkConf().setMaster("local").setAppName("My app")`.
 *
 * Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
 * by the user. Spark does not support modifying the configuration at runtime.
 *
 * @param loadDefaults whether to also load values from Java system properties
 */
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
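To make the chaining described in the scaladoc concrete, here is a minimal usage sketch (the master, app name, and memory setting are arbitrary placeholders, not taken from the source above):

import org.apache.spark.{SparkConf, SparkContext}

// Chained setters, as described in the SparkConf scaladoc.
val conf = new SparkConf()
  .setMaster("local[2]")         // run locally with 2 threads
  .setAppName("ConfExample")     // shown in the web UI
  .set("spark.executor.memory", "512m")

// Once passed to SparkContext the conf is cloned; later changes have no effect.
val sc = new SparkContext(conf)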
The parameters that can also be passed to SparkContext separately (master, appName, sparkHome, jars, executor environment) are folded into the conf by updatedConf:
/**
 * Creates a modified version of a SparkConf with the parameters that can be passed separately
 * to SparkContext, to make it easier to write SparkContext's constructors. This ignores
 * parameters that are passed as the default value of null, instead of throwing an exception
 * like SparkConf would.
 */
private[spark] def updatedConf(
    conf: SparkConf,
    master: String,
    appName: String,
    sparkHome: String = null,
    jars: Seq[String] = Nil,
    environment: Map[String, String] = Map()): SparkConf =
{
  val res = conf.clone()
  res.setMaster(master)
  res.setAppName(appName)
  if (sparkHome != null) {
    res.setSparkHome(sparkHome)
  }
  if (jars != null && !jars.isEmpty) {
    res.setJars(jars)
  }
  res.setExecutorEnv(environment.toSeq)
  res
}
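This is what makes the older-style constructor work. A simplified sketch of how that auxiliary constructor delegates (the real constructor in the 1.x source may carry additional parameters):

// Simplified sketch: accept the parameters separately and fold them into a SparkConf
// via updatedConf, then call the primary constructor.
def this(
    master: String,
    appName: String,
    sparkHome: String = null,
    jars: Seq[String] = Nil,
    environment: Map[String, String] = Map()) =
  this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))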
The RDD-creation methods are its core functionality. They come in three flavors:
Type 1) Create an RDD from a local Scala collection:
// Methods for creating RDDs

/** Distribute a local Scala collection to form an RDD.
 *
 * @note Parallelize acts lazily. If `seq` is a mutable collection and is
 * altered after the call to parallelize and before the first action on the
 * RDD, the resultant RDD will reflect the modified collection. Pass a copy of
 * the argument to avoid this.
 */
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
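For example (a usage sketch, reusing the sc created earlier), distributing a local collection while respecting the laziness caveat from the scaladoc:

val data = scala.collection.mutable.ArrayBuffer(1, 2, 3, 4)
// Pass a copy: parallelize is lazy, so later mutations of `data` would otherwise
// leak into the RDD before the first action runs.
val rdd = sc.parallelize(data.toList, numSlices = 2)
println(rdd.reduce(_ + _))  // 10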
Type 2) Create an RDD by reading data from external storage:
/** Get an RDD for a Hadoop file with an arbitrary InputFormat
 *
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD will create many references to the same object.
 * If you plan to directly cache Hadoop writable objects, you should first copy them using
 * a `map` function.
 */
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions
    ): RDD[(K, V)] = {
  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}
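The familiar sc.textFile is a thin wrapper around hadoopFile. Roughly (paraphrasing the 1.x implementation, so minor details may differ by version):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] =
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions)
    .map(pair => pair._2.toString)   // keep only the line, drop the byte-offset key
    .setName(path)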
Type 3) Create a new RDD from existing RDDs:
/** Build the union of a list of RDDs. */
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)

/** Build the union of a list of RDDs passed as variable-length arguments. */
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
  new UnionRDD(this, Seq(first) ++ rest)
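A small usage sketch: RDD.union (and its `++` alias) ends up building the same UnionRDD as SparkContext.union above:

val a = sc.parallelize(1 to 3)
val b = sc.parallelize(4 to 6)
// Both forms build a UnionRDD; no shuffle is involved, the partitions are simply concatenated.
val u1 = sc.union(Seq(a, b))
val u2 = a.union(b)            // equivalent: a ++ b
println(u1.collect().toList)   // List(1, 2, 3, 4, 5, 6)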
Creating accumulators (Accumulable): tasks in the application can only update an accumulator with the `+=` operation and cannot read it; only the driver program, which holds the SparkContext, can read its value.
/**
 * A data type that can be accumulated, ie has an commutative and associative "add" operation,
 * but where the result type, `R`, may be different from the element type being added, `T`.
 *
 * You must define how to add data, and how to merge two of these together. For some data types,
 * such as a counter, these might be the same operation. In that case, you can use the simpler
 * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
 * accumulating a set. You will add items to the set, and you will union two sets together.
 *
 * @param initialValue initial value of accumulator
 * @param param helper object defining how to add elements of type `R` and `T`
 * @param name human-readable name for use in Spark's web UI
 * @tparam R the full accumulated data (result type)
 * @tparam T partial data that can be added in
 */
class Accumulable[R, T] (
    @transient initialValue: R,
    param: AccumulableParam[R, T],
    val name: Option[String])
  extends Serializable {
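A usage sketch of the simpler Accumulator subclass mentioned in the scaladoc (assuming a 1.x SparkContext): executors only call `+=`, and only the driver reads `.value`:

// Accumulator[Int] obtained via the implicit IntAccumulatorParam
val sum = sc.accumulator(0)
sc.parallelize(1 to 100, 4).foreach(x => sum += x)  // tasks may only add
println(sum.value)  // 5050 -- reading the value is only allowed on the driver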
It can also run a job directly. Note the parameters it takes, and that it essentially just delegates to dagScheduler.runJob (and, for asynchronous submission below, to dagScheduler.submitJob):
/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark. The allowLocal
 * flag specifies whether the scheduler can run the computation on the driver rather than
 * shipping it out to the cluster, for short actions like first().
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit) {
  if (dagScheduler == null) {
    throw new SparkException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  val start = System.nanoTime
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
    resultHandler, localProperties.get)
  logInfo(
    "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
  rdd.doCheckpoint()
}
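All RDD actions bottom out here. As a reference point, counting elements can be expressed directly against runJob, using the simpler overload that runs on all partitions (a sketch of what RDD.count() does internally):

// Count elements per partition on the executors, then sum on the driver.
val nums = sc.parallelize(1 to 1000, 8)
val counts: Array[Long] = sc.runJob(nums, (iter: Iterator[Int]) => iter.size.toLong)
println(counts.sum)  // 1000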
/**
 * :: Experimental ::
 * Submit a job for execution and return a FutureJob holding the result.
 */
@Experimental
def submitJob[T, U, R](
    rdd: RDD[T],
    processPartition: Iterator[T] => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit,
    resultFunc: => R): SimpleFutureAction[R] =
{
  val cleanF = clean(processPartition)
  val callSite = getCallSite
  val waiter = dagScheduler.submitJob(
    rdd,
    (context: TaskContext, iter: Iterator[T]) => cleanF(iter),
    partitions,
    callSite,
    allowLocal = false,
    resultHandler,
    localProperties.get)
  new SimpleFutureAction(waiter, resultFunc)
}
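The asynchronous actions (e.g. countAsync in AsyncRDDActions) are built on submitJob. A hedged sketch of the same idea, counting elements without blocking the driver:

import java.util.concurrent.atomic.AtomicLong

val nums = sc.parallelize(1 to 1000, 8)
val total = new AtomicLong(0L)
// Non-blocking: returns a SimpleFutureAction immediately; partial results arrive
// through resultHandler as each partition finishes, and resultFunc is evaluated at the end.
val futureCount = sc.submitJob(
  nums,
  (iter: Iterator[Int]) => iter.size.toLong,            // processPartition
  0 until nums.partitions.size,                         // run on every partition
  (index: Int, count: Long) => total.addAndGet(count),  // resultHandler
  total.get())                                          // resultFunc (by-name)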
The SparkContext companion object exposes some of the implementation details, for example how the user-supplied master URL is turned into a concrete internal scheduler implementation; it is worth a look.
/**
 * The SparkContext object contains a number of implicit conversions and parameters for use with
 * various Spark features.
 */
object SparkContext extends Logging {
/** Creates a task scheduler based on a given master URL. Extracted for testing. */
private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
  // Regular expression used for local[N] and local[*] master formats
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  // Regular expression for local[N, maxRetries], used in tests with failing tasks
  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
  // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
  val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
  // Regular expression for connecting to Spark deploy clusters
  val SPARK_REGEX = """spark://(.*)""".r
  // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
  val MESOS_REGEX = """(mesos|zk)://.*""".r
  // Regular expression for connection to Simr cluster
  val SIMR_REGEX = """simr://(.*)""".r

  // When running locally, don't try to re-execute tasks on failure.
  val MAX_LOCAL_TASK_FAILURES = 1

  master match {
    case "local" =>
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalBackend(scheduler, 1)
      scheduler.initialize(backend)
      scheduler

    case LOCAL_N_REGEX(threads) =>
      def localCpuCount = Runtime.getRuntime.availableProcessors()
      // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalBackend(scheduler, threadCount)
      scheduler.initialize(backend)
      scheduler

    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      def localCpuCount = Runtime.getRuntime.availableProcessors()
      // local[*, M] means the number of cores on the computer with M failures
      // local[N, M] means exactly N threads with M failures
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
      val backend = new LocalBackend(scheduler, threadCount)
      scheduler.initialize(backend)
      scheduler

    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      scheduler

    case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
      // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
      val memoryPerSlaveInt = memoryPerSlave.toInt
      if (sc.executorMemory > memoryPerSlaveInt) {
        throw new SparkException(
          "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
            memoryPerSlaveInt, sc.executorMemory))
      }

      val scheduler = new TaskSchedulerImpl(sc)
      val localCluster = new LocalSparkCluster(
        numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
      val masterUrls = localCluster.start()
      val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
        localCluster.stop()
      }
      scheduler

    case "yarn-standalone" | "yarn-cluster" =>
      if (master == "yarn-standalone") {
        logWarning(
          "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
      }
      val scheduler = try {
        val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
        val cons = clazz.getConstructor(classOf[SparkContext])
        cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
      } catch {
        // TODO: Enumerate the exact reasons why it can fail
        // But irrespective of it, it means we cannot proceed !
        case e: Exception => {
          throw new SparkException("YARN mode not available ?", e)
        }
      }
      val backend = try {
        val clazz =
          Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
        val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
        cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
      } catch {
        case e: Exception => {
          throw new SparkException("YARN mode not available ?", e)
        }
      }
      scheduler.initialize(backend)
      scheduler

    case "yarn-client" =>
      val scheduler = try {
        val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
        val cons = clazz.getConstructor(classOf[SparkContext])
        cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
      } catch {
        case e: Exception => {
          throw new SparkException("YARN mode not available ?", e)
        }
      }
      val backend = try {
        val clazz =
          Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
        val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
        cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
      } catch {
        case e: Exception => {
          throw new SparkException("YARN mode not available ?", e)
        }
      }
      scheduler.initialize(backend)
      scheduler

    case mesosUrl @ MESOS_REGEX(_) =>
      MesosNativeLibrary.load()
      val scheduler = new TaskSchedulerImpl(sc)
      val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
      val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
      val backend = if (coarseGrained) {
        new CoarseMesosSchedulerBackend(scheduler, sc, url)
      } else {
        new MesosSchedulerBackend(scheduler, sc, url)
      }
      scheduler.initialize(backend)
      scheduler

    case SIMR_REGEX(simrUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
      scheduler.initialize(backend)
      scheduler

    case _ =>
      throw new SparkException("Could not parse Master URL: '" + master + "'")
  }
}
Overall, SparkContext is the trigger point of the whole Spark application and is responsible for the important initialization work. The RDD and DAGScheduler it ties together are the real main events.