9.spark core之共享变量

简介

??spark执行操作时,可以使用驱动器程序Driver中定义的变量,但有时这种默认的使用方式却并不理想。

  • 集群中运行的每个任务都会连接驱动器获取变量。如果获取的变量比较大,执行效率会非常低下。
  • 每个任务都会得到这些变量的一份新的副本,更新这些副本的值不会影响驱动器中的对应变量。如果驱动器需要获取变量的结果值,这种方式是不可行的。

??spark为了解决这两个问题,提供了两种类型的共享变量:广播变量(broadcast variable)和累加器(accumulator)。

  • 广播变量用于高效分发较大的对象。会在每个执行器本地缓存一份大对象,而避免每次都连接驱动器获取。
  • 累加器用于在驱动器中对数据结果进行聚合。

广播变量

原理

  • 广播变量只能在Driver端定义,不能在Executor端定义。
  • 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
  • 如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本;如果使用广播变量在每个Executor中只有一份Driver端的变量副本。

用法

  • 通过对一个类型T的对象调用SparkContext.broadcast创建出一个BroadCast[T]对象,任何可序列化的类型都可以这么实现。
  • 通过value属性访问该对象的值
  • 变量只会被发到各个节点一次,应作为只读值处理。(修改这个值不会影响到别的节点)

    实例

    ??查询每个国家的呼号个数

    python

# 将呼号前缀(国家代码)作为广播变量
signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x, y: x+y)))

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

scala

// 将呼号前缀(国家代码)作为广播变量
val signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count, signPrefixes):
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

val countryContactCounts = contactCounts.map{case (sign, count) => {
    val country = lookupInArray(sign, signPrefixes.value)
    (country, count)
    }}.reduceByKey((x, y) => x+y)

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")

java

// 将呼号前缀(国家代码)作为广播变量
final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());

JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
    public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
        String sign = callSignCount._1();
        String country = lookupCountry(sign, signPrefixes.value());
        return new Tuple2(country, callSignCount._2());
    }
}).reduceByKey(new SumInts());

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");

累加器

原理

  • 累加器在Driver端定义赋初始值。
  • 累加器只能在Driver端读取最后的值,在Excutor端更新。

用法

  • 通过调用sc.accumulator(initivalValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。
  • Spark闭包里的执行器代码可以使用累加器的+=方法增加累加器的值
  • 驱动器程序可以调用累加器的value属性来访问累加器的值

实例

??累加空行

python

file = sc.textFile(inputFile)
# 创建Accumulator[Int]并初始化为0
blankLines = sc.accumulator(0)

def extractCallSigns(line):
    global blankLines # 访问全局变量
    if (line == ""):
        blankLines += 1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print "Blank lines: %d" % blankLines.value

scala

val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) //创建Accumulator[Int]并初始化为0

val callSigns = file.flatMap(line => {
    if (line == "") {
        blankLines += 1 //累加器加1
    }
    line.split(" ")
})

callSigns.saveAsTextFile("output.txt")
println("Blank lines:" + blankLines.value)

java

JavaRDD<String> rdd = sc.textFile(args[1]);

final Accumulator<Integer> blankLines = sc.accumulator(0);

JavaRDD<String> callSigns = rdd.flatMap(new FlatMapFunction<String, String>() {
    public Iterable<String> call(String line) {
        if ("".equals(line)) {
            blankLines.add(1);
        }
        return Arrays.asList(line.split(" "));
    }
});

callSigns.saveAsTextFile("output.text");
System.out.println("Blank lines:" + blankLines.value());

原文地址:http://blog.51cto.com/12967015/2172863

时间: 2024-12-10 02:10:44

9.spark core之共享变量的相关文章

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio

大数据技术之_27_电商平台数据分析项目_02_预备知识 + Scala + Spark Core + Spark SQL + Spark Streaming + Java 对象池

第0章 预备知识0.1 Scala0.1.1 Scala 操作符0.1.2 拉链操作0.2 Spark Core0.2.1 Spark RDD 持久化0.2.2 Spark 共享变量0.3 Spark SQL0.3.1 RDD.DataFrame 与 DataSet0.3.2 DataSet 与 RDD 互操作0.3.3 RDD.DataFrame 与 DataSet 之间的转换0.3.4 用户自定义聚合函数(UDAF)0.3.5 开窗函数0.4 Spark Streaming0.4.1 Dst

【Spark Core】任务运行机制和Task源代码浅析1

引言 上一小节<TaskScheduler源代码与任务提交原理浅析2>介绍了Driver側将Stage进行划分.依据Executor闲置情况分发任务,终于通过DriverActor向executorActor发送任务消息. 我们要了解Executor的运行机制首先要了解Executor在Driver側的注冊过程.这篇文章先了解一下Application和Executor的注冊过程. 1. Task类及其相关 1.1 Task类 Spark将由Executor运行的Task分为ShuffleMa

spark core源码分析6 Spark job的提交

本节主要讲解SparkContext的逻辑 首先看一个spark自带的最简单的例子: object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = ma

【Spark Core】任务执行机制和Task源码浅析1

引言 上一小节<TaskScheduler源码与任务提交原理浅析2>介绍了Driver侧将Stage进行划分,根据Executor闲置情况分发任务,最终通过DriverActor向executorActor发送任务消息. 我们要了解Executor的执行机制首先要了解Executor在Driver侧的注册过程,这篇文章先了解一下Application和Executor的注册过程. 1. Task类及其相关 1.1 Task类 Spark将由Executor执行的Task分为ShuffleMap

TypeError: Error #1034: 强制转换类型失败:无法将 mx.controls::[email&#160;protected] 转换为 spark.core.IViewport。

1.错误描述 TypeError: Error #1034: 强制转换类型失败:无法将 mx.controls::[email protected] 转换为 spark.core.IViewport. at mx.binding::Binding/defaultDestFunc()[E:\dev\4.0.0\frameworks\projects\framework\src\mx\binding\Binding.as:270] at Function/http://adobe.com/AS3/2

这些组件分别处理Spark Core提供内存计算框架

Spark不仅支持Scala编写应用程序,而且支持Java和Python等语言进行编写,特别是Scala是一种高效.可拓展的语言,能够用简洁的代码处理较为复杂的处理工作. l通用性强 Spark生态圈即BDAS(伯克利数据分析栈)包含了Spark Core.Spark SQL.Spark Streaming.MLLib和GraphX等组件,这些组件分别处理Spark Core提供内存计算框架.SparkStreaming的实时处理应用.Spark SQL的即席查询.MLlib或MLbase的机器

Spark Core源代码分析: Spark任务模型

概述 一个Spark的Job分为多个stage,最后一个stage会包含一个或多个ResultTask,前面的stages会包含一个或多个ShuffleMapTasks. ResultTask运行并将结果返回给driver application. ShuffleMapTask将task的output依据task的partition分离到多个buckets里.一个ShuffleMapTask相应一个ShuffleDependency的partition,而总partition数同并行度.redu

急中生智~利用Spark core完成&quot;ETL&quot;!

背景介绍:今天接到老板分配的一个小任务:开发一个程序,实现从数据库中抽取数据并生成报表的功能(这是我们数据库审计平台准备上线的一个功能).既然是要生成报表,那么首先得有数据,于是便想到从该业务系统的测试环境抽取业务表的数据,然后装载至自己云主机上的Mysql中.本来以为只要"select ...into outfile"和"load data infile..."两个命令就可以搞定的,可是还是出了意外.测试环境导出的txt文件在云主机load时,报了"Ro