spark 源码分析之三 -- LiveListenerBus介绍

LiveListenerBus

  首先,它定义了 4 个 消息堵塞队列,队列的名字分别为shared、appStatus、executorManagement、eventLog。队列的类型是 org.apache.spark.scheduler.AsyncEventQueue#AsyncEventQueue,保存在 queues 变量中。每一个队列上都可以注册监听器,如果队列没有监听器,则会被移除。

  它有启动和stop和start两个标志位来指示 监听总线的的启动停止状态。 如果总线没有启动,有事件过来,先放到 一个待添加的可变数组中,否则直接将事件 post 到每一个队列中。

  其直接依赖类是 AsyncEventQueue, 相当于 LiveListenerBus 的多事件队列是对 AsyncEventQueue 进一步的封装。

AsyncEventQueue

  其继承关系如下:

  

  它有启动和stop和start两个标志位来指示 监听总线的的启动停止状态。

  其内部维护了listenersPlusTimers 主要就是用来保存注册到这个总线上的监听器对象的。

  post 操作将事件放入内部的 LinkedBlockingQueue中,默认大小是 10000。

  有一个事件分发器,它不停地从 LinkedBlockingQueue 执行 take 操作,获取事件,并将事件进一步分发给所有的监听器,由org.apache.spark.scheduler.SparkListenerBus#doPostEvent 方法实现事件转发,具体代码如下:

 1 protected override def doPostEvent(
 2       listener: SparkListenerInterface,
 3       event: SparkListenerEvent): Unit = {
 4     event match {
 5       case stageSubmitted: SparkListenerStageSubmitted =>
 6         listener.onStageSubmitted(stageSubmitted)
 7       case stageCompleted: SparkListenerStageCompleted =>
 8         listener.onStageCompleted(stageCompleted)
 9       case jobStart: SparkListenerJobStart =>
10         listener.onJobStart(jobStart)
11       case jobEnd: SparkListenerJobEnd =>
12         listener.onJobEnd(jobEnd)
13       case taskStart: SparkListenerTaskStart =>
14         listener.onTaskStart(taskStart)
15       case taskGettingResult: SparkListenerTaskGettingResult =>
16         listener.onTaskGettingResult(taskGettingResult)
17       case taskEnd: SparkListenerTaskEnd =>
18         listener.onTaskEnd(taskEnd)
19       case environmentUpdate: SparkListenerEnvironmentUpdate =>
20         listener.onEnvironmentUpdate(environmentUpdate)
21       case blockManagerAdded: SparkListenerBlockManagerAdded =>
22         listener.onBlockManagerAdded(blockManagerAdded)
23       case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
24         listener.onBlockManagerRemoved(blockManagerRemoved)
25       case unpersistRDD: SparkListenerUnpersistRDD =>
26         listener.onUnpersistRDD(unpersistRDD)
27       case applicationStart: SparkListenerApplicationStart =>
28         listener.onApplicationStart(applicationStart)
29       case applicationEnd: SparkListenerApplicationEnd =>
30         listener.onApplicationEnd(applicationEnd)
31       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
32         listener.onExecutorMetricsUpdate(metricsUpdate)
33       case executorAdded: SparkListenerExecutorAdded =>
34         listener.onExecutorAdded(executorAdded)
35       case executorRemoved: SparkListenerExecutorRemoved =>
36         listener.onExecutorRemoved(executorRemoved)
37       case executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage =>
38         listener.onExecutorBlacklistedForStage(executorBlacklistedForStage)
39       case nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage =>
40         listener.onNodeBlacklistedForStage(nodeBlacklistedForStage)
41       case executorBlacklisted: SparkListenerExecutorBlacklisted =>
42         listener.onExecutorBlacklisted(executorBlacklisted)
43       case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
44         listener.onExecutorUnblacklisted(executorUnblacklisted)
45       case nodeBlacklisted: SparkListenerNodeBlacklisted =>
46         listener.onNodeBlacklisted(nodeBlacklisted)
47       case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
48         listener.onNodeUnblacklisted(nodeUnblacklisted)
49       case blockUpdated: SparkListenerBlockUpdated =>
50         listener.onBlockUpdated(blockUpdated)
51       case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
52         listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
53       case _ => listener.onOtherEvent(event)
54     }
55   }

  然后去调用 listener 的相对应的方法。

  就这样,事件总线上的消息事件被监听器消费了。

原文地址:https://www.cnblogs.com/johnny666888/p/11117559.html

时间: 2024-08-02 07:37:47

spark 源码分析之三 -- LiveListenerBus介绍的相关文章

Spark 源码分析系列

如下,是 spark 源码分析系列的一些文章汇总,持续更新中...... Spark RPC spark 源码分析之五--Spark RPC剖析之创建NettyRpcEnv spark 源码分析之六--Spark RPC剖析之Dispatcher和Inbox.Outbox剖析 spark 源码分析之七--Spark RPC剖析之RpcEndPoint和RpcEndPointRef剖析 spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClie

spark源码分析之Executor启动与任务提交篇

任务提交流程 概述 在阐明了Spark的Master的启动流程与Worker启动流程.接下继续执行的就是Worker上的Executor进程了,本文继续分析整个Executor的启动与任务提交流程 Spark-submit 提交一个任务到集群通过的是Spark-submit 通过启动脚本的方式启动它的主类,这里以WordCount为例子 spark-submit --class cn.itcast.spark.WordCount bin/spark-clas -> org.apache.spar

Spark源码分析之八:Task运行(二)

在<Spark源码分析之七:Task运行(一)>一文中,我们详细叙述了Task运行的整体流程,最终Task被传输到Executor上,启动一个对应的TaskRunner线程,并且在线程池中被调度执行.继而,我们对TaskRunner的run()方法进行了详细的分析,总结出了其内Task执行的三个主要步骤: Step1:Task及其运行时需要的辅助对象构造,主要包括: 1.当前线程设置上下文类加载器: 2.获取序列化器ser: 3.更新任务状态TaskState: 4.计算垃圾回收时间: 5.反

Spark源码分析之七:Task运行(一)

在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在Task调度逻辑的最后,CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的makeOffers()方法的最后,我们通过调用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq[Seq[Tas

Spark源码分析之四:Stage提交

各位看官,上一篇<Spark源码分析之Stage划分>详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交. Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示: 与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那么顺理成章的下一步便是Stage的提交.在han

Spark源码分析之六:Task调度(二)

话说在<Spark源码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这个方法针对接收到的ReviveOffers事件进行处理.代码如下: [java] view plain copy // Make fake resource offers on all executors // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的) private def makeOffers

Spark源码分析之二:Job的调度模型与运行反馈

在<Spark源码分析之Job提交运行总流程概述>一文中,我们提到了,Job提交与运行的第一阶段Stage划分与提交,可以分为三个阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. 今天,我们就结合源码来分析下第一个小阶段:Job的调度模型与运行反馈. 首先由DAGScheduler负责将Job提交到事件队列eventProcessLoop中,等待调度执行.入口方法为DAGScheduler的runJon()方法.代码如下: [jav

Spark源码分析之五:Task调度(一)

在前四篇博文中,我们分析了Job提交运行总流程的第一阶段Stage划分与提交,它又被细化为三个分阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. Stage划分与提交阶段主要是由DAGScheduler完成的,而DAGScheduler负责Job的逻辑调度,主要职责也即DAG图的分解,按照RDD间是否为shuffle dependency,将整个Job划分为一个个stage,并将每个stage转化为tasks的集合--TaskSet.

Accuracy(准确率), Precision(精确率), 和F1-Measure, 结合Spark源码分析

例子 某大学一个系,总共100人,其中男90人,女10人,现在根据每个人的特征,预测性别 Accuracy(准确率) Accuracy=预测正确的数量需要预测的总数 计算 由于我知道男生远多于女生,所以我完全无视特征,直接预测所有人都是男生 我预测所的人都是男生,而实际有90个男生,所以 预测正确的数量 = 90 需要预测的总数 = 100 Accuracy = 90 / 100 = 90% 问题 在男女比例严重不均匀的情况下,我只要预测全是男生,就能获得极高的Accuracy. 所以在正负样本