Spark 消息队列机制源码学习

源码学习

spark源码注释中有下面一句话:

Asynchronously passes SparkListenerEvents to registered SparkListeners

即所有spark消息SparkListenerEvents 被异步的发送给已经注册过的SparkListeners.

在SparkContext中, 首先会创建LiveListenerBus实例,这个类主要功能如下:

  • 保存有消息队列,负责消息的缓存
  • 保存有注册过的listener,负责消息的分发

该类的继承层次如下所示:

listener链表保存在ListenerBus类中,为了保证并发访问的安全性,此处采用Java的CopyOnWriteArrayList类来存储listener. 当需要对listener链表进行更改时,CopyOnWriteArrayList的特性使得会先复制整个链表,然后在复制的链表上面进行修改.当一旦获得链表的迭代器,在迭代器的生命周期中,可以保证数据的一致性.

private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

  // Marked `private[spark]` for access in tests.
  private[spark] val listeners = new CopyOnWriteArrayList[L]

  /**
   * Add a listener to listen events. This method is thread-safe and can be called in any thread.
   */
  final def addListener(listener: L) {
    listeners.add(listener)
  }
  ...

消息队列实际上是保存在类AsynchronousListenerBus中的:

  private val EVENT_QUEUE_CAPACITY = 10000
  private val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY)

事件队列的长度为10000,当缓存事件数量达到上限后,新来的事件会被丢弃,具体的丢弃处理函数位于LiveListenerBus类中:

private[spark] class LiveListenerBus
  extends AsynchronousListenerBus[SparkListener, SparkListenerEvent]("SparkListenerBus")
  with SparkListenerBus {

  private val logDroppedEvent = new AtomicBoolean(false)

  override def onDropEvent(event: SparkListenerEvent): Unit = {
    if (logDroppedEvent.compareAndSet(false, true)) {
      // Only log the following message once to avoid duplicated annoying logs.
      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
        "This likely means one of the SparkListeners is too slow and cannot keep up with " +
        "the rate at which tasks are being started by the scheduler.")
    }
  }

}

通过上面代码可以看到, 处理方式输出错误日志,且通过使用变量logDroppedEvent来保证仅输出一次.

继续把目光放在类AsynchronousListenerBus上,该类是消息机制的核心.既然是消息队列,就涉及到消息的生产和消费.首先来看消息的消费方式,AsynchronousListenerBus类会创建一个消费者线程listenerThread,来从消息队列中取得消息并进行分发,下面是实现代码:

private val listenerThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
      while (true) {
        eventLock.acquire()
        self.synchronized {
          processingEvent = true
        }
        try {
          val event = eventQueue.poll
          if (event == null) {
            // Get out of the while loop and shutdown the daemon thread
            if (!stopped.get) {
              throw new IllegalStateException("Polling `null` from eventQueue means" +
                " the listener bus has been stopped. So `stopped` must be true")
            }
            return
          }
          postToAll(event)
        } finally {
          self.synchronized {
            processingEvent = false
          }
        }
      }
    }
  }

整个思想就是典型的生产者消费者思想.为了保证生产者和消费者对消息队列的并发访问,在每次需要获取消息的时候,调用eventLock.acquire()来获取信号量, 信号量的值就是当前队列中所含有的事件数量.如果正常获取到事件,就调用postToAll将事件分发给所有listener, 继续下一次循环. 如果获取到null值, 则有下面两种情况:

  1. 整个application正常结束, 此时stopped值已经被设置为true
  2. 系统发生了错误, 立即终止运行

下面来看看生产者,代码如下:

  def post(event: E) {
    if (stopped.get) {
      // Drop further events to make `listenerThread` exit ASAP
      logError(s"$name has already stopped! Dropping event $event")
      return
    }
    val eventAdded = eventQueue.offer(event)
    if (eventAdded) {
      eventLock.release()
    } else {
      onDropEvent(event)
    }
  }

该函数用来将事件放入到消息队列中,每成功放入一个事件,就调用eventLock.release()来增加信号量额值.以供消费者线程来进行消费. 如果队列满了,就调用onDropEvent来处理, 该函数已经在上面列出,此处不再赘述.

真正的消息路由是由SparkListenerBus的onPostEvent函数完成的:

override def onPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
      case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
      case jobEnd: SparkListenerJobEnd =>
        listener.onJobEnd(jobEnd)
      case taskStart: SparkListenerTaskStart =>
        listener.onTaskStart(taskStart)
      case taskGettingResult: SparkListenerTaskGettingResult =>
        listener.onTaskGettingResult(taskGettingResult)
      case taskEnd: SparkListenerTaskEnd =>
        listener.onTaskEnd(taskEnd)
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
        listener.onEnvironmentUpdate(environmentUpdate)
      case blockManagerAdded: SparkListenerBlockManagerAdded =>
        listener.onBlockManagerAdded(blockManagerAdded)
      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        listener.onBlockManagerRemoved(blockManagerRemoved)
      case unpersistRDD: SparkListenerUnpersistRDD =>
        listener.onUnpersistRDD(unpersistRDD)
      case applicationStart: SparkListenerApplicationStart =>
        listener.onApplicationStart(applicationStart)
      case applicationEnd: SparkListenerApplicationEnd =>
        listener.onApplicationEnd(applicationEnd)
      case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
        listener.onExecutorMetricsUpdate(metricsUpdate)
      case executorAdded: SparkListenerExecutorAdded =>
        listener.onExecutorAdded(executorAdded)
      case executorRemoved: SparkListenerExecutorRemoved =>
        listener.onExecutorRemoved(executorRemoved)
      case blockUpdated: SparkListenerBlockUpdated =>
        listener.onBlockUpdated(blockUpdated)
      case logStart: SparkListenerLogStart => // ignore event log metadata
    }
  }

上面的代码很直观,根据不同的消息类型,调用listener对应的方法来进行处理.

下面来看看listener的定义, 所有的listener都混入了SparkListener特质.该特质定义了针对所有消息的处理函数, 定义全部为空:

@DeveloperApi
trait SparkListener {
  /**
   * Called when a stage completes successfully or fails, with information on the completed stage.
   */
  def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { }

  /**
   * Called when a stage is submitted
   */
  def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }

  ......

对于特定的listener,在混入SparkListener特质之后,只需要重写相应的处理函数即可. 为了方便在进行消息路由时进行模式匹配,所有的具体的listener类均被定义为样本类.

对于spark中的事件来说,思想与listener类似,只是混入的特质不同而已, 事件混入的特质为SparkListenerEvent.


消息队列建立及发送流程介绍:

具体的消息发送流程如下所示:

在SparkContext中,会

  1. 创建LiveListenerBus类类型的成员变量listenerBus
  2. 创建各种listener,并加入到listenerBus中
  3. post一些事件到listenerBus中
  4. 调用listenerBus.start() 来启动事件处理程序

这里有一点需要注意的是, 在listenerBus.start() 调用之前, 可以向其中post消息, 这些消息会被缓存起来,等start函数调用之后, 消费者线程会分发这些缓存的消息. listenerBus.start()是在SparkContext中的setupAndStartListenerBus函数中被调用的, 下面来看看该函数的实现:

  private def setupAndStartListenerBus(): Unit = {
    // Use reflection to instantiate listeners specified via `spark.extraListeners`
    try {
      val listenerClassNames: Seq[String] =
        conf.get("spark.extraListeners", "").split(‘,‘).map(_.trim).filter(_ != "")
      for (className <- listenerClassNames) {
        // Use reflection to find the right constructor
        val constructors = {
          val listenerClass = Utils.classForName(className)
          listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]]
        }
        val constructorTakingSparkConf = constructors.find { c =>
          c.getParameterTypes.sameElements(Array(classOf[SparkConf]))
        }
        lazy val zeroArgumentConstructor = constructors.find { c =>
          c.getParameterTypes.isEmpty
        }
        val listener: SparkListener = {
          if (constructorTakingSparkConf.isDefined) {
            constructorTakingSparkConf.get.newInstance(conf)
          } else if (zeroArgumentConstructor.isDefined) {
            zeroArgumentConstructor.get.newInstance()
          } else {
            throw new SparkException(
              s"$className did not have a zero-argument constructor or a" +
                " single-argument constructor that accepts SparkConf. Note: if the class is" +
                " defined inside of another Scala class, then its constructors may accept an" +
                " implicit parameter that references the enclosing class; in this case, you must" +
                " define the listener as a top-level class in order to prevent this extra" +
                " parameter from breaking Spark‘s ability to find a valid constructor.")
          }
        }
        listenerBus.addListener(listener)
        logInfo(s"Registered listener $className")
      }
    } catch {
      case e: Exception =>
        try {
          stop()
        } finally {
          throw new SparkException(s"Exception when registering SparkListener", e)
        }
    }

    listenerBus.start(this)
    _listenerBusStarted = true
  }

这段代码首先运用反射机制来处理spark.extraListeners设置, 在spark doc中有关于该设置的解释:

A comma-separated list of classes that implement SparkListener; when initializing SparkContext, instances of these classes will be created and registered with Spark’s listener bus. If a class has a single-argument constructor that accepts a SparkConf, that constructor will be called; otherwise, a zero-argument constructor will be called. If no valid constructor can be found, the SparkContext creation will fail with an exception.

大意为:该设置制定的listener会在初始化SparkContext时被创建注册,然而对于listener的构造函数时有要求的:

  • 有一个单参数构造函数且参数为SparkConf类型, 则该构造函数被调用
  • 否则,如果有无参构造函数, 则将被调用
  • 如果没有定义构造函数,则程序异常结束

当extraListeners被构造并注册之后, listenerBus.start被调用:

def start(sc: SparkContext) {
    if (started.compareAndSet(false, true)) {
      sparkContext = sc
      listenerThread.start()
    } else {
      throw new IllegalStateException(s"$name already started!")
    }
  }

与此同时,启动消费者线程listenerThread, 开始进行消息路由.

当程序运行结束后,会调用stop函数:

  def stop() {
    if (!started.get()) {
      throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
    }
    if (stopped.compareAndSet(false, true)) {
      // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
      // `stop` is called.
      eventLock.release()
      listenerThread.join()
    } else {
      // Keep quiet
    }
  }

这里可以看到,在stop函数中调用了eventLock.release()来增加信号量的值. 然而并未向消息队列中加入新的消息,这就导致在消费者线程listenerThread读取队列时会返回null值,进而达到结束listenerThread线程的目的.

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-11-05 18:27:49

Spark 消息队列机制源码学习的相关文章

Android开发学习之路-Handler消息派发机制源码分析

注:这里只是说一下sendmessage的一个过程,post就类似的 如果我们需要发送消息,会调用sendMessage方法 public final boolean sendMessage(Message msg) { return sendMessageDelayed(msg, 0); } 这个方法会调用如下的这个方法 public final boolean sendMessageDelayed(Message msg, long delayMillis) { if (delayMilli

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadOutApp尽量在使用单个executor的资源. 源码分析 org.apache.spark.deploy.master.Master 1.首先判断,master状态不是ALIVE的话,直接返回2.调度driver3. Application的调度机制(核心之核心,重中之重) 源码如下: 1 /*

Android Handler消息机制源码解析

好记性不如烂笔头,今天来分析一下Handler的源码实现 Handler机制是Android系统的基础,是多线程之间切换的基础.下面我们分析一下Handler的源码实现. Handler消息机制有4个类合作完成,分别是Handler,MessageQueue,Looper,Message Handler : 获取消息,发送消息,以及处理消息的类 MessageQueue:消息队列,先进先出 Looper : 消息的循环和分发 Message : 消息实体类,分发消息和处理消息的就是这个类 主要工

【iScroll源码学习03】iScroll事件机制与滚动条的实现

[iScroll源码学习03]iScroll事件机制与滚动条的实现 前言 想不到又到周末了,周末的时间要抓紧学习才行,前几天我们学习了iScroll几点基础知识: 1. [iScroll源码学习02]分解iScroll三个核心事件点 2. [iScroll源码学习01]准备阶段 3. [iScroll源码学习00]模拟iScroll 今天我们来学习其事件机制以及滚动条的实现,完了后我们iScroll就学习的差不多了,最后会抽离iScroll的精华部分组成一个阉割版iScroll 并总结下iScr

jquery源码学习

jQuery 源码学习是对js的能力提升很有帮助的一个方法,废话不说,我们来开始学习啦 我们学习的源码是jquery-2.0.3已经不支持IE6,7,8了,因为可以少学很多hack和兼容的方法. jquery-2.0.3的代码结构如下 首先最外层为一个闭包, 代码执行的最后一句为window.$ = window.jquery = jquery 让闭包中的变量暴露倒全局中. 传参传入window是为了便于压缩 传入undefined是为了undifined被修改,他是window的属性,可以被修

Android异步消息传递机制源码分析&amp;&amp;相关知识常被问的面试题

1.Android异步消息传递机制有以下两个方式:(异步消息传递来解决线程通信问题) handler 和 AsyncTask 2.handler官方解释的用途: 1).定时任务:通过handler.postDelay(Runnable r, time)来在指定时间执行msg. 2).线程间通信:在执行较为耗时操作的时候,在子线程中执行耗时任务,然后handler(主线程的)把执行的结果通过sendmessage的方式发送给UI线程去执行用于更新UI. 3.handler源码分析 一.在Activ

Android -- 消息处理机制源码分析(Looper,Handler,Message)

android的消息处理有三个核心类:Looper,Handler和Message.其实还有一个Message Queue(消息队列),但是MQ被封装到Looper里面了,我们不会直接与MQ打交道,因此我没将其作为核心类.下面一一介绍: Looper Looper的字面意思是“循环者”,它被设计用来使一个普通线程变成Looper线程.所谓Looper线程就是循环工作的线程.在程序开发中(尤其是GUI开发中),我们经常会需要一个线程不断循环,一旦有新任务则执行,执行完继续等待下一个任务,这就是Lo

Java集合专题总结(1):HashMap 和 HashTable 源码学习和面试总结

2017年的秋招彻底结束了,感觉Java上面的最常见的集合相关的问题就是hash--系列和一些常用并发集合和队列,堆等结合算法一起考察,不完全统计,本人经历:先后百度.唯品会.58同城.新浪微博.趣分期.美团点评等都在1.2--面的时候被问过无数次,都问吐了&_&,其他公司笔试的时候,但凡有Java的题,都有集合相关考点,尤其hash表--现在总结下. Java集合概述 HashMap介绍 HashMap源码学习 关于HashMap的几个经典问题 HashTable介绍和源码学习 Hash

JDK源码学习系列05----LinkedList

                                         JDK源码学习系列05----LinkedList 1.LinkedList简介 LinkedList是基于双向链表实现的,它也可以被当作堆栈.队列或双端队列进行操作. public class LinkedList<E> extends AbstractSequentialList<E> implements List<E>, Deque<E>, Cloneable, jav