Spark2.1.0之源码分析——事件总线

阅读提示:阅读本文前,最好先阅读《Spark2.1.0之源码分析——事件总线》、《Spark2.1.0事件总线分析——ListenerBus的继承体系》及《Spark2.1.0事件总线分析——SparkListenerBus详解》几篇文章的内容。

LiveListenerBus继承了SparkListenerBus,并实现了将事件异步投递给监听器,达到实时刷新UI界面数据的效果。LiveListenerBus主要由以下部分组成:

  • eventQueue:是SparkListenerEvent事件的阻塞队列,队列大小可以通过Spark属性spark.scheduler.listenerbus.eventqueue.size进行配置,默认为10000(Spark早期版本中属于静态属性,固定为10000,这导致队列堆满时,只得移除一些最老的事件,最终导致各种问题与bug);
  • started:标记LiveListenerBus的启动状态的AtomicBoolean类型的变量;
  • stopped:标记LiveListenerBus的停止状态的AtomicBoolean类型的变量;
  • droppedEventsCounter:使用AtomicLong类型对删除的事件进行计数,每当日志打印了droppedEventsCounter后,会将droppedEventsCounter重置为0;
  • lastReportTimestamp:用于记录最后一次日志打印droppedEventsCounter的时间戳;
  • processingEvent:用来标记当前正有事件被listenerThread线程处理;
  • logDroppedEvent:AtomicBoolean类型的变量,用于标记是否由于eventQueue已满,导致新的事件被删除;
  • eventLock:用于当有新的事件到来时释放信号量,当对事件进行处理时获取信号量;
  • listeners:继承自LiveListenerBus的监听器数组;
  • listenerThread:处理事件的线程。

异步事件处理线程

listenerThread用于异步处理eventQueue中的事件,为了便于说明,这里将展示listenerThread及LiveListenerBus中的主要代码片段,见代码清单1。

代码清单1         LiveListenerBus主要逻辑的代码片段

  1. private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()

  2.  

    private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)

  3.  

  4.  

    private def validateAndGetQueueSize(): Int = {

  5.  

    val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)

  6.  

    if (queueSize <= 0) {

  7.  

    throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")

  8.  

    }

  9.  

    queueSize

  10.  

    }

  11.  

  12.  

    private val started = new AtomicBoolean(false)

  13.  

    private val stopped = new AtomicBoolean(false)

  14.  

    private val droppedEventsCounter = new AtomicLong(0L)

  15.  

    @volatile private var lastReportTimestamp = 0L

  16.  

    private var processingEvent = false

  17.  

    private val logDroppedEvent = new AtomicBoolean(false)

  18.  

    private val eventLock = new Semaphore(0)

  19.  

  20.  

    private val listenerThread = new Thread(name) {

  21.  

    setDaemon(true)

  22.  

    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {

  23.  

    LiveListenerBus.withinListenerThread.withValue(true) {

  24.  

    while (true) {

  25.  

    eventLock.acquire() // 获取信号量

  26.  

    self.synchronized {

  27.  

    processingEvent = true

  28.  

    }

  29.  

    try {

  30.  

    val event = eventQueue.poll //从eventQueue中获取事件

  31.  

    if (event == null) {

  32.  

    // Get out of the while loop and shutdown the daemon thread

  33.  

    if (!stopped.get) {

  34.  

    throw new IllegalStateException("Polling `null` from eventQueue means" +

  35.  

    " the listener bus has been stopped. So `stopped` must be true")

  36.  

    }

  37.  

    return

  38.  

    }

  39.  

    postToAll(event) // 事件处理

  40.  

    } finally {

  41.  

    self.synchronized {

  42.  

    processingEvent = false

  43.  

    }

  44.  

    }

  45.  

    }

  46.  

    }

  47.  

    }

  48.  

    }

通过分析代码清单1,listenerThread的工作步骤为:

  1. 不断获取信号量(当可以获取信号量时,说明还有事件未处理);
  2. 通过同步控制,将processingEvent设置为true;
  3. 从eventQueue中获取事件;
  4. 调用超类ListenerBus的postToAll方法(postToAll方法对监听器进行遍历,并调用SparkListenerBus的doPostEvent方法对事件进行匹配后执行监听器的相应方法);
  5. 每次循环结束依然需要通过同步控制,将processingEvent设置为false;

值得一提的是,listenerThread的run方法中调用了Utils的tryOrStopSparkContext,tryOrStopSparkContext方法可以保证当listenerThread的内部循环抛出异常后启动一个新的线程停止SparkContext(SparkContext的内容将在第4章详细介绍,tryOrStopSparkContext方法的具体实现请阅读《附录A Spark2.1核心工具类Utils》)。

LiveListenerBus的消息投递

在解释了异步线程listenerThread的工作内容后,还有一个要点没有解释:eventQueue中的事件是如何放进去的呢?由于eventQueue定义在LiveListenerBus中,因此ListenerBus和SparkListenerBus中并没有操纵eventQueue的方法,要将事件放入eventQueue只能依靠LiveListenerBus自己了,其post方法就是为此目的而生的,见代码清单2。

代码清单2        向LiveListenerBus投递SparkListenerEvent事件

  1. def post(event: SparkListenerEvent): Unit = {

  2.  

    if (stopped.get) {

  3.  

    logError(s"$name has already stopped! Dropping event $event")

  4.  

    return

  5.  

    }

  6.  

    val eventAdded = eventQueue.offer(event) // 向eventQueue中添加事件

  7.  

    if (eventAdded) {

  8.  

    eventLock.release()

  9.  

    } else {

  10.  

    onDropEvent(event)

  11.  

    droppedEventsCounter.incrementAndGet()

  12.  

    }

  13.  

    // 打印删除事件数的日志

  14.  

    val droppedEvents =www.thd540.com droppedEventsCounter.get

  15.  

    if (droppedEvents > 0) {

  16.  

    if (System.currentTimeMillis(www.tianjiuyule178.com) - lastReportTimestamp >= 60 * 1000) {

  17.  

    if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {

  18.  

    val prevLastReportTimestamp www.meiwanyule.cn = lastReportTimestamp

  19.  

    lastReportTimestamp =www.fengshen157.com System.currentTimeMillis()

  20.  

    logWarning(s"Dropped www.dashuju178.com $droppedEvents SparkListenerEvents since " +

  21.  

    new java.util.Date(prevLastReportTimestamp))

  22.  

    }

  23.  

    }

  24.  

    }

  25.  

    }

从代码清单2看到post方法的处理步骤如下:

  1. 判断LiveListenerBus是否已经处于停止状态;
  2. 向eventQueue中添加事件。如果添加成功,则释放信号量进而催化listenerThread能够有效工作。如果eventQueue已满造成添加失败,则移除事件,并对删除事件计数器droppedEventsCounter进行自增;
  3. 如果有事件被删除,并且当前系统时间距离上一次打印droppedEventsCounter超过了60秒则将droppedEventsCounter打印到日志。

LiveListenerBus与监听器

与LiveListenerBus配合使用的监听器,并非是父类SparkListenerBus的类型参数SparkListenerInterface,而是继承自SparkListenerInterface的SparkListener及其子类。图1列出了Spark中监听器SparkListener以及它的6种最常用的实现[1]。

图1     SparkListener的类继承体系

SparkListener虽然实现了SparkListenerInterface中的每个方法,但是其实都是空实现,具体的实现需要交给子类去完成。

《Spark2.1.0之源码分析——事件总线》中首先对事件总线的接口定义进行了一些介绍,之后《Spark2.1.0事件总线分析——ListenerBus的继承体系》一文展示了ListenerBus的继承体系,然后《Spark2.1.0事件总线分析——SparkListenerBus详解》选择ListenerBus的子类SparkListenerBus进行分析,最后本文选择LiveListenerBus作为具体的实现例子进行分析,这里将通过图2更加直观的展示ListenerBus、SparkListenerBus及LiveListenerBus的工作原理。

图2     LiveListenerBus的工作流程图

最后对于图2作一些补充说明:图中的DAGScheduler、SparkContext、BlockManagerMasterEndpoint、DriverEndpoint及LocalSchedulerBackend都是LiveListenerBus的事件来源,它们都是通过调用LiveListenerBus的post方法将消息交给异步线程listenerThread处理的。



[1] 除了本节列出的的六种SparkListener的子类外,还有很多其他的子类,这里就不一一列出了,感兴趣的读者可以查阅Spark相关文档或阅读源码知晓。

关于《Spark内核设计的艺术 架构设计与实现》

原文地址:https://www.cnblogs.com/qwangxiao/p/9716590.html

时间: 2024-08-04 01:39:25

Spark2.1.0之源码分析——事件总线的相关文章

jQuery 2.0.3 源码分析 事件绑定 - bind/live/delegate/on

转:http://www.cnblogs.com/aaronjs/p/3440647.html?winzoom=1 事件(Event)是JavaScript应用跳动的心脏,通过使用JavaScript ,你可以监听特定事件的发生,并规定让某些事件发生以对这些事件做出响应 事件的基础就不重复讲解了,本来是定位源码分析实现的, 所以需要有一定的基础才行 为了下一步更好的理解内部的实现,所以首先得清楚的认识到事件接口的划分 网上资料遍地都是,但是作为一个jQuery系列的源码分析,我还是很有必要在重新

MyVoix2.0.js 源码分析 WebSpeech与WebAudio篇

楔 子 随着移动互联网时代的开启,各种移动设备走进了我们的生活.无论是日常生活中人手一部的手机,还是夜跑者必备的各种智能腕带,亦或者是充满未来科技感的google glass云云,它们正渐渐改变着我们的生活习惯以及用户交互习惯.触摸屏取代了实体按键,Siri开始慢慢释放我们的双手,而leap motion之类的硬件更是让我们彻底不需要接触IT设备便能通过手势控制它们.在这样的大背景下,前端的交互将涉及越来越多元的交叉学科,我们正如十几年前人们经历Css的诞生一样,见证着一场带动整个行业乃至社会的

转载Aaron博客 ---- jQuery 2.0.3 源码分析core - 整体架构

jQuery 2.0.3 源码分析core - 整体架构 整体架构 拜读一个开源框架,最想学到的就是设计的思想和实现的技巧. 废话不多说,jquery这么多年了分析都写烂了,老早以前就拜读过, 不过这几年都是做移动端,一直御用zepto, 最近抽出点时间把jquery又给扫一遍 我也不会照本宣科的翻译源码,结合自己的实际经验一起拜读吧! github上最新是jquery-master,加入了AMD规范了,我就以官方最新2.0.3为准 整体架构 jQuery框架的核心就是从HTML文档中匹配元素并

转载Aaron ---- jQuery 2.0.3 源码分析core - 选择器

jQuery 2.0.3 源码分析core - 选择器(02) 声明:本文为原创文章,如需转载,请注明来源并保留原文链接Aaron,谢谢! 打开jQuery源码,一眼看去到处都充斥着正则表达式,jQuery框架的基础就是查询了,查询文档元素对象,所以狭隘的说呢,jQuery就是一个选择器,并这个基础上构建和运行查询过滤器! 工欲善其事,必先利其器,所以先从正则入手 我们来分解一个表达式 // A simple way to check for HTML strings // Prioritize

jQuery 2.0.3 源码分析 Deferrred概念

转载http://www.cnblogs.com/aaronjs/p/3348569.html JavaScript编程几乎总是伴随着异步操作,传统的异步操作会在操作完成之后,使用回调函数传回结果,而回调函数中则包含了后续的工作.这也 是造成异步编程困难的主要原因:我们一直习惯于“线性”地编写代码逻辑,但是大量异步操作所带来的回调函数,会把我们的算法分解地支离破碎.此时我们不能 用if来实现逻辑分支,也不能用while/for/do来实现循环,更不用提异步操作之间的组合.错误处理以及取消操作了.

jQuery 2.0.3 源码分析 Deferred(最细的实现剖析,带图)

转载http://www.cnblogs.com/aaronjs/p/3356505.html Deferred的概念请看第一篇 http://www.cnblogs.com/aaronjs/p/3348569.html ******************构建Deferred对象时候的流程图************************** **********************源码解析********************** 因为callback被剥离出去后,整个deferred

最细的实现剖析:jQuery 2.0.3源码分析Deferred

Deferred的概念请看第一篇 http://www.cnblogs.com/aaronjs/p/3348569.html **构建Deferred对象时候的流程图** **源码解析** 因为callback被剥离出去后,整个deferred就显得非常的精简 jQuery.extend({ Deferred:function(){} when:function() )}对于extend的继承这个东东,在之前就提及过jquery如何处理内部jquery与init相互引用this的问题 对于JQ的

【朝花夕拾】Android自定义View篇之(六)Android事件分发机制(中)从源码分析事件分发逻辑及经常遇到的一些“诡异”现象

前言 转载请注明,转自[https://www.cnblogs.com/andy-songwei/p/11039252.html]谢谢! 在上一篇文章[[朝花夕拾]Android自定义View篇之(五)Android事件分发机制(上)Touch三个重要方法的处理逻辑][下文简称(五),请先阅读完(五)再阅读本文],我们通过示例和log来分析了Android的事件分发机制.这些,我们只是看到了现象,如果要进一步了解事件分发机制,这是不够的,我们还需要透过现象看本质,去研究研究源码.本文将从源码(基

zookeeper(10)源码分析-事件监听Watcher(3)

今天继续源码分析,分析一下org.apache.zookeeper.server下的WatchManager类. WatcherManager类用于管理watchers和相应的触发器. 类的属性 //watchTable表示从节点路径到watcher集合的映射 private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>()