08、共享变量(Broadcast Variable和Accumulator)

共享变量工作原理

Spark一个非常重要的特性就是共享变量。

默认情况下,如果在一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中。此时每个task只能操作自己的那份变量副本。如果多个task想要共享某个变量,那么这种方式是做不到的。

Spark为此提供了两种共享变量,一种是Broadcast Variable(广播变量),另一种是Accumulator(累加变量)。Broadcast Variable会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,减少网络传输以及内存消耗。Accumulator则可以让多个task共同操作一份变量,主要可以进行累加操作。

 Broadcast Variable

Spark提供的Broadcast Variable,是只读的。并且在每个节点上只会有一份副本,而不会为每个task都拷贝一份副本。因此其最大作用,就是减少变量到各个节点的网络传输消耗,以及在各个节点上的内存消耗。此外,spark自己内部也使用了高效的广播算法来减少网络消耗。

可以通过调用SparkContext的broadcast()方法,来针对某个变量创建广播变量。然后在算子的函数内,使用到广播变量时,每个节点只会拷贝一份副本了。每个节点可以使用广播变量的value()方法获取值。记住,广播变量,是只读的。

val factor = 3

val factorBroadcast = sc.broadcast(factor)

val arr = Array(1, 2, 3, 4, 5)

val rdd = sc.parallelize(arr)

val multipleRdd = rdd.map(num => num * factorBroadcast.value())

multipleRdd.foreach(num => println(num))

Accumulator

Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,给我们提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。

val sumAccumulator = sc.accumulator(0)

val arr = Array(1, 2, 3, 4, 5)

val rdd = sc.parallelize(arr)

rdd.foreach(num => sumAccumulator += num)

println(sumAccumulator.value)

package sparkcore.java;

import java.util.Arrays;

import java.util.List;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.VoidFunction;

import org.apache.spark.broadcast.Broadcast;

/**

* 广播变量

*/

public class BroadcastVariable {

public static void main(String[] args) {

SparkConf conf = new SparkConf()

.setAppName("BroadcastVariable")

.setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

// 在java中,创建共享变量,就是调用SparkContext的broadcast()方法

// 获取的返回结果是Broadcast<T>类型

final int factor = 3;

final Broadcast<Integer> factorBroadcast = sc.broadcast(factor);

List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5);

JavaRDD<Integer> numbers = sc.parallelize(numberList);

// 让集合中的每个数字,都乘以外部定义的那个factor

JavaRDD<Integer> multipleNumbers = numbers.map(new Function<Integer, Integer>() {

private static final long serialVersionUID = 1L;

@Override

public Integer call(Integer v1) throws Exception {

// 使用共享变量时,调用其value()方法,即可获取其内部封装的值

int factor = factorBroadcast.value();

return v1 * factor;

}

});

multipleNumbers.foreach(new VoidFunction<Integer>() {

private static final long serialVersionUID = 1L;

@Override

public void call(Integer t) throws Exception {

System.out.println(t);

}

});

sc.close();

}

}

package sparkcore.scala

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

object BroadcastVariable {

def main(args: Array[String]) {

val conf = new SparkConf()

.setAppName("BroadcastVariable")

.setMaster("local")

val sc = new SparkContext(conf)

val factor = 3;

val factorBroadcast = sc.broadcast(factor)

val numberArray = Array(1, 2, 3, 4, 5)

val numbers = sc.parallelize(numberArray, 1)

val multipleNumbers = numbers.map { num => num * factorBroadcast.value }

multipleNumbers.foreach { num => println(num) }

}

}

package sparkcore.java;

import java.util.Arrays;

import java.util.List;

import org.apache.spark.Accumulator;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.VoidFunction;

/**

* 累加变量

*/

public class AccumulatorVariable {

public static void main(String[] args) {

SparkConf conf = new SparkConf()

.setAppName("Accumulator")

.setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);

// 创建Accumulator变量

// 需要调用SparkContext的accumulator()方法

final Accumulator<Integer> sum = sc.accumulator(0);

List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5);

JavaRDD<Integer> numbers = sc.parallelize(numberList);

numbers.foreach(new VoidFunction<Integer>() {

private static final long serialVersionUID = 1L;

@Override

public void call(Integer t) throws Exception {

// 然后在函数内部,就可以对Accumulator变量,调用add()方法,累加值

sum.add(t);

}

});

// 在driver程序中,可以调用Accumulator的value()方法,获取其值

System.out.println(sum.value());

sc.close();

}

}

package sparkcore.scala

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

object AccumulatorVariable {

def main(args: Array[String]) {

val conf = new SparkConf()

.setAppName("AccumulatorVariable")

.setMaster("local")

val sc = new SparkContext(conf)

val sum = sc.accumulator(0)

val numberArray = Array(1, 2, 3, 4, 5)

val numbers = sc.parallelize(numberArray, 1)

numbers.foreach { num => sum += num }

println(sum)

}

}

时间: 2025-01-08 13:10:44

08、共享变量(Broadcast Variable和Accumulator)的相关文章

spark中的Broadcast variables和Accumulator

举个例子: val factor = 3 rdd.map( num => num*factor) 以上两行代码显示了rdd的一个map操作,其中factor是一个外部变量.默认情况下,算子的函数内,如果使用到了外部变量,那么会将这个变量拷贝到执行这个函数的每一个task中.如果该变量非常大的话,那么网络传输耗费的资源会特别大,而且在每个节点上占用的内存空间也特别大. Spark提供的Broadcast Variable,是只读的.并且在每个节点上只会有一份副本,而不会为每个task都拷贝一份副本

(升级版)Spark从入门到精通(Scala编程、案例实战、高级特性、Spark内核源码剖析、Hadoop高端)

本课程主要讲解目前大数据领域最热门.最火爆.最有前景的技术——Spark.在本课程中,会从浅入深,基于大量案例实战,深度剖析和讲解Spark,并且会包含完全从企业真实复杂业务需求中抽取出的案例实战.课程会涵盖Scala编程详解.Spark核心编程.Spark SQL和Spark Streaming.Spark内核以及源码剖析.性能调优.企业级案例实战等部分.完全从零起步,让学员可以一站式精通Spark企业级大数据开发,提升自己的职场竞争力,实现更好的升职或者跳槽,或者从j2ee等传统软件开发工程

Spark的介绍:前世今生

spark的 前世今生 标签(空格分隔): Spark的部分 一:大数据的spark概述 二:大数据的spark学习 一: 大数据的概述 1.1 Spark是什么? Spark,是一种通用的大数据计算框架,正如传统大数据技术Hadoop的MapReduce.Hive引擎,以及Storm流式实时计算引擎等. Spark包含了大数据领域常见的各种计算框架:比如Spark Core用于离线计算,Spark SQL用于交互式查询,Spark Streaming用于实时流式计算,Spark MLlib用于

spark 学习(二) RDD及共享变量

声明:本文基于spark的programming guide,并融合自己的相关理解整理而成 Spark应用程序总是包含着一个driver program(驱动程序),它执行着用户的main方法,并且执行大量的并行操作(parallel operations)在集群上. 概述 Spark最主要的抽象就是RDD(resilient distributed dataset) 弹性分布式数据集,RDD  就是分割元素的集合,他被分发在集群的各个节点上,并且能够进行并行操作. RDD的创建有三种方式: H

9.spark core之共享变量

简介 ??spark执行操作时,可以使用驱动器程序Driver中定义的变量,但有时这种默认的使用方式却并不理想. 集群中运行的每个任务都会连接驱动器获取变量.如果获取的变量比较大,执行效率会非常低下. 每个任务都会得到这些变量的一份新的副本,更新这些副本的值不会影响驱动器中的对应变量.如果驱动器需要获取变量的结果值,这种方式是不可行的. ??spark为了解决这两个问题,提供了两种类型的共享变量:广播变量(broadcast variable)和累加器(accumulator). 广播变量用于高

Spark源码阅读笔记之Broadcast(一)

Spark源码阅读笔记之Broadcast(一) Spark会序列化在各个任务上使用到的变量,然后传递到Executor中,由于Executor中得到的只是变量的拷贝,因此对变量的改变只在该Executor有效.序列化后的任务的大小是有限制的(由spark.akka.frameSize决定,值为其减去200K,默认为10M-200K),spark会进行检查,超出该限制的任务会被抛弃.因此,对于需要共享比较大的数据时,需要使用Broadcast. Spark实现了两种传输Broadcast的机制:

spark中的广播变量broadcast

Spark中的Broadcast处理 首先先来看一看broadcast的使用代码: val values = List[Int](1,2,3) val broadcastValues = sparkContext.broadcast(values) rdd.mapPartitions(iter => { broadcastValues.getValue.foreach(println) }) 在上面的代码中,首先生成了一个集合变量,把这个变量通过sparkContext的broadcast函数进

Spark大师之路:广播变量(Broadcast)源码分析

概述 最近工作上忙死了--广播变量这一块其实早就看过了,一直没有贴出来. 本文基于Spark 1.0源码分析,主要探讨广播变量的初始化.创建.读取以及清除. 类关系 BroadcastManager类中包含一个BroadcastFactory对象的引用.大部分操作通过调用BroadcastFactory中的方法来实现. BroadcastFactory是一个Trait,有两个直接子类TorrentBroadcastFactory.HttpBroadcastFactory.这两个子类实现了对Htt

【核心API开发】Spark入门教程[3]

本教程源于2016年3月出版书籍<Spark原理.机制及应用> ,在此以知识共享为初衷公开部分内容,如有兴趣,请支持正版书籍. Spark综合了前人分布式数据处理架构和语言的优缺点,使用简洁.一致的函数式语言Scala作为主要开发语言,同时为了方便更多语言背景的人使用,还支持Java.Python和R语言.Spark因为其弹性分布式数据集(RDD)的抽象数据结构设计,通过实现抽象类RDD可以产生面对不同应用场景的子类.本章将先介绍Spark编程模型.RDD的相关概念.常用API源码及应用案例,