spark内核揭秘-08-spark的Web监控页面

在SparkContext中可以看到初始化UI代码:

// Initialize the Spark UI
private[spark] val ui: Option[SparkUI] =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
      env.securityManager,appName))
  } else {
    // For tests, do not enable the UI
    None
  }

// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
ui.foreach(_.bind())

创建SparkUI对象:

def createLiveUI(
    sc: SparkContext,
    conf: SparkConf,
    listenerBus: SparkListenerBus,
    jobProgressListener: JobProgressListener,
    securityManager: SecurityManager,
    appName: String): SparkUI =  {
  create(Some(sc), conf, listenerBus, securityManager, appName,
    jobProgressListener = Some(jobProgressListener))
}

进入create方法:

进入SparkUI:

SparkUI 继承了WebUI:

而WebUI类定义了def initialize()方法

SparkUI的initialize()实现方法:

上面代码分析:增加页面的tab页面

1、JobsTab:

2、stagesTab:

3、StorageTab:

4、EnvironmentTab:

5、ExecutorsTab:

6、createStaticHandler:

7、createRedirectHandler:

当执行完initialize()方法后,我们回到SparkContext 的Initialize the spark UI:

bind  port:

从上面代码可以看出来,启动了端口号为4040的本地JettyServer

上面代码分析:

1、创建ContextHandlerCollection并将handlers设置到ContextHandlerCollection中

2、增加一个filter:

3、创建Jetty Server并绑定端口号并创建一个QueuedThreadPool,设置到Server中:

4、启动Jetty server,如果启动失败,就抛出异常:

5、试着重试几次启动startServiceOnPort,如果启动失败,就更换新端口号(规则是1+oldPort),重新启动:

最终启动成功后,会把信息东西存放在ServerInfo中:

时间: 2024-10-26 05:38:25

spark内核揭秘-08-spark的Web监控页面的相关文章

spark内核揭秘-01-spark内核核心术语解析

Application: Application是创建了SparkContext实例对象的spark用户,包含了Driver程序: Spark-shell是一个应用程序,因为spark-shell在启动的时候创建了一个SparkContext对象,其名称为sc: Job: 和Spark的action相对应,每一个action例如count.saveAsTextFile等都会对应一个job实例,该job实例包含多任务的并行计算. Driver Program: 运行main函数并且创建SparkC

spark内核揭秘-03-spark核心组件

spark核心组件如下所示: 在SparkContext初始化的时候,会初始化一系列内容: 查看内存使用情况: 创建和启动scheduler: 集群核心组件中的Block tracker是用于block和partition对应关系的管理. 集群核心组件中的shuffle tracker是用于记录shuffle操作的过程细节. 从集群中也可以看出,Executor在执行任务的时候是采用多线程的方式执行的并能够在HDFS或者HBase等系统上读取数据. 而在实际的Driver Program运行的时

spark内核揭秘-02-spark集群概览

Spark集群预览: 官方文档对spark集群的初步描述如下,这是一个典型的主从结构: 官方文档对spark集群中的一些关键点给出详细的指导: 其Worker的定义如下所示: 需要注意的是Spark Driver所在的集群需要和Spark集群最好位于同一个网络环境中,因为Driver中的SparkContext实例需发送任务给不同Worker Node的Executor并接受Executor的一些执行结果信息,一般而言,在企业实际的生产环境中Driver所在的机器是的配置往往都是比较不错的,尤其

spark内核揭秘-06-TaskSceduler启动源码解析初体验

TaskScheduler实例对象启动源代码如下所示: 从上面代码可以看出来,taskScheduler的启动是在SparkContext 找到TaskSchedulerImpl实现类中的start方法实现: 1.从上代码看到,先启动CoarseGrainedSchedulerBackend, 从上面CoarseGrainedSchedulerBackend类的代码,可以看出spark启动了DriverActor,名称为CoarseGrainedScheduler,这是一个akka消息通信类,会

spark内核揭秘-04-spark任务调度系统个人理解

spark的任务调度系统如下所示: 从上图中科院看出来由RDDObject产生DAG,然后进入了DAGScheduler阶段,DAGScheduler是面向state的高层次的调度器,DAGScheduler把DAG拆分成很多的tasks,每组的tasks都是一个state,每当遇到shuffle就会产生新的state,可以看出上图一共有三个state:DAGScheduler需要记录那些RDD被存入磁盘等物化动作,同时需勋勋task的最优化调度,例如数据本地性等:DAGScheduler还要监

spark内核揭秘-14-Spark性能优化的10大问题及其解决方案

问题1:reduce task数目不合适 解决方案: 需要根据实际情况调整默认配置,调整方式是修改参数spark.default.parallelism.通常的,reduce数目设置为core数目的2-3倍.数量太大,造成很多小任务,增加启动任务的开销:数目太小,任务运行缓慢.所以要合理修改reduce的task数目即spark.default.parallelism 问题2:shuffle磁盘IO时间长 解决方案: 设置spark.local.dir为多个磁盘,并设置磁盘的IO速度快的磁盘,通

spark内核揭秘-10-RDD源码分析

RDD的核心方法: 首先看一下getPartitions方法的源码: getPartitions返回的是一系列partitions的集合,即一个Partition类型的数组 我们就想进入HadoopRDD实现: 1.getJobConf():用来获取job Configuration,获取配置方式有clone和非clone方式,但是clone方式 是not thread-safe,默认是禁止的,非clone方式可以从cache中获取,如cache中没有那就创建一个新的,然后再放到cache中 2

spark内核揭秘-05-SparkContext核心源码解析初体验

SparkContext在获得了一系列的初始化信息后开始创建并启动TaskScheduler实例: 进入createTaskScheduler方法: 我们看一下其Standalone的方式: 在上述代码中首先实例化一个TaskSchedulerImpl: 然后构建出了masterUrls: 接着创建出关键的backend: 进入SparkDeploySchedulerBackend实现: 从以上截图可以看出来,SparkDeploySchedulerBackend核心是为了启动CoarseGra

spark内核揭秘-09-RDD的count操作 触发Job全生命周期-02

接着上一篇文章继续分析代码: 3.1.3.3.3.1.进入TaskSet 方法: 3.1.3.3.3.2.进入taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)) 方法: 从源代码中可以看出DAGScheduler中向TaskScheduler以Stage为单位提交任务,Stage是以TaskSet为单位的,构建一个TaskSet