Apache Spark源码走读之21 -- WEB UI和Metrics初始化及数据更新过程分析

欢迎转载,转载请注明出处,徽沪一郎.

概要

WEB UI和Metrics子系统为外部观察监测Spark内部运行情况提供了必要的窗口,本文将简略的过一下其内部代码实现。

WEB UI

先上图感受一下spark webui 假设当前已经在本机运行standalone cluster模式,输入http://127.0.0.1:8080将会看到如下页面

driver application默认会打开4040端口进行http监听,可以看到application相关的详细信息

显示每个stage的详细信息

启动过程

本节要讨论的重点是http server是如何启动的,页面中的数据是从哪里获取到的?Spark中用到的http server是jetty, jetty采用java编写,是非常轻巧的servlet engine和http server。能够嵌入到用户程序中执行,不用像tomcat或jboss那样需要自己独立的jvm进程。

SparkUI在SparkContext初始化的时候创建

// Initialize the Spark UI , registering all
associated listeners
private [spark] val ui = new SparkUI (this)
ui.bind ()

initialize的主要工作是注册页面处理句柄,WebUI的子类需要实现自己的initialize函数

bind将真正启动jetty server.

def bind () {
assert (! serverInfo .isDefined , " Attempted to bind %
s more than once!". format ( className ))
try {
// 启 动 JettyServer
serverInfo = Some( startJettyServer (" 0.0.0.0 ",
port , handlers , conf))
logInfo (" Started %s at http ://%s:%d". format (
className , publicHostName , boundPort ))
} catch {
case e: Exception =>
logError (" Failed to bind %s". format ( className )
, e)
System .exit (1)
}
}

在startJettyServer函数中将JettyServer运行起来的关键处理函数是connect

 def connect(currentPort: Int): (Server, Int) = {
      val server = new Server(new InetSocketAddress(hostName, currentPort))
      val pool = new QueuedThreadPool
      pool.setDaemon(true)
      server.setThreadPool(pool)
      server.setHandler(collection)

      Try {
        server.start()
      } match {
        case s: Success[_] =>
          (server, server.getConnectors.head.getLocalPort)
        case f: Failure[_] =>
          val nextPort = (currentPort + 1) % 65536
          server.stop()
          pool.stop()
          val msg = s"Failed to create UI on port $currentPort. Trying again on port $nextPort."
          if (f.toString.contains("Address already in use")) {
            logWarning(s"$msg - $f")
          } else {
            logError(msg, f.exception)
          }
          connect(nextPort)
      }
    }

    val (server, boundPort) = connect(port)
    ServerInfo(server, boundPort, collection)
  }

数据获取

页面中的数据是如何获取的呢,这就要归功于SparkListener了,典型的观察者设计模式。当有与stage及task相关的事件发生时,这些Listener都将收到通知,并进行数据更新。

需要指出的是,数据尽管得以自动更新,但页面并没有,还是需要手工刷新才能得到最新的数据。

上图显示的是SparkUI中注册了哪些SparkListener子类。来看一看这些子类是在什么时候注册进去的, 注意研究一下SparkUI.initialize函

 def initialize() {
    listenerBus.addListener(storageStatusListener)
    val jobProgressTab = new JobProgressTab(this)
    attachTab(jobProgressTab)
    attachTab(new StorageTab(this))
    attachTab(new EnvironmentTab(this))
    attachTab(new ExecutorsTab(this))
    attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
    attachHandler(createRedirectHandler("/", "/stages", basePath = basePath))
    attachHandler(
      createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest))
    if (live) {
      sc.env.metricsSystem.getServletHandlers.foreach(attachHandler)
    }
  }

举一个实际例子来看看Notifier发送Event的时刻,比如有任务提交的时 resourceOffer->taskStarted->handleBeginEvent

private [ scheduler ] def handleBeginEvent (task: Task[_
], taskInfo : TaskInfo ) {
listenerBus .post( SparkListenerTaskStart (task.
stageId , taskInfo ))
submitWaitingStages ()
}

post其实是向listenerBus的消息队列中添加一个消息,真正将消息发送 出去的时另一个处理线程listenerThread

override def run (): Unit = Utils.
logUncaughtExceptions {
while (true) {
eventLock . acquire ()
// Atomically remove and process this event
LiveListenerBus .this. synchronized {
val event = eventQueue .poll
if (event == SparkListenerShutdown ) {
// Get out of the while loop and shutdown
the daemon thread
return
}
Option (event). foreach ( postToAll )
}
}
}

Option(event).foreach(postToAll)负责将事件通知给各个Observer.postToAll的函数实现如下

def postToAll(event: SparkListenerEvent) {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        foreachListener(_.onStageSubmitted(stageSubmitted))
      case stageCompleted: SparkListenerStageCompleted =>
        foreachListener(_.onStageCompleted(stageCompleted))
      case jobStart: SparkListenerJobStart =>
        foreachListener(_.onJobStart(jobStart))
      case jobEnd: SparkListenerJobEnd =>
        foreachListener(_.onJobEnd(jobEnd))
      case taskStart: SparkListenerTaskStart =>
        foreachListener(_.onTaskStart(taskStart))
      case taskGettingResult: SparkListenerTaskGettingResult =>
        foreachListener(_.onTaskGettingResult(taskGettingResult))
      case taskEnd: SparkListenerTaskEnd =>
        foreachListener(_.onTaskEnd(taskEnd))
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
        foreachListener(_.onEnvironmentUpdate(environmentUpdate))
      case blockManagerAdded: SparkListenerBlockManagerAdded =>
        foreachListener(_.onBlockManagerAdded(blockManagerAdded))
      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        foreachListener(_.onBlockManagerRemoved(blockManagerRemoved))
      case unpersistRDD: SparkListenerUnpersistRDD =>
        foreachListener(_.onUnpersistRDD(unpersistRDD))
      case applicationStart: SparkListenerApplicationStart =>
        foreachListener(_.onApplicationStart(applicationStart))
      case applicationEnd: SparkListenerApplicationEnd =>
        foreachListener(_.onApplicationEnd(applicationEnd))
      case SparkListenerShutdown =>
    }
  }

Metrics

在系统设计中,测量模块是不可或缺的组成部分。通过这些测量数据来感知系统的运行情况。

在Spark中,测量模块由MetricsSystem来担任,MetricsSystem中有三个重要的概念,分述如下。

  • instance 表示谁在使用metrics system, 目前已知的有master, worker, executor和client driver会创建metrics system用以测量
  • source 表示数据源,从哪里获取数据
  • sinks 数据目的地,将从source获取的数据发送到哪

Spark目前支持将测量数据保存或发送到如下目的地

  • ConsoleSink 输出到console
  • CSVSink 定期保存成为CSV文件
  • JmxSink 注册到JMX,以通过JMXConsole来查看
  • MetricsServlet 在SparkUI中添加MetricsServlet用以查看Task运行时的测量数据
  • GraphiteSink 发送给Graphite以对整个系统(不仅仅包括spark)进行监控

下面从MetricsSystem的创建,数据源的添加,数据更新与发送几个方面来跟踪一下源码。

初始化过程

MetricsSystem依赖于由codahale提供的第三方库Metrics,可以在metrics.codahale.com找到更为详细的介绍。

以Driver Application为例,driver application首先会初始化SparkContext,在SparkContext的初始化过程中就会创建MetricsSystem,具体调用关系如下。 SparkContext.init->SparkEnv.init->MetricsSystem.createMetricsSystem

注册数据源,继续以SparkContext为例

  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)

  private def initDriverMetrics() {
    SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
    SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
  }
initDriverMetrics()

数据读取

数据读取由Sink来完成,在Spark中创建的Sink子类如下图所示

读取最新的数据,以CsvSink为例,最主要的就是创建CsvReporter,启动之后会定期更新最近的数据到console。不同类型的Sink所使用的Reporter是不一样的。

 val reporter: CsvReporter = CsvReporter.forRegistry(registry)
      .formatFor(Locale.US)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .convertRatesTo(TimeUnit.SECONDS)
      .build(new File(pollDir))

  override def start() {
    reporter.start(pollPeriod, pollUnit)
  }

Spark中关于metrics子系统的配置文件详见conf/metrics.properties. 默认的Sink是MetricsServlet,在任务提交执行之后,输入http://127.0.0.1:4040/metrics/json会得到以json格式保存的metrics信息。

Apache Spark源码走读之21 -- WEB UI和Metrics初始化及数据更新过程分析

时间: 2024-08-04 08:40:44

Apache Spark源码走读之21 -- WEB UI和Metrics初始化及数据更新过程分析的相关文章

Apache Spark源码走读之21 -- 浅谈mllib中线性回归的算法实现

欢迎转载,转载请注明出处,徽沪一郎. 概要 本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读. 线性回归模型 机器学习算法是的主要目的是找到最能够对数据做出合理解释的模型,这个模型是假设函数,一步步的推导基本遵循这样的思路 假设函数 为了找到最好的假设函数,需要找到合理的评估标准,一般来说使用损失函数来做为评估标准 根据损失函数推出目标函数 现在问题转换成为如何找到目标函数的最优解,也就是目标函数的最优化

Apache Spark源码走读之13 -- hiveql on spark实现详解

欢迎转载,转载请注明出处,徽沪一郎 概要 在新近发布的spark 1.0中新加了sql的模块,更为引人注意的是对hive中的hiveql也提供了良好的支持,作为一个源码分析控,了解一下spark是如何完成对hql的支持是一件非常有趣的事情. Hive简介 Hive的由来 以下部分摘自Hadoop definite guide中的Hive一章 "Hive由Facebook出品,其设计之初目的是让精通SQL技能的分析师能够对Facebook存放在HDFS上的大规模数据集进行分析和查询. Hive大大

Apache Spark源码走读之7 -- Standalone部署方式分析

欢迎转载,转载请注明出处,徽沪一郎. 楔子 在Spark源码走读系列之2中曾经提到Spark能以Standalone的方式来运行cluster,但没有对Application的提交与具体运行流程做详细的分析,本文就这些问题做一个比较详细的分析,并且对在standalone模式下如何实现HA进行讲解. 没有HA的Standalone运行模式 先从比较简单的说起,所谓的没有ha是指master节点没有ha. 组成cluster的两大元素即Master和Worker.slave worker可以有1到

Apache Spark源码走读之12 -- Hive on Spark运行环境搭建

欢迎转载,转载请注明出处,徽沪一郎. 楔子 Hive是基于Hadoop的开源数据仓库工具,提供了类似于SQL的HiveQL语言,使得上层的数据分析人员不用知道太多MapReduce的知识就能对存储于Hdfs中的海量数据进行分析.由于这一特性而收到广泛的欢迎. Hive的整体框架中有一个重要的模块是执行模块,这一部分是用Hadoop中MapReduce计算框架来实现,因而在处理速度上不是非常令人满意.由于Spark出色的处理速度,有人已经成功将HiveQL的执行利用Spark来运行,这就是已经非常

Apache Spark源码走读之9 -- Spark源码编译

欢迎转载,转载请注明出处,徽沪一郎. 概要 本来源码编译没有什么可说的,对于java项目来说,只要会点maven或ant的简单命令,依葫芦画瓢,一下子就ok了.但到了Spark上面,事情似乎不这么简单,按照spark officical document上的来做,总会出现这样或那样的编译错误,让人懊恼不已. 今天闲来无事,又重试了一把,居然o了,做个记录,以备后用. 准备 我的编译机器上安装的Linux是archlinux,并安装后如下软件 scala 2.11 maven git 下载源码 第

Apache Spark源码走读之14 -- Graphx实现剖析

欢迎转载,转载请注明出处,徽沪一郎. 概要 图的并行化处理一直是一个非常热门的话题,这里头的重点有两个,一是如何将图的算法并行化,二是找到一个合适的并行化处理框架.Spark作为一个非常优秀的并行处理框架,将一些并行化的算法移到其上面就成了一个很自然的事情. Graphx是一些图的常用算法在Spark上的并行化实现,同时提供了丰富的API接口.本文就Graphx的代码架构及pagerank在graphx中的具体实现做一个初步的学习. Google为什么赢得了搜索引擎大战 当Google还在起步的

Apache Spark源码走读之5 -- DStream处理的容错性分析

欢迎转载,转载请注明出处,徽沪一郎,谢谢. 在流数据的处理过程中,为了保证处理结果的可信度(不能多算,也不能漏算),需要做到对所有的输入数据有且仅有一次处理.在Spark Streaming的处理机制中,不能多算,比较容易理解.那么它又是如何作到即使数据处理结点被重启,在重启之后这些数据也会被再次处理呢? 环境搭建 为了有一个感性的认识,先运行一下简单的Spark Streaming示例.首先确认已经安装了openbsd-netcat. 运行netcatnc -lk 9999 运行spark-s

Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析

欢迎转载,转载请注明出处,徽沪一郎. 概要 本文就standalone部署方式下的容错性问题做比较细致的分析,主要回答standalone部署方式下的包含哪些主要节点,当某一类节点出现问题时,系统是如何处理的. Standalone部署的节点组成 介绍Spark的资料中对于RDD这个概念涉及的比较多,但对于RDD如何运行起来,如何对应到进程和线程的,着墨的不是很多. 在实际的生产环境中,Spark总是会以集群的方式进行运行的,其中standalone的部署方式是所有集群方式中最为精简的一种,另外

Apache Spark源码走读之16 -- spark repl实现详解

欢迎转载,转载请注明出处,徽沪一郎. 概要 之所以对spark shell的内部实现产生兴趣全部缘于好奇代码的编译加载过程,scala是需要编译才能执行的语言,但提供的scala repl可以实现代码的实时交互式执行,这是为什么呢? 既然scala已经提供了repl,为什么spark还要自己单独搞一套spark repl,这其中的缘由到底何在? 显然,这些都是问题,要解开这些谜团,只有再次开启一段源码分析之旅了. 全局视图 上图显示了java源文件从编译到加载执行的全局视图,整个过程中最主要的步