<Spark><Advanced Programming>

Introduction

  • 介绍两种共享变量的方式:

    • accumulators:聚集信息
    • broadcast variables:高效地分布large values
  • 介绍对高setup costs任务的批操作,比如查询数据库时连接数据的消耗。

Accumulators

  • 当我们向Spark传送函数时(比如map()函数或给filter()的condition),他们可以使用driver program中在他们定义之外的变量。但是cluster中的每个task都get a new copy of each variable,并且更新那些副本而不会传播到driver。
  • Spark的共享变量--accumulators和broadcast variables,通过两种通信模式(聚集结果和广播)放松了这种限制。
  • Accumulators提供了一种简单的语法,用于从worker聚集values返回到driver program。
  • Accumulators的一个最广泛的用例就是统计在job执行期间发生的events数,用于debugging。
  • val sc = new SparkContext(...)
    val file = sc.textFile("file.txt")
    
    val blankLines = sc.accumulator(0)    // create an Accumulator[Int] initialized to 0
    
    val callSigns = file.flatMap(line => {
        if (line == ""){
            blankLines += 1   // Add to the accumulator
        }
        line.split(" ")
    })callSigns.saveAsTextFile("output.txt")println("Blank lines: " + blankLines.value)

    注意只有在调用saveAsTextFile之后才能得到正确的count of blankLines,因为transfomation是lazy的。

  • Summary of accumulators:
    • 我们提供调用SparkContext.accumulator(initialValue)方法创建一个accumulator,它会返回一个org.apache.spark.Accumulator[T]对象,T是initialValue的类型;
    • Spark闭包中的worker code可以通过 += 来add accumulator;
    • driver program可以call accumulator的value property来访问accumulator的值。
  • 注意worker node不可以访问accumulator的value --> 因为从tasks的角度来看,accumulators是write-only的变量。这样的设定使得accumulators可以被高效地实现,而不需要每次更新的时候都通信。

Accumulators and Fault Tolerance

  • Spark通过re-executikng failed or slow tasks来处理failed or slow machines。
  • 那么错误与Accumulators之间呢? --> 对于在actions中使用的Accumulators,Spark仅仅对每个Accumulator执行一次每个task的更新。因此,如果我们想要一个绝对可靠的value counter,而不用考虑failures或者多次赋值,那么我们必须将操作放到类似foreach()这样的操作中。
  • 对于在transformation中使用的Accumulators,这种保证是不存在的。在transformation中对Accumulators的更新可能执行多次。所以对transformation中的Accumulators最好只在debug时用。[for version1.2.0]

Custom Accumulators

  • Spark支持Double、Long、Float、Int等类型的Accumulators。同时还提供了一个自定义Accumulators type的API,来实现自定义Accumulators types以及支持自定义聚集操作(比如找最大值而不是add)。
  • 可支持的操作必须是commutative and associative的。[感觉就像是顺序对操作不重要就可以了]
    • An operation op is commutative if a op b = b op a for all values a, b.
    • An operation op is associative if (a op b) op c = a op (b op c) for all values a, b, and c.

Broadcast Variables

  • Spark支持的另一种共享变量的方式:broadcast variables。允许程序高效地发送大的,只读的value给所有worker nodes。
  • 你可能会想到说Spark会自动地发送闭包中所引用的变量到worker node。这是方便的,但同时也是很不高效的。因为:
    1. 缺省的task launching 机制是对small task sizes优化的;
    2. 你可能在多个并行操作中使用相同的变量,但是Spark会分别为每个Operation发送一次。考虑下面的操作:
# Look up the locations of the call signs on the # RDD contactCounts. We load a list of call sign # prefixes to country code to support this lookup. signPrefixes = loadCallSignTable()

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes) count = sign_count[1]
    return (country, count)

countryContactCounts = (contactCounts
                       .map(processSignCount)
                       .reduceByKey((lambda x, y: x+ y)))

    上面的代码中,如果signPrefixes是一个很大的table,那么将该表从master传到每个slave将是很昂贵的。而且如果后期还要用到signPrefixes,它将会被再次发送到每个节点。通过将signPrefixes变成broadcast变量可以解决这个问题。如下:

    

// Look up the countries for each call sign for the
// contactCounts RDD. We load an array of call sign
// prefixes to country code to support this lookup.
val signPrefixes = sc.broadcast(loadCallSignTable())

val countryContactCounts = contactCounts.map{case (sign, count) =>
    val country = lookupInArray(sign, signPrefixes.value)
    (country, count)
}.reduceByKey((x, y) => x + y) 

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
  • 总的来说,broadcast变量的使用分为以下几步:

    1. 创建一个Broadcast[T]的SparkContext.broadcast对象of type T。T可以是任何类型,只要它是Serializable的。
    2. 同value property访问该值
    3. 该变量只会被传送给每个node一次,而且被当做read-only来使用,也就是说更新不会propagated到其他nodes。(最简单的满足read-only方式的方法是broadcast a primitive value or a reference to an immutable object)。
  • broadcast variable就是一个spark.broadcast.Broadcast[T] 类型的对象,它wraps一个类型T的值。我们可以在tasks中访问该值。该值只发送到每个节点一次,通过使用高效的BitTorrent-like communication mechanism。

Optimizing Broadcasts

  • 当你broadcast a large values, it‘s important to choose a data serialization format that is both fast and compact。Scala缺省使用的JAVA Serialization库是很低效的(JAVA Serialization只对原生类型的数组高效)。因此你可以通过选择一个不同的序列化库(通过使用spark.serializer property)来优化。

Working on a Per-Partition Basis

时间: 2024-10-15 21:14:12

<Spark><Advanced Programming>的相关文章

CI框架源码阅读笔记3 全局函数Common.php

从本篇开始,将深入CI框架的内部,一步步去探索这个框架的实现.结构和设计. Common.php文件定义了一系列的全局函数(一般来说,全局函数具有最高的加载优先权,因此大多数的框架中BootStrap引导文件都会最先引入全局函数,以便于之后的处理工作). 打开Common.php中,第一行代码就非常诡异: if ( ! defined('BASEPATH')) exit('No direct script access allowed'); 上一篇(CI框架源码阅读笔记2 一切的入口 index

IOS测试框架之:athrun的InstrumentDriver源码阅读笔记

athrun的InstrumentDriver源码阅读笔记 作者:唯一 athrun是淘宝的开源测试项目,InstrumentDriver是ios端的实现,之前在公司项目中用过这个框架,没有深入了解,现在回来记录下. 官方介绍:http://code.taobao.org/p/athrun/wiki/instrumentDriver/ 优点:这个框架是对UIAutomation的java实现,在代码提示.用例维护方面比UIAutomation强多了,借junit4的光,我们可以通过junit4的

Yii源码阅读笔记 - 日志组件

?使用 Yii框架为开发者提供两个静态方法进行日志记录: Yii::log($message, $level, $category);Yii::trace($message, $category); 两者的区别在于后者依赖于应用开启调试模式,即定义常量YII_DEBUG: defined('YII_DEBUG') or define('YII_DEBUG', true); Yii::log方法的调用需要指定message的level和category.category是格式为“xxx.yyy.z

源码阅读笔记 - 1 MSVC2015中的std::sort

大约寒假开始的时候我就已经把std::sort的源码阅读完毕并理解其中的做法了,到了寒假结尾,姑且把它写出来 这是我的第一篇源码阅读笔记,以后会发更多的,包括算法和库实现,源码会按照我自己的代码风格格式化,去掉或者展开用于条件编译或者debug检查的宏,依重要程度重新排序函数,但是不会改变命名方式(虽然MSVC的STL命名实在是我不能接受的那种),对于代码块的解释会在代码块前(上面)用注释标明. template<class _RanIt, class _Diff, class _Pr> in

CI框架源码阅读笔记5 基准测试 BenchMark.php

上一篇博客(CI框架源码阅读笔记4 引导文件CodeIgniter.php)中,我们已经看到:CI中核心流程的核心功能都是由不同的组件来完成的.这些组件类似于一个一个单独的模块,不同的模块完成不同的功能,各模块之间可以相互调用,共同构成了CI的核心骨架. 从本篇开始,将进一步去分析各组件的实现细节,深入CI核心的黑盒内部(研究之后,其实就应该是白盒了,仅仅对于应用来说,它应该算是黑盒),从而更好的去认识.把握这个框架. 按照惯例,在开始之前,我们贴上CI中不完全的核心组件图: 由于BenchMa

CI框架源码阅读笔记2 一切的入口 index.php

上一节(CI框架源码阅读笔记1 - 环境准备.基本术语和框架流程)中,我们提到了CI框架的基本流程,这里这次贴出流程图,以备参考: 作为CI框架的入口文件,源码阅读,自然由此开始.在源码阅读的过程中,我们并不会逐行进行解释,而只解释核心的功能和实现. 1.       设置应用程序环境 define('ENVIRONMENT', 'development'); 这里的development可以是任何你喜欢的环境名称(比如dev,再如test),相对应的,你要在下面的switch case代码块中

Apache Storm源码阅读笔记

欢迎转载,转载请注明出处. 楔子 自从建了Spark交流的QQ群之后,热情加入的同学不少,大家不仅对Spark很热衷对于Storm也是充满好奇.大家都提到一个问题就是有关storm内部实现机理的资料比较少,理解起来非常费劲. 尽管自己也陆续对storm的源码走读发表了一些博文,当时写的时候比较匆忙,有时候衔接的不是太好,此番做了一些整理,主要是针对TridentTopology部分,修改过的内容采用pdf格式发布,方便打印. 文章中有些内容的理解得益于徐明明和fxjwind两位的指点,非常感谢.

CI框架源码阅读笔记4 引导文件CodeIgniter.php

到了这里,终于进入CI框架的核心了.既然是"引导"文件,那么就是对用户的请求.参数等做相应的导向,让用户请求和数据流按照正确的线路各就各位.例如,用户的请求url: http://you.host.com/usr/reg 经过引导文件,实际上会交给Application中的UsrController控制器的reg方法去处理. 这之中,CodeIgniter.php做了哪些工作?我们一步步来看. 1.    导入预定义常量.框架环境初始化 之前的一篇博客(CI框架源码阅读笔记2 一切的入

jdk源码阅读笔记之java集合框架(二)(ArrayList)

关于ArrayList的分析,会从且仅从其添加(add)与删除(remove)方法入手. ArrayList类定义: p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 18.0px Monaco } span.s1 { color: #931a68 } public class ArrayList<E> extends AbstractList<E> implements List<E> ArrayList基本属性: /** *

dubbo源码阅读笔记--服务调用时序

上接dubbo源码阅读笔记--暴露服务时序,继续梳理服务调用时序,下图右面红线流程. 整理了调用时序图 分为3步,connect,decode,invoke. 连接 AllChannelHandler.connected(Channel) line: 38 HeartbeatHandler.connected(Channel) line: 47 MultiMessageHandler(AbstractChannelHandlerDelegate).connected(Channel) line: