Spark的TorrentBroadcast:概念和原理

依据Spark 1.4.1源码

SparkContext的broadcast方法

注释

可以用SparkContext将一个变量广播到所有的executor上,使得所有executor都能获取这个变量代表的数据。

SparkContext对于broadcast方法的注释为:

/** * Broadcast a read-only variable to the cluster, returning a * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions. * The variable will be sent to each cluster only once. */def broadcast[T: ClassTag](value: T): Broadcast[T]

"Broadcast a read-only variable to the cluster",指出了这个变量是只读的。只所以是只读的,我认为是因为每个executor的多个task之间共享一个被广播的变量,所以存在线程安全的问题,但是如果多个线程都“读”一个变量,仍然不能保证读操作是线程安全的,这里或许仍然需要Spark再说明一下。

(以下需要仔细区分“Broadcast变量”和"被broadcast的变量“)

"returning a Broadcast object for reading it in distributed functions",这句指出了Broadcast变量是在被分布执行的函数中使用。而被分布式执行的函数是被包含在Spark的task中分发到各个executor执行的,因此Broadcast变量作为被分发的task的一部分,需要随task一起经过序列化和反序列化的过程。

但是被broadcast的变量可能很大,而分发task的机制不是为了在集群中分发大量数据实现的,所以被broadcast的变量不宜随task一起简单地序列化和反序列化。TorrentBroadcast通过一些巧妙的方法,避免了被广播的数据随分布式执行的函数一起序列化。

总之,Broadcast变量是随task进行序列化反序列化的,而被broadcast的变量则通过另外的手段到达executor。

Broadcast变量实际是被广播变量的容器,使用时需要使用其value方法从中取出被广播的变量,而value方法是broadcast机制实现的关键之一。

调用关系

def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    assertNotStopped()
    if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) {
      // This is a warning instead of an exception in order to avoid breaking user programs that
      // might have created RDD broadcast variables but not used them:
      logWarning("Can not directly broadcast RDDs; instead, call collect() and "
        + "broadcast the result (see SPARK-5063)")
    }
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    val callSite = getCallSite
    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }

首先判断一个被广播的是不是一个RDD,因为RDD是distributed,一个RDD变量并不包含有RDD中的数据集,也无法在每个executor直接获取整个RDD的数据(而是应该在driver端collect RDD的数据,然后再广播),所以Spark不支持广播RDD(但实际上可以做得到在广播RDD时,在每个executor上得到RDD中的所有数据,只是Spark没有去实现)。注意,即使广播了RDD也不会抛异常。

然后使用BroadcastManager的newBroadcast方法来生成一个Broadcast变量。而BroadcastManager会去调用BroadcastFactory的newBroadcast方法获取Broadcast变量。

Spark里的BroadcastFactor是可以配置的

 val broadcastFactoryClass =
          conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")

 broadcastFactory =
          Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]

默认值即是TorrentBroadcastFactory, 它的newBroadcast方法只是new一个TorrentBroadcast对象。

 override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
    new TorrentBroadcast[T](value_, id)
  }

所以TorrentBroadcast机制的核心就在TorrentBroadcast类。

TorrentBroadcast的原理

注释

/** * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]]. * * The mechanism is as follows: * * The driver divides the serialized object into small chunks and * stores those chunks in the BlockManager of the driver. * * On each executor, the executor first attempts to fetch the object from its BlockManager. If * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or * other executors if available. Once it gets the chunks, it puts the chunks in its own * BlockManager, ready for other executors to fetch from. * * This prevents the driver from being the bottleneck in sending out multiple copies of the * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]]. * * When initialized, TorrentBroadcast objects read SparkEnv.get.conf. * * @param obj object to broadcast * @param id A unique identifier for the broadcast variable. */

这段注释说明了TorrentBroadcast实现的原理,其中关键的部分在于利用BlockManager的分布式结构来储存和获取数据块。

driver把序列化后的对象(即value)分块很多块,并且把这些块存到driver的BlockManager里。

在executor端,executor首先试图从自己的BlockManager中获取被broadcast变量的块,如果它不存在,就使用远程抓取从driver 以及/或者其它的

executor上获取这个块。当executor获取了一个块,它就把这个块放在自己的BlockManager里,以使得其它的executor可以抓取它。

这防止了被广播的数据只从driver端被拷贝,这样当要拷贝的次数很多的时候(每个executor都会拷贝一次),driver端容易成为瓶颈(就像HttpBroadcast所做的一样).

这段注释时的代词用得不准确,executor是没有专门的机制用于处理Broadcast变量的,所有的魔法都在Broadcast变量本身。可以这么描述:

driver端把数据分块,每个块做为一个block存进driver端的BlockManager,每个executor会试图获取所有的块,来组装成一个被broadcast的变量。“获取块”的方法是首先从executor自身的BlockManager中获取,如果自己的BlockManager中没有这个块,就从别的BlockManager中获取。这样最初的时候,driver是获取这些块的唯一的源,但是随着各个BlockManager从driver端获取了不同的块(TorrentBroadcast会有意避免各个executor以同样的顺序获取这些块),“块”的源就多了起来,每个executor就可能从多个源中的一个,包括driver和其它executor的BlockManager中获取块,这要就使得流量在整个集群中更均匀,而不是由driver作为唯一的源。

原理就是这样啦,但是TorrentBoradcast的实现有很多有意思的细节,可以仔细分析一下。

时间: 2024-08-25 17:32:19

Spark的TorrentBroadcast:概念和原理的相关文章

Storm概念、原理详解及其应用(一)BaseStorm

本文借鉴官文,添加了一些解释和看法,其中有些理解,写的比较粗糙,有问题的地方希望大家指出.写这篇文章,是想把一些官文和资料中基础.重点拿出来,能总结出便于大家理解的话语.与大多数"wordcount"代码不同的是,并不会有如何运行第一storm代码等内容,只有在运行完代码后,发现需要明白:"知其然,并知其所以然". Storm是什么?为什么要用Storm?为什么不用Spark? 第一个问题,以下概念足以解释: Storm是基于数据流的实时处理系统,提供了大吞吐量的实

Spark(一): 基本架构及原理

Apache Spark是一个围绕速度.易用性和复杂分析构建的大数据处理框架,最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一,与Hadoop和Storm等其他大数据和MapReduce技术相比,Spark有如下优势: Spark提供了一个全面.统一的框架用于管理各种有着不同性质(文本数据.图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求 官方资料介绍Spark可以将Hadoop集群中的应用在内存中的运行速度提升100倍

云计算技术的产生、概念、原理、应用和前景

006年谷歌推出了“Google 101计划”,并正式提出“云”的概念和理论.随后亚马逊.微软.惠普.雅虎.英特尔.IBM等公司都宣布了自己的“云计划”,云安全.云存储.内部云.外部云.公共云.私有云……一堆让人眼花缭乱的概念在不断冲击人们的神经.那么到底什么是云计算技术呢?对云计算技术的产生.概念.原理.应用和前景又在哪里? 一.云计算思想的产生 传统模式下,企业建立一套IT系统不仅仅需要购买硬件等基础设施,还有买软件的许可证,需要专门的人员维护.当企业的规模扩大时还要继续升级各种软硬件设施以

Java中的泛型 (上) - 基本概念和原理

本节我们主要来介绍泛型的基本概念和原理 后续章节我们会介绍各种容器类,容器类可以说是日常程序开发中天天用到的,没有容器类,难以想象能开发什么真正有用的程序.而容器类是基于泛型的,不理解泛型,我们就难以深刻理解容器类.那,泛型到底是什么呢? 什么是泛型? 一个简单泛型类 我们通过一个简单的例子来说明泛型类的基本概念.实现原理和好处. 基本概念 我们直接来看代码: public class Pair<T> { T first; T second; public Pair(T first, T se

linux系统-用户管理-概念及原理

linux系统-用户管理-概念及原理 一 相关概念 1 /etc/passwd 的文件格式:用户名:密码位:用户标识号:组标识号:注释等相关描述:宿主目录:默认的shell,列如 chengzhi:x:500:500:chengzhi-CentOS:/home/chengzhi:/bin/shell 2 /etc/shadow 的文件格式:用户名:md5加密的密码(md5是单向不可逆的算法,固定长度):最后一次修改的时间:最大时间间隔:最小时间间隔:警告时间:闲置时间:失效文件:标志,前面两项比

黑马程序员-----封装的概念及原理

------<a href="http://www.itheima.com" target="blank">Java培训.Android培训.iOS培训..Net培训</a>.期待与您交流! ----- 第一讲   封装的概念及原理 本小节知识点: 1.[了解]为什么要进行封装 2.[了解]封装的原理 1.为什么要进行封装 不封装的缺点: 当一个类把自己的属性暴露给外部的时候,那么该类就是去对属性的管理权. 将数据隐藏起来,只能用此类的方法函

Java IO学习笔记:概念与原理

Java IO学习笔记:概念与原理 一.概念 Java中对文件的操作是以流的方式进行的.流是Java内存中的一组有序数据序列.Java将数据从源(文件.内存.键盘.网络)读入到内存 中,形成了流,然后将这些流还可以写到另外的目的地(文件.内存.控制台.网络),之所以称为流,是因为这个数据序列在不同时刻所操作的是源的不同部分. 二.分类 流的分类,Java的流分类比较丰富,刚接触的人看了后会感觉很晕.流分类的方式很多: 1.按照输入的方向分,输入流和输出流,输入输出的参照对象是Java程序. 2.

LDAP服务器的概念和原理简单介绍

LDAP服务器的概念和原理简单介绍 LDAP和JNDI关系

Spark的TorrentBroadcast:实现

序列化和反序列化 前边提到,TorrentBroadcast的关键就在于特制的序列化和反序列化方法.1.1版的TorrentBroadcast实现了自己的readObject和writeObject方法,但是1.4.1版的TorrentBroadcast没有实现自己的readObject方法,那么它是如何进行序列化和反序列化的呢? // obj就是被广播的对象 private val numBlocks: Int = writeBlocks(obj) override protected def