Spark大师之路:广播变量(Broadcast)源码分析

概述

最近工作上忙死了……广播变量这一块其实早就看过了,一直没有贴出来。

本文基于Spark 1.0源码分析,主要探讨广播变量的初始化、创建、读取以及清除。

类关系

BroadcastManager类中包含一个BroadcastFactory对象的引用。大部分操作通过调用BroadcastFactory中的方法来实现。

BroadcastFactory是一个Trait,有两个直接子类TorrentBroadcastFactory、HttpBroadcastFactory。这两个子类实现了对HttpBroadcast、TorrentBroadcast的封装,而后面两个又同时集成了Broadcast抽象类。

图……就不画了

BroadcastManager的初始化

SparkContext初始化时会创建SparkEnv对象env,这个过程中会调用BroadcastManager的构造方法返回一个对象作为env的成员变量存在:

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

构造BroadcastManager对象时会调用initialize方法,主要根据配置初始化broadcastFactory成员变量,并调用其initialize方法。

 val broadcastFactoryClass =
          conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")

        broadcastFactory =
          Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]

        // Initialize appropriate BroadcastFactory and BroadcastObject
        broadcastFactory.initialize(isDriver, conf, securityManager)

两个工厂类的initialize方法都是对其相应实体类的initialize方法的调用,下面分开两个类来看。

HttpBroadcast的initialize方法

  def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
    synchronized {
      if (!initialized) {
        bufferSize = conf.getInt("spark.buffer.size", 65536)
        compress = conf.getBoolean("spark.broadcast.compress", true)
        securityManager = securityMgr
        if (isDriver) {
          createServer(conf)
          conf.set("spark.httpBroadcast.uri",  serverUri)
        }
        serverUri = conf.get("spark.httpBroadcast.uri")
        cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup, conf)
        compressionCodec = CompressionCodec.createCodec(conf)
        initialized = true
      }
    }
  }

除了一些变量的初始化外,主要做两件事情,一是createServer(只有在Driver端会做),其次是创建一个MetadataCleaner对象。

createServer

  private def createServer(conf: SparkConf) {
    broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
    server = new HttpServer(broadcastDir, securityManager)
    server.start()
    serverUri = server.uri
    logInfo("Broadcast server started at " + serverUri)
  }

首先创建一个存放广播变量的目录,默认是

conf.get("spark.local.dir",  System.getProperty("java.io.tmpdir")).split(',')(0)

然后初始化一个HttpServer对象并启动(封装了jetty),启动过程中包括加载资源文件,起端口和线程用来监控请求等。这部分的细节在org.apache.spark.HttpServer类中,此处不做展开。

创建MetadataCleaner对象

一个MetadataCleaner对象包装了一个定时计划Timer,每隔一段时间执行一个回调函数,此处传入的回调函数为cleanup:

  private def cleanup(cleanupTime: Long) {
    val iterator = files.internalMap.entrySet().iterator()
    while(iterator.hasNext) {
      val entry = iterator.next()
      val (file, time) = (entry.getKey, entry.getValue)
      if (time < cleanupTime) {
        iterator.remove()
        deleteBroadcastFile(file)
      }
    }
  }

即清楚存在吵过一定时长的broadcast文件。在时长未设定(默认情况)时,不清除:

 if (delaySeconds > 0) {
    logDebug(
      "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " +
      "and period of " + periodSeconds + " secs")
    timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
  }

TorrentBroadcast的initialize方法

  def initialize(_isDriver: Boolean, conf: SparkConf) {
    TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
    synchronized {
      if (!initialized) {
        initialized = true
      }
    }
  }

Torrent在此处没做什么,这也可以看出和Http的区别,Torrent的处理方式就是p2p,去中心化。而Http是中心化服务,需要启动服务来接受请求。

创建broadcast变量

调用SparkContext中的 def broadcast[T: ClassTag](value: T): Broadcast[T]方法来初始化一个广播变量,实现如下:

def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }

即调用broadcastManager的newBroadcast方法:

  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {
    broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
  }

再调用工厂类的newBroadcast方法,此处返回的是一个Broadcast对象。

HttpBroadcastFactory的newBroadcast

  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
    new HttpBroadcast[T](value_, isLocal, id)

即创建一个新的HttpBroadcast对象并返回。

构造对象时主要做两件事情:

 HttpBroadcast.synchronized {
    SparkEnv.get.blockManager.putSingle(
      blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
  }

  if (!isLocal) {
    HttpBroadcast.write(id, value_)
  }

1.将变量id和值放入blockManager,但并不通知master

2.调用伴生对象的write方法

def write(id: Long, value: Any) {
    val file = getFile(id)
    val out: OutputStream = {
      if (compress) {
        compressionCodec.compressedOutputStream(new FileOutputStream(file))
      } else {
        new BufferedOutputStream(new FileOutputStream(file), bufferSize)
      }
    }
    val ser = SparkEnv.get.serializer.newInstance()
    val serOut = ser.serializeStream(out)
    serOut.writeObject(value)
    serOut.close()
    files += file
  }

write方法将对象值按照指定的压缩、序列化写入指定的文件。这个文件所在的目录即是HttpServer的资源目录,文件名和id的对应关系为:

case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
  def name = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
}

TorrentBroadcastFactory的newBroadcast方法

  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
    new TorrentBroadcast[T](value_, isLocal, id)

同样是创建一个TorrentBroadcast对象,并返回。

  TorrentBroadcast.synchronized {
    SparkEnv.get.blockManager.putSingle(
      broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
  }

  if (!isLocal) {
    sendBroadcast()
  }

做两件事情,第一步和Http一样,第二步:

  def sendBroadcast() {
    val tInfo = TorrentBroadcast.blockifyObject(value_)
    totalBlocks = tInfo.totalBlocks
    totalBytes = tInfo.totalBytes
    hasBlocks = tInfo.totalBlocks

    // Store meta-info
    val metaId = BroadcastBlockId(id, "meta")
    val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
    TorrentBroadcast.synchronized {
      SparkEnv.get.blockManager.putSingle(
        metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    }

    // Store individual pieces
    for (i <- 0 until totalBlocks) {
      val pieceId = BroadcastBlockId(id, "piece" + i)
      TorrentBroadcast.synchronized {
        SparkEnv.get.blockManager.putSingle(
          pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
      }
    }
  }

可以看出,先将元数据信息缓存到blockManager,再将块信息缓存过去。开头可以看到有一个分块动作,是调用伴生对象的blockifyObject方法:

def blockifyObject[T](obj: T): TorrentInfo

此方法将对象obj分块(默认块大小为4M),返回一个TorrentInfo对象,第一个参数为一个TorrentBlock对象(包含blockID和block字节数组)、块数量以及obj的字节流总长度。

元数据信息中的blockId为广播变量id+后缀,value为总块数和总字节数。

数据信息是分块缓存,每块的id为广播变量id加后缀及块变好,数据位一个TorrentBlock对象

读取广播变量的值

通过调用bc.value来取得广播变量的值,其主要实现在反序列化方法readObject中

HttpBroadcast的反序列化

 HttpBroadcast.synchronized {
      SparkEnv.get.blockManager.getSingle(blockId) match {
        case Some(x) => value_ = x.asInstanceOf[T]
        case None => {
          logInfo("Started reading broadcast variable " + id)
          val start = System.nanoTime
          value_ = HttpBroadcast.read[T](id)
          /*
           * We cache broadcast data in the BlockManager so that subsequent tasks using it
           * do not need to re-fetch. This data is only used locally and no other node
           * needs to fetch this block, so we don't notify the master.
           */
          SparkEnv.get.blockManager.putSingle(
            blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
          val time = (System.nanoTime - start) / 1e9
          logInfo("Reading broadcast variable " + id + " took " + time + " s")
        }
      }
    }

首先查看blockManager中是否已有,如有则直接取值,否则调用伴生对象的read方法进行读取:

def read[T: ClassTag](id: Long): T = {
    logDebug("broadcast read server: " +  serverUri + " id: broadcast-" + id)
    val url = serverUri + "/" + BroadcastBlockId(id).name

    var uc: URLConnection = null
    if (securityManager.isAuthenticationEnabled()) {
      logDebug("broadcast security enabled")
      val newuri = Utils.constructURIForAuthentication(new URI(url), securityManager)
      uc = newuri.toURL.openConnection()
      uc.setAllowUserInteraction(false)
    } else {
      logDebug("broadcast not using security")
      uc = new URL(url).openConnection()
    }

    val in = {
      uc.setReadTimeout(httpReadTimeout)
      val inputStream = uc.getInputStream
      if (compress) {
        compressionCodec.compressedInputStream(inputStream)
      } else {
        new BufferedInputStream(inputStream, bufferSize)
      }
    }
    val ser = SparkEnv.get.serializer.newInstance()
    val serIn = ser.deserializeStream(in)
    val obj = serIn.readObject[T]()
    serIn.close()
    obj
  }

使用serverUri和block id对应的文件名直接开启一个HttpConnection将中心服务器上相应的数据取过来,使用配置的压缩和序列化机制进行解压和反序列化。

这里可以看到,所有需要用到广播变量值的executor都需要去driver上pull广播变量的内容。

取到值后,缓存到blockManager中,以便下次使用。

TorrentBroadcast的反序列化

private def readObject(in: ObjectInputStream) {
    in.defaultReadObject()
    TorrentBroadcast.synchronized {
      SparkEnv.get.blockManager.getSingle(broadcastId) match {
        case Some(x) =>
          value_ = x.asInstanceOf[T]

        case None =>
          val start = System.nanoTime
          logInfo("Started reading broadcast variable " + id)

          // Initialize @transient variables that will receive garbage values from the master.
          resetWorkerVariables()

          if (receiveBroadcast()) {
            value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)

            /* Store the merged copy in cache so that the next worker doesn't need to rebuild it.
             * This creates a trade-off between memory usage and latency. Storing copy doubles
             * the memory footprint; not storing doubles deserialization cost. Also,
             * this does not need to be reported to BlockManagerMaster since other executors
             * does not need to access this block (they only need to fetch the chunks,
             * which are reported).
             */
            SparkEnv.get.blockManager.putSingle(
              broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)

            // Remove arrayOfBlocks from memory once value_ is on local cache
            resetWorkerVariables()
          } else {
            logError("Reading broadcast variable " + id + " failed")
          }

          val time = (System.nanoTime - start) / 1e9
          logInfo("Reading broadcast variable " + id + " took " + time + " s")
      }
    }
  }

和Http一样,都是先查看blockManager中是否已经缓存,若没有,则调用receiveBroadcast方法:

def receiveBroadcast(): Boolean = {
    // Receive meta-info about the size of broadcast data,
    // the number of chunks it is divided into, etc.
    val metaId = BroadcastBlockId(id, "meta")
    var attemptId = 10
    while (attemptId > 0 && totalBlocks == -1) {
      TorrentBroadcast.synchronized {
        SparkEnv.get.blockManager.getSingle(metaId) match {
          case Some(x) =>
            val tInfo = x.asInstanceOf[TorrentInfo]
            totalBlocks = tInfo.totalBlocks
            totalBytes = tInfo.totalBytes
            arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
            hasBlocks = 0

          case None =>
            Thread.sleep(500)
        }
      }
      attemptId -= 1
    }
    if (totalBlocks == -1) {
      return false
    }

    /*
     * Fetch actual chunks of data. Note that all these chunks are stored in
     * the BlockManager and reported to the master, so that other executors
     * can find out and pull the chunks from this executor.
     */
    val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList)
    for (pid <- recvOrder) {
      val pieceId = BroadcastBlockId(id, "piece" + pid)
      TorrentBroadcast.synchronized {
        SparkEnv.get.blockManager.getSingle(pieceId) match {
          case Some(x) =>
            arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
            hasBlocks += 1
            SparkEnv.get.blockManager.putSingle(
              pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)

          case None =>
            throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
        }
      }
    }

    hasBlocks == totalBlocks
  }

和写数据一样,同样是分成两个部分,首先取元数据信息,再根据元数据信息读取实际的block信息。注意这里都是从blockManager中读取的,这里贴出blockManager.getSingle的分析。

调用栈中最后到BlockManager.doGetRemote方法,中间有一条语句:

 val locations = Random.shuffle(master.getLocations(blockId))

即将存有这个block的节点信息随机打乱,然后使用:

 val data = BlockManagerWorker.syncGetBlock(
        GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))

来获取。

从这里可以看出,Torrent方法首先将广播变量数据分块,并存到BlockManager中;每个节点需要读取广播变量时,是分块读取,对每一块都读取其位置信息,然后随机选一个存有此块数据的节点进行get;每个节点读取后会将包含的快信息报告给BlockManagerMaster,这样本地节点也成为了这个广播网络中的一个peer。

与Http方式形成鲜明对比,这是一个去中心化的网络,只需要保持一个tracker即可,这就是p2p的思想。

广播变量的清除

广播变量被创建时,紧接着有这样一句代码:

cleaner.foreach(_.registerBroadcastForCleanup(bc))

cleaner是一个ContextCleaner对象,会将刚刚创建的广播变量注册到其中,调用栈为:

  def registerBroadcastForCleanup[T](broadcast: Broadcast[T]) {
    registerForCleanup(broadcast, CleanBroadcast(broadcast.id))
  }
  private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) {
    referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)
  }

等出现广播变量被弱引用时(关于弱引用,可以参考:http://blog.csdn.net/lyfi01/article/details/6415726),则会执行

cleaner.foreach(_.start())

start方法中会调用keepCleaning方法,会遍历注册的清理任务(包括RDD、shuffle和broadcast),依次进行清理:

private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
    while (!stopped) {
      try {
        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
          .map(_.asInstanceOf[CleanupTaskWeakReference])
        reference.map(_.task).foreach { task =>
          logDebug("Got cleaning task " + task)
          referenceBuffer -= reference.get
          task match {
            case CleanRDD(rddId) =>
              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
            case CleanShuffle(shuffleId) =>
              doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
            case CleanBroadcast(broadcastId) =>
              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
          }
        }
      } catch {
        case e: Exception => logError("Error in cleaning thread", e)
      }
    }
  }

doCleanupBroadcast调用以下语句:

broadcastManager.unbroadcast(broadcastId, true, blocking)

然后是:

  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
    broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
  }

每个工厂类调用其对应实体类的伴生对象的unbroadcast方法。

HttpBroadcast中的变量清除

 def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized {
    SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
    if (removeFromDriver) {
      val file = getFile(id)
      files.remove(file)
      deleteBroadcastFile(file)
    }
  }

1是删除blockManager中的缓存,2是删除本地持久化的文件

TorrentBroadcast中的变量清除

  def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized {
    SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
  }

小结

Broadcast可以使用在executor端多次使用某个数据的场景(比如说字典),Http和Torrent两种方式对应传统的CS访问方式和P2P访问方式,当广播变量较大或者使用较频繁时,采用后者可以减少driver端的压力。

BlockManager在此处充当P2P中的tracker角色,没有展开描述,后续会开专题讲这个部分。

声明:本文为原创,禁止用于任何商业目的,转载请注明出处:http://blog.csdn.net/asongoficeandfire/article/details/37584643

Spark大师之路:广播变量(Broadcast)源码分析,布布扣,bubuko.com

时间: 2024-11-02 14:46:35

Spark大师之路:广播变量(Broadcast)源码分析的相关文章

《深入理解SPARK:核心思想与源码分析》——SparkContext的初始化(中)

<深入理解Spark:核心思想与源码分析>一书前言的内容请看链接<深入理解SPARK:核心思想与源码分析>一书正式出版上市 <深入理解Spark:核心思想与源码分析>一书第一章的内容请看链接<第1章 环境准备> <深入理解Spark:核心思想与源码分析>一书第二章的内容请看链接<第2章 SPARK设计理念与基本架构> 由于本书的第3章内容较多,所以打算分别开辟三篇随笔分别展现. <深入理解Spark:核心思想与源码分析>一

通过 spark.files 传入spark任务依赖的文件源码分析

版本:spak2.3 相关源码:org.apache.spark.SparkContext 在创建spark任务时候,往往会指定一些依赖文件,通常我们可以在spark-submit脚本使用--files /path/to/file指定来实现. 但是公司产品的架构是通过livy来调spark任务,livy的实现其实是对spark-submit的一个包装,所以如何指定依赖文件归根到底还是在spark这边.既然不能通过命令行--files指定,那在编程中怎么指定?任务在各个节点上运行时又是如何获取到这

java ThreadLocal线程设置私有变量底层源码分析

前面也听说了ThreadLocal来实现高并发,以前都是用锁来实现,看了挺多资料的,发现其实还是区别挺大的(感觉严格来说ThreadLocal并不算高并发的解决方案),现在总结一下吧. 高并发中会出现的问题就是线程安全问题,可以说是多个线程对共享资源访问如何处理的问题,处理不当会的话,会出现结果和预期会完全不同. 一般情况下,多个线程访问一个变量都是公用他们的值,不过有时候虽然也是访问共享变量,不过每个线程却需要自己的私有变量.这个时候ThreadLocal就有用武之地了.下面是个ThreadL

Tomcat7.0源码分析——启动与停止服务

前言 熟悉Tomcat的工程师们,肯定都知道Tomcat是如何启动与停止的.对于startup.sh.startup.bat.shutdown.sh.shutdown.bat等脚本或者批处理命令,大家一定知道改如何使用它,但是它们究竟是如何实现的,尤其是shutdown.sh脚本(或者shutdown.bat)究竟是如何和Tomcat进程通信的呢?本文将通过对Tomcat7.0的源码阅读,深入剖析这一过程. 由于在生产环境中,Tomcat一般部署在Linux系统下,所以本文将以startup.s

Tomcat7.0源码分析——启动与停止服务原理

前言 熟悉Tomcat的工程师们,肯定都知道Tomcat是如何启动与停止的.对于startup.sh.startup.bat.shutdown.sh.shutdown.bat等脚本或者批处理命令,大家一定知道改如何使用它,但是它们究竟是如何实现的,尤其是shutdown.sh脚本(或者shutdown.bat)究竟是如何和Tomcat进程通信的呢?本文将通过对Tomcat7.0的源码阅读,深入剖析这一过程. 由于在生产环境中,Tomcat一般部署在Linux系统下,所以本文将以startup.s

Tomcat7.0源码分析——请求原理分析(中)

前言 在<TOMCAT7.0源码分析--请求原理分析(上)>一文中已经介绍了关于Tomcat7.0处理请求前作的初始化和准备工作,请读者在阅读本文前确保掌握<TOMCAT7.0源码分析--请求原理分析(上)>一文中的相关知识以及HTTP协议和TCP协议的一些内容.本文重点讲解Tomcat7.0在准备好接受请求后,请求过程的原理分析. 请求处理架构 在正式开始之前,我们先来看看图1中的Tomcat请求处理架构. 图1 Tomcat请求处理架构 图1列出了Tomcat请求处理架构中的主

Tomcat7.0源码分析——请求原理分析(上)

前言 谈起Tomcat的诞生,最早可以追溯到1995年.近20年来,Tomcat始终是使用最广泛的Web服务器,由于其使用Java语言开发,所以广为Java程序员所熟悉.很多人早期的J2EE项目,由程序员自己实现Jsp页面或者Servlet接受请求,后来借助Struts1.Struts2.Spring等中间件后,实际也是利用Filter或者Servlet处理请求,大家肯定要问了,这些Servlet处理的请求来自哪里?Tomcat作为Web服务器是怎样将HTTP请求交给Servlet的呢? 本文就

Spark之SQL解析(源码阅读十)

如何能更好的运用与监控sparkSQL?或许我们改更深层次的了解它深层次的原理是什么.之前总结的已经写了传统数据库与Spark的sql解析之间的差别.那么我们下来直切主题~ 如今的Spark已经支持多种多样的数据源的查询与加载,兼容了Hive,可用JDBC的方式或者ODBC来连接Spark SQL.下图为官网给出的架构.那么sparkSql呢可以重用Hive本身提供的元数据仓库(MetaStore).HiveQL.以及用户自定义函数(UDF)及序列化和反序列化的工具(SerDes). 下来我们来

Tomcat7.0源码分析——请求原理分析

Tomcat7.0源码分析--请求原理分析 谈起Tomcat的诞生,最早可以追溯到1995年.近20年来,Tomcat始终是使用最广泛的Web服务器,由于其使用Java语言开发,所以广为Java程序员所熟悉.很多人早期的J2EE项目,由程序员自己实现Jsp页面或者Servlet接受请求,后来借助Struts1.Struts2.spring等中间件后,实际也是利用Filter或者Servlet处理请求,大家肯定要问了,这些Servlet处理的请求来自哪里?Tomcat作为Web服务器是怎样将HTT

Tomcat7.0源码分析——生命周期管理

前言 从server.xml文件解析出来的各个对象都是容器,比如:Server.Service.Connector等.这些容器都具有新建.初始化完成.启动.停止.失败.销毁等状态.tomcat的实现提供了对这些容器的生命周期管理,本文将通过对Tomcat7.0的源码阅读,深入剖析这一过程. Tomcat生命周期类接口设计 我们先阅读图1,从中了解Tomcat涉及生命周期管理的主要类. 图1 Tomcat生命周期类接口设计 这里对图1中涉及的主要类作个简单介绍: Lifecycle:定义了容器生命