SparkContext的初始化(季篇)——测量系统、ContextCleaner及环境更新

《深入理解Spark:核心思想与源码分析》一书前言的内容请看链接《深入理解SPARK:核心思想与源码分析》一书正式出版上市

《深入理解Spark:核心思想与源码分析》一书第一章的内容请看链接《第1章 环境准备》

《深入理解Spark:核心思想与源码分析》一书第二章的内容请看链接《第2章 SPARK设计理念与基本架构》

由于本书的第3章内容较多,所以打算分别开辟四篇随笔分别展现。

《深入理解Spark:核心思想与源码分析》一书第三章第一部分的内容请看链接《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(伯篇)》

《深入理解Spark:核心思想与源码分析》一书第三章第二部分的内容请看链接《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(仲篇)》

《深入理解Spark:核心思想与源码分析》一书第三章第二部分的内容请看链接《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(叔篇)》

本文展现第3章第三部分的内容:

3.9 启动测量系统MetricsSystem

MetricsSystem使用codahale提供的第三方测量仓库Metrics,有关Metrics的具体信息可以参考附录D。MetricsSystem中有三个概念:

q  Instance:指定了谁在使用测量系统;

q  Source:指定了从哪里收集测量数据;

q  Sink:指定了往哪里输出测量数据。

Spark按照Instance的不同,区分为Master、Worker、Application、Driver和Executor。

Spark目前提供的Sink有ConsoleSink、CsvSink、JmxSink、MetricsServlet、GraphiteSink等。

Spark中使用MetricsServlet作为默认的Sink。

MetricsSystem的启动代码如下。

val metricsSystem = env.metricsSystem
  metricsSystem.start()

MetricsSystem的启动过程包括以下步骤:

1) 注册Sources;

2) 注册Sinks;

3) 给Sinks增加Jetty的ServletContextHandler。

MetricsSystem启动完毕后,会遍历与Sinks有关的ServletContextHandler,并调用attachHandler将它们绑定到SparkUI上。

metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

3.9.1 注册Sources

  registerSources方法用于注册Sources,它的实现见代码清单3-44。注册Sources的过程分为以下步骤:

1) 从metricsConfig获取Driver的Properties,默认为创建MetricsSystem的过程中解析的{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json}。

2) 从Driver的Properties中用正则匹配以source.开头的属性。然后将属性中的Source反射得到的实例,加入ArrayBuffer[Source]。

3) 将每个Source的metricRegistry(也是MetricSet的子类型)注册到ConcurrentMap<String, Metric> metrics。这里的registerSource方法已在3.8.2节讲解过。

代码清单3-44         MetricsSystem

private def registerSources() {

    val instConfig = metricsConfig.getInstance(instance)
    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX) 

    // Register all the sources related to instance
    sourceConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      try {
        val source = Class.forName(classPath).newInstance()
        registerSource(source.asInstanceOf[Source])
      } catch {
        case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
      }
    }
  }

3.9.2 注册Sinks

  registerSinks方法用于注册Sinks,它的实现见代码清单3-45。注册Sinks的步骤如下:

1) 从Driver的Properties中用正则匹配以sink.开头的属性,如:{sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet, sink.servlet.path=/metrics/json}。将其转换为Map(servlet -> {class=org.apache.spark.metrics.sink.MetricsServlet, path=/metrics/json})。

2) 将子属性class对应的类metricsServlet反射得到MetricsServlet实例。如果属性的key是servlet,将其设置为metricsServlet;如果是Sink,则加入到ArrayBuffer[Sink]中。

代码清单3-45         MetricsSystem注册Sinks的实现

  private def registerSinks() {
    val instConfig = metricsConfig.getInstance(instance)
    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
    sinkConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      if (null != classPath) {
        try {
          val sink = Class.forName(classPath)
            .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
            .newInstance(kv._2, registry, securityMgr)

          if (kv._1 == "servlet") {
            metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
          } else {
            sinks += sink.asInstanceOf[Sink]
          }
        } catch {
          case e: Exception => logError("Sink class "+ classPath + " cannot be instantialized",e)
        }
      }
    }
  }

3.9.3给Sinks增加Jetty的ServletContextHandler

MetricsSystem的getServletHandlers方法,实现如下。

  def getServletHandlers = {
    require(running, "Can only call getServletHandlers on a running MetricsSystem")
    metricsServlet.map(_.getHandlers).getOrElse(Array())
  }

可以看到调用了metricsServlet的getHandlers,其实现如下。

def getHandlers = Array[ServletContextHandler](
    createServletHandler(servletPath,
      new ServletParams(request => getMetricsSnapshot(request), "text/json"), securityMgr)
  )

最终生成处理/metrics/json请求的ServletContextHandler,而请求的真正处理由getMetricsSnapshot方法,利用fastjson解析。生成的ServletContextHandler通过SparkUI的attachHandler方法,也被绑定到SparkUI。createServletHandler与attachHandler方法都已经在3.4.4节详细阐述。最终我们可以使用以下这些地址来访问测量数据。

http://localhost:4040/metrics/applications/json

http://localhost:4040/metrics/json

http://localhost:4040/metrics/master/json

3.10 创建和启动ExecutorAllocationManager

  ExecutorAllocationManager用于动态分配executor,创建和启动ExecutorAllocationManager的代码如下。

  private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =

    if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
      Some(new ExecutorAllocationManager(this, listenerBus, conf))
    } else {
      None
    }

  executorAllocationManager.foreach(_.start())

默认情况下不会创建ExecutorAllocationManager,可以修改属性spark.dynamicAllocation.enabled为true来创建。ExecutorAllocationManager可以设置动态分配最小Executor数量、动态分配最大Executor数量、每个Executor可以运行的Task数量等配置信息,并对配置信息进行校验。start方法将ExecutorAllocationListener加入到listenerBus中,ExecutorAllocationListener通过监听listenerBus里的事件,动态添加删除executor。并且通过Thread不断的添加executor,并且遍历executor,将超时的executor杀掉并且移除。ExecutorAllocationListener的实现与其他SparkListener类似,不再赘述。ExecutorAllocationManager的关键代码见代码清单3-46。

代码清单3-46         ExecutorAllocationManagerr的关键代码

  private val intervalMillis: Long = 100
  private var clock: Clock = new RealClock
  private val listener = new ExecutorAllocationListener

  def start(): Unit = {
    listenerBus.addListener(listener)
    startPolling()
  }

  private def startPolling(): Unit = {

    val t = new Thread {
      override def run(): Unit = {
        while (true) {
          try {
            schedule()
          } catch {
            case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
          }

          Thread.sleep(intervalMillis)
        }
      }
    }

    t.setName("spark-dynamic-executor-allocation")
    t.setDaemon(true)
    t.start()
  }

根据3.4.1节的内容,我们知道listenerBus内置了线程listenerThread,此线程不断从eventQueue中拉出事件对象,调用监听器的监听方法。要启动此线程,需要调用listenerBus的start方法,代码如下。

  listenerBus.start()

3.11 ContextCleaner的创建与启动

  由于配置属性spark.cleaner.referenceTracking默认是true,所以会构造并启动ContextCleaner,代码如下。

  private[spark] val cleaner: Option[ContextCleaner] = {
    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
      Some(new ContextCleaner(this))
    } else {
      None
    }
  }

  cleaner.foreach(_.start())

ContextCleaner用于清理那些超出应用范围的RDD、ShuffleDependency和Broadcast对象。ContextCleaner的组成如下:

q  referenceQueue:缓存顶级的AnyRef引用;

q  referenceBuffer:缓存AnyRef的虚引用;

q  listeners:缓存清理工作的监听器数组;

q  cleaningThread:用于具体清理工作的线程。

ContextCleaner的工作原理和listenerBus一样,也采用监听器模式,由线程来处理,此线程实际只是调用keepCleaning方法。keepCleaning的实现见代码清单3-47。

代码清单3-47         ContextCleaner的实现

  private def keepCleaning(): Unit = Utils.logUncaughtExceptions {

    while (!stopped) {
      try {
        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
          .map(_.asInstanceOf[CleanupTaskWeakReference])

        // Synchronize here to avoid being interrupted on stop()
        synchronized {
          reference.map(_.task).foreach { task =>
            logDebug("Got cleaning task " + task)
            referenceBuffer -= reference.get
            task match {
              case CleanRDD(rddId) =>
                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)

              case CleanShuffle(shuffleId) =>
                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)

              case CleanBroadcast(broadcastId) =>
                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
            }
          }
        }
      } catch {
        case ie: InterruptedException if stopped => // ignore
        case e: Exception => logError("Error in cleaning thread", e)
      }
    }
  }

3.12 Spark环境更新

  在SparkContext的初始化过程中,可能对其环境造成影响,所以需要更新环境,代码如下。

postEnvironmentUpdate()
postApplicationStart()

SparkContext初始化过程中,如果设置了spark.jars属性, spark.jars指定的jar包将由addJar方法加入到httpFileServer的jarDir变量指定的路径下。spark.files指定的文件将由addFile方法加入到httpFileServer的fileDir变量指定的路径下。见代码清单3-48。

代码清单3-48         依赖文件处理

  val jars: Seq[String] =
    conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten

  val files: Seq[String] =
    conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten

// Add each JAR given through the constructor
  if (jars != null) {
    jars.foreach(addJar)
  }

  if (files != null) {
    files.foreach(addFile)
  }

httpFileServer的addFile和addJar方法,见代码清单3-49。

代码清单3-49         HttpFileServer提供对依赖文件的访问

  def addFile(file: File) : String = {
    addFileToDir(file, fileDir)
    serverUri + "/files/" + file.getName
  }

  def addJar(file: File) : String = {
    addFileToDir(file, jarDir)
    serverUri + "/jars/" + file.getName
  }

  def addFileToDir(file: File, dir: File) : String = {
    if (file.isDirectory) {
      throw new IllegalArgumentException(s"$file cannot be a directory.")
    }

    Files.copy(file, new File(dir, file.getName))
    dir + "/" + file.getName
  }

postEnvironmentUpdate的实现见代码清单3-50,其处理步骤如下:

1) 通过调用SparkEnv的方法environmentDetails最终影响环境的JVM参数、Spark 属性、系统属性、classPath等,参见代码清单3-51。

2) 生成事件SparkListenerEnvironmentUpdate,并post到listenerBus,此事件被EnvironmentListener监听,最终影响EnvironmentPage页面中的输出内容。

代码清单3-50         SparkContext环境更新

  private def postEnvironmentUpdate() {

    if (taskScheduler != null) {
      val schedulingMode = getSchedulingMode.toString
      val addedJarPaths = addedJars.keys.toSeq
      val addedFilePaths = addedFiles.keys.toSeq
      val environmentDetails =
        SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths, addedFilePaths)
      val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)

      listenerBus.post(environmentUpdate)
    }
  }

代码清单3-51         environmentDetails的实现

   val jvmInformation = Seq(
      ("Java Version", s"$javaVersion ($javaVendor)"),
      ("Java Home", javaHome),
      ("Scala Version", versionString)
    ).sorted

    val schedulerMode =
      if (!conf.contains("spark.scheduler.mode")) {
        Seq(("spark.scheduler.mode", schedulingMode))
      } else {
        Seq[(String, String)]()
      }

    val sparkProperties = (conf.getAll ++ schedulerMode).sorted

    // System properties that are not java classpaths
    val systemProperties = Utils.getSystemProperties.toSeq
    val otherProperties = systemProperties.filter { case (k, _) =>
      k != "java.class.path" && !k.startsWith("spark.")
    }.sorted

    // Class paths including all added jars and files

    val classPathEntries = javaClassPath
      .split(File.pathSeparator)
      .filterNot(_.isEmpty)
      .map((_, "System Classpath"))
    val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
    val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted

    Map[String, Seq[(String, String)]](
      "JVM Information" -> jvmInformation,
      "Spark Properties" -> sparkProperties,
      "System Properties" -> otherProperties,
      "Classpath Entries" -> classPaths)
  }

postApplicationStart方法很简单,只是向listenerBus发送了SparkListenerApplicationStart事件,代码如下。

listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
  startTime, sparkUser))

3.13 创建DAGSchedulerSource和BlockManagerSource

  在创建DAGSchedulerSource、BlockManagerSource之前首先调用taskScheduler的postStartHook方法,其目的是为了等待backend就绪,见代码清单3-52。postStartHook的实现见代码清单3-53。

  创建DAGSchedulerSource和BlockManagerSource的过程类似于ExecutorSource,只不过DAGSchedulerSource测量的信息是stage. failedStages、stage. runningStages、stage. waitingStages、stage. allJobs、stage. activeJobs,BlockManagerSource测量的信息是memory. maxMem_MB、memory. remainingMem_MB、memory. memUsed_MB、memory. diskSpaceUsed_MB。

代码清单3-52         创建DAGSchedulerSource和BlockManagerSource

  taskScheduler.postStartHook()

  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)

  private def initDriverMetrics() {
    SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
    SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
  }

  initDriverMetrics()

代码清单3-53         等待backend就绪的实现

override def postStartHook() {
    waitBackendReady()
  }

private def waitBackendReady(): Unit = {
    if (backend.isReady) {
      return
    }

    while (!backend.isReady) {
      synchronized {
        this.wait(100)
      }
    }
  }

3.14 将SparkContext标记为激活

  SparkContext初始化的最后将当前SparkContext的状态从contextBeingConstructed(正在构建中)改为activeContext(已激活),代码如下。

SparkContext.setActiveContext(this, allowMultipleContexts)

setActiveContext方法的实现如下。

  private[spark] def setActiveContext(
      sc: SparkContext,
      allowMultipleContexts: Boolean): Unit = {

    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      assertNoOtherContextIsRunning(sc, allowMultipleContexts)
      contextBeingConstructed = None
      activeContext = Some(sc)
    }
  }

3.15 小结

  回顾本章, Scala与Akka基于Actor的并发编程模型给人带来深刻的印象,改变了我本人每当需要提升性能时就想到使用多线程的传统观念,Actor与事件模型有类似之处,通过异步处理,减少线程切换开销,值得开发人员借鉴。listenerBus对于监听器模式的经典应用将处理转化为事件并交给统一的线程处理,减少了线程阻塞与切换,提升了性能,希望读者朋友能应用到自己的产品开发中去。此外,使用Netty所提供的异步网络框架构建的Block传输服务,基于Jetty构建的内嵌web服务、HTTP文件服务器和SparkUI,基于codahale提供的第三方测量仓库创建的测量系统,Executor中的心跳实现等内容,都值得借鉴。

后记:自己牺牲了7个月的周末和下班空闲时间,通过研究Spark源码和原理,总结整理的《深入理解Spark:核心思想与源码分析》一书现在已经正式出版上市,目前亚马逊、京东、当当、天猫等网站均有销售,欢迎感兴趣的同学购买。我开始研究源码时的Spark版本是1.2.0,经过7个多月的研究和出版社近4个月的流程,Spark自身的版本迭代也很快,如今最新已经是1.6.0。目前市面上另外2本源码研究的Spark书籍的版本分别是0.9.0版本和1.2.0版本,看来这些书的作者都与我一样,遇到了这种问题。由于研究和出版都需要时间,所以不能及时跟上Spark的脚步,还请大家见谅。但是Spark核心部分的变化相对还是很少的,如果对版本不是过于追求,依然可以选择本书。

京东(现有满100减30活动):http://item.jd.com/11846120.html

当当:http://product.dangdang.com/23838168.html

时间: 2024-08-16 16:41:09

SparkContext的初始化(季篇)——测量系统、ContextCleaner及环境更新的相关文章

SparkContext的初始化(叔篇)——TaskScheduler的启动

<深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析>一书第一章的内容请看链接<第1章 环境准备> <深入理解Spark:核心思想与源码分析>一书第二章的内容请看链接<第2章 SPARK设计理念与基本架构> 由于本书的第3章内容较多,所以打算分别开辟四篇随笔分别展现. <深入理解Spark:核心思想与源码分析>一

《深入理解SPARK:核心思想与源码分析》——SparkContext的初始化(中)

<深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析>一书第一章的内容请看链接<第1章 环境准备> <深入理解Spark:核心思想与源码分析>一书第二章的内容请看链接<第2章 SPARK设计理念与基本架构> 由于本书的第3章内容较多,所以打算分别开辟三篇随笔分别展现. <深入理解Spark:核心思想与源码分析>一

流量管理初始化目录篇(一)

这个项目是我iOS第一个从0开始的一个项目,曾经虽然做过基于cordovar 框架的项目,但是那个项目更多的是js代码来实现,谈不上太多的native代码. 由于自己的喜好和向往,来到了斯凯网络科技,虽然说是上市公司,但是进来之后我才发现负责iOS只有我一个人: 对于工作多年后的我来说,也渐渐明白了,我们都要自己承担起己的责任,依靠别人是给不了我们太多的答案的! 所以此时的我还是比较安静.淡定的---获取自己的能力有了很大的进步的原因.我目前的唯一目标只有一个:我要做一个自己并且大家也喜欢的ap

进程篇(1: 进程运行环境)--请参照本博客“操作系统”专栏

2014年5月30日  下午1:40:59 1. Unix 进程执行环境: 1.1 终止处理程序: ISO C 规定,一个程序可以登记多达32个函数,这些函数将由exit自动调用.我们称这些函数为终止处理程序(exit handler),并调用atexit函数来登记这些函数.该函数的原型如下: 1 #include <stdlib.h>2 3 int atexit(void (*function)(void)); exit调用这些终止程序的顺序与他们登记时的顺序相反(先登记后调用).同一个函数

(十三) [终篇] 一起学 Unix 环境高级编程 (APUE) 之 网络 IPC:套接字

. . . . . 目录 (一) 一起学 Unix 环境高级编程 (APUE) 之 标准IO (二) 一起学 Unix 环境高级编程 (APUE) 之 文件 IO (三) 一起学 Unix 环境高级编程 (APUE) 之 文件和目录 (四) 一起学 Unix 环境高级编程 (APUE) 之 系统数据文件和信息 (五) 一起学 Unix 环境高级编程 (APUE) 之 进程环境 (六) 一起学 Unix 环境高级编程 (APUE) 之 进程控制 (七) 一起学 Unix 环境高级编程 (APUE)

跟我学SpringCloud | 终篇:文章汇总(持续更新)

SpringCloud系列教程 | 终篇:文章汇总(持续更新) 我为什么这些文章?一是巩固自己的知识,二是希望有更加开放和与人分享的心态,三是接受各位大神的批评指教,有任何问题可以联系我: [email protected]. Github源码下载:https://github.com/meteor1993/SpringCloudLearning <跟我学SpringCloud>系列: Greenwich版 Spring Cloud Greenwich.SR1; Spring Boot 2.1

WPF 精修篇 非UI进程后台更新UI进程

原文:WPF 精修篇 非UI进程后台更新UI进程 <Grid> <Grid.RowDefinitions> <RowDefinition Height="11*"/> <RowDefinition Height="29*"/> </Grid.RowDefinitions> <StackPanel Orientation="Horizontal" Margin="0&quo

spark 源码分析之二 -- SparkContext 的初始化过程

创建或使用现有Session 从Spark 2.0 开始,引入了 SparkSession的概念,创建或使用已有的session 代码如下: 1 val spark = SparkSession 2 .builder 3 .appName("SparkTC") 4 .getOrCreate() 首先,使用了 builder 模式来创建或使用已存在的SparkSession,org.apache.spark.sql.SparkSession.Builder#getOrCreate 代码如

Spark源码剖析——SparkContext的初始化(四)_TaskScheduler的启动

7. TaskScheduler的启动 第五节介绍了TaskScheduler的创建,要想TaskScheduler发挥作用,必须要启动它,代码: TaskScheduler在启动的时候,实际调用了backend的start方法,即同时启动了backend.local模式下,这里的backend是localSchedulerBackend.在TaskScheduler初始化时传入localSchedulerBackend.以LocalSchedulerBackend为例,启动LocalSched