Spark Notes 5: SparkContext and SparkConf

SparkContext is the entry point of a Spark program, comparable to the familiar `main` function. It is responsible for connecting to the Spark cluster and for creating RDDs, accumulators, and broadcast variables.

/**
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
 *
 * @param config a Spark Config object describing the application configuration. Any settings in
 *   this config overrides the default configs as well as system properties.
 */

class SparkContext(config: SparkConf) extends Logging {

The only argument required to create a SparkContext is a SparkConf, which is a set of key-value configuration properties, defined as follows:

/**
 * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
 *
 * Most of the time, you would create a SparkConf object with `new SparkConf()`, which will load
 * values from any `spark.*` Java system properties set in your application as well. In this case,
 * parameters you set directly on the `SparkConf` object take priority over system properties.
 *
 * For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
 * get the same configuration no matter what the system properties are.
 *
 * All setter methods in this class support chaining. For example, you can write
 * `new SparkConf().setMaster("local").setAppName("My app")`.
 *
 * Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
 * by the user. Spark does not support modifying the configuration at runtime.
 *
 * @param loadDefaults whether to also load values from Java system properties
 */
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
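As the comment says, the setters chain and the conf is cloned once handed to Spark. A minimal construction sketch (the master, app name, and port are hypothetical values chosen for illustration):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")          // run locally with 2 threads
  .setAppName("ConfDemo")         // name shown in the web UI
  .set("spark.ui.port", "4041")   // any spark.* key-value pair can be set this way

val sc = new SparkContext(conf)   // from here on the conf is cloned and frozen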

The parameters that can be passed separately to SparkContext's convenience constructors (master, appName, sparkHome, jars, environment) are merged into a SparkConf by updatedConf:

/**
 * Creates a modified version of a SparkConf with the parameters that can be passed separately
 * to SparkContext, to make it easier to write SparkContext's constructors. This ignores
 * parameters that are passed as the default value of null, instead of throwing an exception
 * like SparkConf would.
 */
private[spark] def updatedConf(
    conf: SparkConf,
    master: String,
    appName: String,
    sparkHome: String = null,
    jars: Seq[String] = Nil,
    environment: Map[String, String] = Map()): SparkConf =
{
  val res = conf.clone()
  res.setMaster(master)
  res.setAppName(appName)
  if (sparkHome != null) {
    res.setSparkHome(sparkHome)
  }
  if (jars != null && !jars.isEmpty) {
    res.setJars(jars)
  }
  res.setExecutorEnv(environment.toSeq)
  res
}

Creating RDDs is SparkContext's core job. Its creation methods fall into three kinds:

Type 1) Create an RDD from a local Scala collection

// Methods for creating RDDs

/** Distribute a local Scala collection to form an RDD.
 *
 * @note Parallelize acts lazily. If `seq` is a mutable collection and is
 * altered after the call to parallelize and before the first action on the
 * RDD, the resultant RDD will reflect the modified collection. Pass a copy of
 * the argument to avoid this.
 */
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
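A minimal usage sketch (the values are arbitrary), including the mutable-collection caveat from the note above:

import scala.collection.mutable.ArrayBuffer

// Distribute a local collection across 4 partitions.
val rdd = sc.parallelize(1 to 10, numSlices = 4)

// parallelize is lazy: pass a snapshot of a mutable collection if it
// may be altered before the first action on the RDD.
val buf = ArrayBuffer(1, 2, 3)
val safeRdd = sc.parallelize(buf.toList)

rdd.count()      // 10
safeRdd.count()  // 3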

Type 2) Create an RDD by reading data from external storage.

/** Get an RDD for a Hadoop file with an arbitrary InputFormat
 *
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD will create many references to the same object.
 * If you plan to directly cache Hadoop writable objects, you should first copy them using
 * a `map` function.
 */
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions
    ): RDD[(K, V)] = {
  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}
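A hedged sketch of calling hadoopFile directly for plain text, which is roughly what sc.textFile does internally (the HDFS path is hypothetical). It also applies the copy-before-cache advice from the comment above:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Each record is (byte offset, line) because TextInputFormat is used.
val raw = sc.hadoopFile("hdfs://namenode:8020/user/hdfs/input",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])

// The RecordReader reuses the same Writable objects, so copy them
// (here: to String) before caching instead of caching the Writables.
val lines = raw.map { case (_, text) => text.toString }
lines.cache()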

Type 3) Create a new RDD from existing RDDs

/** Build the union of a list of RDDs. */
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)

/** Build the union of a list of RDDs passed as variable-length arguments. */
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
  new UnionRDD(this, Seq(first) ++ rest)
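Both overloads build a UnionRDD; a small sketch:

val a = sc.parallelize(1 to 3)
val b = sc.parallelize(4 to 6)

// The variable-length form and the Seq form are equivalent.
val u1 = sc.union(a, b)
val u2 = sc.union(Seq(a, b))

u1.collect()  // Array(1, 2, 3, 4, 5, 6)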

Creating accumulator variables (Accumulable): tasks in the application can only update an accumulator with `+=`; they cannot read its value. Only the driver, through SparkContext, can read the accumulated value.

/**
 * A data type that can be accumulated, ie has an commutative and associative "add" operation,
 * but where the result type, `R`, may be different from the element type being added, `T`.
 *
 * You must define how to add data, and how to merge two of these together.  For some data types,
 * such as a counter, these might be the same operation. In that case, you can use the simpler
 * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
 * accumulating a set. You will add items to the set, and you will union two sets together.
 *
 * @param initialValue initial value of accumulator
 * @param param helper object defining how to add elements of type `R` and `T`
 * @param name human-readable name for use in Spark's web UI
 * @tparam R the full accumulated data (result type)
 * @tparam T partial data that can be added in
 */
class Accumulable[R, T] (
    @transient initialValue: R,
    param: AccumulableParam[R, T],
    val name: Option[String])
  extends Serializable {
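A minimal sketch using the simpler Accumulator (the case where R and T coincide), with the 1.x API of this era; the import is only needed on older versions to bring the implicit AccumulatorParam instances into scope:

import org.apache.spark.SparkContext._

// Driver side: create an Int accumulator.
val errorCount = sc.accumulator(0)

val logs = sc.parallelize(Seq("ok", "ERROR x", "ok", "ERROR y"))

// Task side: closures shipped to executors may only use +=.
logs.foreach { line =>
  if (line.startsWith("ERROR")) errorCount += 1
}

// Only the driver can read the accumulated value.
println(errorCount.value)  // 2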

SparkContext can also execute a job directly. Note its parameters, and that it essentially just delegates to dagScheduler.runJob:

/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark. The allowLocal
 * flag specifies whether the scheduler can run the computation on the driver rather than
 * shipping it out to the cluster, for short actions like first().
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit) {
  if (dagScheduler == null) {
    throw new SparkException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  val start = System.nanoTime
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
    resultHandler, localProperties.get)
  logInfo(
    "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
  rdd.doCheckpoint()
}
/**
 * :: Experimental ::
 * Submit a job for execution and return a FutureJob holding the result.
 */
@Experimental
def submitJob[T, U, R](
    rdd: RDD[T],
    processPartition: Iterator[T] => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit,
    resultFunc: => R): SimpleFutureAction[R] =
{
  val cleanF = clean(processPartition)
  val callSite = getCallSite
  val waiter = dagScheduler.submitJob(
    rdd,
    (context: TaskContext, iter: Iterator[T]) => cleanF(iter),
    partitions,
    callSite,
    allowLocal = false,
    resultHandler,
    localProperties.get)
  new SimpleFutureAction(waiter, resultFunc)
}
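runJob is the funnel for every action; RDD.count, for instance, is built on top of it. A hedged sketch using the simpler public overload that runs one function per partition:

val data = sc.parallelize(1 to 1000, 8)

// One function per partition; the per-partition results come back to the driver.
// Summing the partition sizes is essentially how count() is implemented.
val sizes: Array[Long] = sc.runJob(data, (iter: Iterator[Int]) => iter.size.toLong)
val total = sizes.sum  // 1000

submitJob is the asynchronous counterpart: it returns a SimpleFutureAction immediately, and the async actions in AsyncRDDActions (such as countAsync) are built on it.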

SparkContext's companion object exposes part of the implementation, in particular how the user-supplied master URL is translated into a concrete TaskScheduler; this is worth noting.

/**
 * The SparkContext object contains a number of implicit conversions and parameters for use with
 * various Spark features.
 */
object SparkContext extends Logging {
  /** Creates a task scheduler based on a given master URL. Extracted for testing. */
  private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
    // Regular expression used for local[N] and local[*] master formats
    val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
    // Regular expression for local[N, maxRetries], used in tests with failing tasks
    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
    // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
    val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
    // Regular expression for connecting to Spark deploy clusters
    val SPARK_REGEX = """spark://(.*)""".r
    // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
    val MESOS_REGEX = """(mesos|zk)://.*""".r
    // Regular expression for connection to Simr cluster
    val SIMR_REGEX = """simr://(.*)""".r

    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(scheduler, 1)
        scheduler.initialize(backend)
        scheduler

      case LOCAL_N_REGEX(threads) =>
        def localCpuCount = Runtime.getRuntime.availableProcessors()
        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(scheduler, threadCount)
        scheduler.initialize(backend)
        scheduler

      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        def localCpuCount = Runtime.getRuntime.availableProcessors()
        // local[*, M] means the number of cores on the computer with M failures
        // local[N, M] means exactly N threads with M failures
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
        val backend = new LocalBackend(scheduler, threadCount)
        scheduler.initialize(backend)
        scheduler

      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        scheduler

      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
        if (sc.executorMemory > memoryPerSlaveInt) {
          throw new SparkException(
            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
              memoryPerSlaveInt, sc.executorMemory))
        }

        val scheduler = new TaskSchedulerImpl(sc)
        val localCluster = new LocalSparkCluster(
          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
        val masterUrls = localCluster.start()
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
          localCluster.stop()
        }
        scheduler

      case "yarn-standalone" | "yarn-cluster" =>
        if (master == "yarn-standalone") {
          logWarning(
            "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
        }
        val scheduler = try {
          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
          val cons = clazz.getConstructor(classOf[SparkContext])
          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
        } catch {
          // TODO: Enumerate the exact reasons why it can fail
          // But irrespective of it, it means we cannot proceed !
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }
        val backend = try {
          val clazz =
            Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
        } catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }
        scheduler.initialize(backend)
        scheduler

      case "yarn-client" =>
        val scheduler = try {
          val clazz =
            Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
          val cons = clazz.getConstructor(classOf[SparkContext])
          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
        } catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }

        val backend = try {
          val clazz =
            Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
        } catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }

        scheduler.initialize(backend)
        scheduler

      case mesosUrl @ MESOS_REGEX(_) =>
        MesosNativeLibrary.load()
        val scheduler = new TaskSchedulerImpl(sc)
        val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
        val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
        val backend = if (coarseGrained) {
          new CoarseMesosSchedulerBackend(scheduler, sc, url)
        } else {
          new MesosSchedulerBackend(scheduler, sc, url)
        }
        scheduler.initialize(backend)
        scheduler

      case SIMR_REGEX(simrUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
        scheduler.initialize(backend)
        scheduler

      case _ =>
        throw new SparkException("Could not parse Master URL: '" + master + "'")
    }
  }
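The deploy mode is selected purely by the master string. A few example conf values and the branch they would hit above (hostnames and sizes are hypothetical):

// Standalone cluster, possibly with several masters (matches SPARK_REGEX).
val standalone = new SparkConf().setAppName("demo").setMaster("spark://master1:7077,master2:7077")

// Local mode with one thread per core (matches LOCAL_N_REGEX with threads == "*").
val local = new SparkConf().setAppName("demo").setMaster("local[*]")

// Simulated cluster: 2 workers, 1 core and 512 MB each (matches LOCAL_CLUSTER_REGEX).
val localCluster = new SparkConf().setAppName("demo").setMaster("local-cluster[2,1,512]")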

In summary, SparkContext is the trigger point of the whole Spark program and takes care of the important initialization work, but the RDD and DAGScheduler it wires together are where the real action happens.

