Apache Spark 1.0.0 Code Walkthrough (Part 2): Spark Initialization

In LocalWordCount, a SparkConf is created first to configure environment parameters such as the master URL and the application name; any value not set in the program is read from the JVM system properties. The SparkConf is then passed to the SparkContext constructor, which initializes the Spark runtime.

val sparkConf = new SparkConf().setMaster("local").setAppName("Local Word Count")
val sc = new SparkContext(sparkConf)
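
For context, a complete LocalWordCount driver could look roughly like the sketch below. Only the two lines above are quoted from the example; the object name, input path, and word-splitting logic here are illustrative assumptions, not the exact example code.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // needed in Spark 1.0.0 for reduceByKey on pair RDDs

object LocalWordCount {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("local").setAppName("Local Word Count")
    val sc = new SparkContext(sparkConf)

    // Illustrative body: count words in a local text file (the path is hypothetical).
    val counts = sc.textFile("data/words.txt")
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.collect().foreach(println)
    sc.stop()
  }
}

If setMaster and setAppName were omitted here, the same values could instead be supplied through the spark.master and spark.app.name system properties.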

Judging from the console output during initialization, the whole process performs the following work:

The spark.SecurityManager configures authentication; the slf4j.Slf4jLogger starts; Remoting starts and begins listening; SparkEnv registers the MapOutputTracker and the BlockManagerMaster; storage.DiskBlockManager creates its local directory and storage.MemoryStore allocates its capacity; network.ConnectionManager binds to a port and storage.BlockManagerMaster registers the BlockManager; spark.HttpServer starts and server.AbstractConnector opens the corresponding connectors; broadcast.HttpBroadcast starts the broadcast server; spark.HttpFileServer sets up its serving directory; and finally the SparkUI is started.

15/07/14 13:20:56 INFO spark.SecurityManager: Changing view acls to: Kevin
15/07/14 13:20:56 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Kevin)
15/07/14 13:20:58 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/14 13:20:58 INFO Remoting: Starting remoting
15/07/14 13:20:58 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@10.88.121.10:50494]
15/07/14 13:20:58 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@10.88.121.10:50494]
15/07/14 13:20:59 INFO spark.SparkEnv: Registering MapOutputTracker
15/07/14 13:20:59 INFO spark.SparkEnv: Registering BlockManagerMaster
15/07/14 13:20:59 INFO storage.DiskBlockManager: Created local directory at C:\Users\Kevin\AppData\Local\Temp\spark-local-20150714132059-e5a3
15/07/14 13:20:59 INFO storage.MemoryStore: MemoryStore started with capacity 2.1 GB.
15/07/14 13:20:59 INFO network.ConnectionManager: Bound socket to port 50497 with id = ConnectionManagerId(Kevin-ThinkPad,50497)
15/07/14 13:20:59 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/07/14 13:20:59 INFO storage.BlockManagerInfo: Registering block manager Kevin-ThinkPad:50497 with 2.1 GB RAM
15/07/14 13:20:59 INFO storage.BlockManagerMaster: Registered BlockManager
15/07/14 13:20:59 INFO spark.HttpServer: Starting HTTP Server
15/07/14 13:20:59 INFO server.Server: jetty-8.1.14.v20131031
15/07/14 13:20:59 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:50498
15/07/14 13:20:59 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.88.121.10:50498
15/07/14 13:20:59 INFO spark.HttpFileServer: HTTP File server directory is C:\Users\Kevin\AppData\Local\Temp\spark-105cdf2e-8671-4323-af35-1668fd462f55
15/07/14 13:20:59 INFO spark.HttpServer: Starting HTTP Server
15/07/14 13:20:59 INFO server.Server: jetty-8.1.14.v20131031
15/07/14 13:20:59 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:50499
15/07/14 13:21:00 INFO server.Server: jetty-8.1.14.v20131031
15/07/14 13:21:00 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/07/14 13:21:00 INFO ui.SparkUI: Started SparkUI at http://Kevin-ThinkPad:4040

At this point, initialization is complete.
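
As a side note, if this console output is too verbose, the log level can be lowered before the SparkContext is created. A minimal sketch, assuming the log4j 1.x API bundled with Spark 1.0.0:

import org.apache.log4j.{Level, Logger}

// Keep only warnings and errors from Spark and Akka during initialization.
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)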

Turning to the SparkContext source, several key statements are executed in the class constructor body while a SparkContext is being instantiated.

Instantiate the LiveListenerBus and start it:

private[spark] val listenerBus = new LiveListenerBus
...
listenerBus.start()
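
The listenerBus is the event channel behind the web UI and any user-registered SparkListener. As a hedged illustration (the listener class below is my own, not Spark code), a listener registered through SparkContext.addSparkListener receives its callbacks via this bus:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// A trivial listener that prints job boundaries; every callback is delivered by the LiveListenerBus.
class JobLoggingListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart) {
    println("Job " + jobStart.jobId + " started with stages " + jobStart.stageIds.mkString(", "))
  }
  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
    println("Job " + jobEnd.jobId + " finished: " + jobEnd.jobResult)
  }
}

sc.addSparkListener(new JobLoggingListener)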

Create the SparkEnv:

private[spark] val env = SparkEnv.create(
    conf,
    "<driver>",
    conf.get("spark.driver.host"),
    conf.get("spark.driver.port").toInt,
    isDriver = true,
    isLocal = isLocal,
    listenerBus = listenerBus)
  SparkEnv.set(env)

SparkEnv bundles many of the key runtime components:

class SparkEnv (
    val executorId: String,
    val actorSystem: ActorSystem,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val cacheManager: CacheManager,
    val mapOutputTracker: MapOutputTracker,
    val shuffleFetcher: ShuffleFetcher,
    val broadcastManager: BroadcastManager,
    val blockManager: BlockManager,
    val connectionManager: ConnectionManager,
    val securityManager: SecurityManager,
    val httpFileServer: HttpFileServer,
    val sparkFilesDir: String,
    val metricsSystem: MetricsSystem,
    val conf: SparkConf) extends Logging
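
As a quick sketch of what this object exposes, the environment of the current thread can be retrieved through SparkEnv.get once the SparkContext is up (SparkEnv is a developer API, so treat the exact fields as version-specific):

import org.apache.spark.SparkEnv

val env = SparkEnv.get
// The configured serializer (spark.serializer, JavaSerializer by default in 1.0.0).
println(env.serializer.getClass.getName)
// The driver's own BlockManager, whose registration appears in the log above.
println(env.blockManager.blockManagerId)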

Create the SparkUI and bind it:

private[spark] val ui = new SparkUI(this)
ui.bind()
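
ui.bind() is what produces the jetty connector line at port 4040 in the log above. The port is read from spark.ui.port; a minimal sketch of overriding it (the port number here is arbitrary):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local")
  .setAppName("Local Word Count")
  .set("spark.ui.port", "4055")   // the SparkUI will bind to 4055 instead of the default 4040
val sc = new SparkContext(conf)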

Create the TaskScheduler, use it to try to construct the DAGScheduler, and then start the TaskScheduler:

// Create and start the scheduler
  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
  @volatile private[spark] var dagScheduler: DAGScheduler = _
  try {
    dagScheduler = new DAGScheduler(this)
  } catch {
    case e: Exception => throw
      new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
  }

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  taskScheduler.start()

The TaskScheduler is created by createTaskScheduler, which matches the master string against regular expressions for the different master types and creates the corresponding TaskScheduler and SchedulerBackend:

private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
    // Regular expression used for local[N] and local[*] master formats
    val LOCAL_N_REGEX = """local\[([0-9\*]+)\]""".r
    // Regular expression for local[N, maxRetries], used in tests with failing tasks
    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
    // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
    val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
    // Regular expression for connecting to Spark deploy clusters
    val SPARK_REGEX = """spark://(.*)""".r
    // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
    val MESOS_REGEX = """(mesos|zk)://.*""".r
    // Regular expression for connection to Simr cluster
    val SIMR_REGEX = """simr://(.*)""".r

    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

    master match {
      case "local" =>
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(scheduler, 1)
        scheduler.initialize(backend)
        scheduler

      case LOCAL_N_REGEX(threads) =>
        def localCpuCount = Runtime.getRuntime.availableProcessors()
        // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
        val threadCount = if (threads == "*") localCpuCount else threads.toInt
        val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
        val backend = new LocalBackend(scheduler, threadCount)
        scheduler.initialize(backend)
        scheduler

      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
        val backend = new LocalBackend(scheduler, threads.toInt)
        scheduler.initialize(backend)
        scheduler

      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        scheduler

      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
        val memoryPerSlaveInt = memoryPerSlave.toInt
        if (sc.executorMemory > memoryPerSlaveInt) {
          throw new SparkException(
            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
              memoryPerSlaveInt, sc.executorMemory))
        }

        val scheduler = new TaskSchedulerImpl(sc)
        val localCluster = new LocalSparkCluster(
          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
        val masterUrls = localCluster.start()
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
          localCluster.stop()
        }
        scheduler

      case "yarn-standalone" | "yarn-cluster" =>
        if (master == "yarn-standalone") {
          logWarning(
            "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
        }
        val scheduler = try {
          val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
          val cons = clazz.getConstructor(classOf[SparkContext])
          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
        } catch {
          // TODO: Enumerate the exact reasons why it can fail
          // But irrespective of it, it means we cannot proceed !
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }
        val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
        scheduler.initialize(backend)
        scheduler

      case "yarn-client" =>
        val scheduler = try {
          val clazz =
            Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
          val cons = clazz.getConstructor(classOf[SparkContext])
          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]

        } catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }

        val backend = try {
          val clazz =
            Class.forName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
        } catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }

        scheduler.initialize(backend)
        scheduler

      case mesosUrl @ MESOS_REGEX(_) =>
        MesosNativeLibrary.load()
        val scheduler = new TaskSchedulerImpl(sc)
        val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
        val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
        val backend = if (coarseGrained) {
          new CoarseMesosSchedulerBackend(scheduler, sc, url)
        } else {
          new MesosSchedulerBackend(scheduler, sc, url)
        }
        scheduler.initialize(backend)
        scheduler

      case SIMR_REGEX(simrUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
        scheduler.initialize(backend)
        scheduler

      case _ =>
        throw new SparkException("Could not parse Master URL: ‘" + master + "‘")
    }
  }
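
To make the dispatch concrete, the same style of regular-expression matching can be exercised on its own. The following is only an illustrative sketch (the object and its output strings are mine, not Spark code); it copies two of the patterns and shows which branch a few master strings would select:

object MasterUrlDemo {
  // Copies of two of the patterns used by createTaskScheduler, for illustration only.
  val LOCAL_N_REGEX = """local\[([0-9\*]+)\]""".r
  val SPARK_REGEX = """spark://(.*)""".r

  def describe(master: String): String = master match {
    case "local"                => "TaskSchedulerImpl + LocalBackend, 1 thread"
    case LOCAL_N_REGEX(threads) => "TaskSchedulerImpl + LocalBackend, threads = " + threads  // "*" means all cores
    case SPARK_REGEX(sparkUrl)  =>
      "TaskSchedulerImpl + SparkDeploySchedulerBackend, masters = " +
        sparkUrl.split(",").map("spark://" + _).mkString(" ")
    case _                      => "another cluster manager, or a 'Could not parse Master URL' error"
  }

  def main(args: Array[String]) {
    Seq("local", "local[4]", "local[*]", "spark://host1:7077,host2:7077")
      .foreach(m => println(m + " => " + describe(m)))
  }
}

Whichever branch is taken, the shape is the same: build a TaskSchedulerImpl (or load a YARN-specific subclass reflectively), build the matching SchedulerBackend, and wire the two together with scheduler.initialize(backend).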
 

END
