spark中的Broadcast variables和Accumulator

举个例子:

val factor = 3

rdd.map( num => num*factor)

以上两行代码显示了rdd的一个map操作,其中factor是一个外部变量。默认情况下,算子的函数内,如果使用到了外部变量,那么会将这个变量拷贝到执行这个函数的每一个task中。如果该变量非常大的话,那么网络传输耗费的资源会特别大,而且在每个节点上占用的内存空间也特别大。

Spark提供的Broadcast Variable,是只读的。并且在每个节点上只会有一份副本,而不会为每个task都拷贝一份副本。因此其最大作用,就是减少变量到各个节点的网络传输消耗,以及在各个节点上的内存消耗。此外,spark自己内部也使用了高效的广播算法来减少网络消耗。

可以通过调用SparkContext的broadcast()方法,来针对某个变量创建广播变量。然后在算子的函数内,使用到广播变量时,每个节点只会拷贝一份副本了。每个节点可以使用广播变量的value()方法获取值。广播变量是只读的。

例子:

package cn.spark.study.core

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

object BroadcastVariable {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

.setAppName(“broadcastVariable”)

.setMaster(“local”);

val sc = new SparkContext(conf)

val list = Array(1,2,3,4,5)

val factor = 3
val broadcastfactor = sc.broadcast(factor)

val numbersRDD = sc.parallelize(list, 1)
val multipleNumber = numbersRDD.map { num => num*broadcastfactor.value }
multipleNumber.foreach { num => println(num) }

}

}

Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能。但是确给我们提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。

例子:

package cn.spark.study.core

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

object AccumulatorTest {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

.setAppName(“AccumulatorTest”)

.setMaster(“local”);

val sc = new SparkContext(conf)

val sum = sc.accumulator(0)

val list = Array(1,2,3,4,5)

val numbersRDD = sc.parallelize(list, 1)
numbersRDD.foreach { num => sum.add(num) }

println(sum)

}

}

时间: 2024-10-12 12:47:32

spark中的Broadcast variables和Accumulator的相关文章

spark中的广播变量broadcast

Spark中的Broadcast处理 首先先来看一看broadcast的使用代码: val values = List[Int](1,2,3) val broadcastValues = sparkContext.broadcast(values) rdd.mapPartitions(iter => { broadcastValues.getValue.foreach(println) }) 在上面的代码中,首先生成了一个集合变量,把这个变量通过sparkContext的broadcast函数进

08、共享变量(Broadcast Variable和Accumulator)

共享变量工作原理 Spark一个非常重要的特性就是共享变量. 默认情况下,如果在一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中.此时每个task只能操作自己的那份变量副本.如果多个task想要共享某个变量,那么这种方式是做不到的. Spark为此提供了两种共享变量,一种是Broadcast Variable(广播变量),另一种是Accumulator(累加变量).Broadcast Variable会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,

【Spark篇】---Spark中广播变量和累加器

一.前述 Spark中因为算子中的真正逻辑是发送到Executor中去运行的,所以当Executor中需要引用外部变量时,需要使用广播变量. 累机器相当于统筹大变量,常用于计数,统计. 二.具体原理 1.广播变量 广播变量理解图 注意事项 1.能不能将一个RDD使用广播变量广播出去? 不能,因为RDD是不存储数据的.可以将RDD的结果广播出去. 2. 广播变量只能在Driver端定义,不能在Executor端定义. 3. 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量

Spark中的矩阵乘法分析

前言: 矩阵乘法在数据挖掘/机器学习中是常用的计算步骤,并且在大数据计算中,shuffle过程是不可避免的,矩阵乘法的不同计算方式shuffle的数据量都不相同.通过对矩阵乘法不同计算方式的深入学习,希望能够对大数据算法实现的shuffle过程优化有所启发.网上有很多分布式矩阵乘法相关的文章和论文,但是鲜有对Spark中分布式矩阵乘法的分析.本文针对Spark中分布式矩阵乘法的实现进行必要的说明讨论. 分布式矩阵乘法原理: 矩阵乘法计算可以分为内积法和外积法.根据实现颗粒度的不同,也可以分为普通

解决spark中遇到的数据倾斜问题

一. 数据倾斜的现象 多数task执行速度较快,少数task执行时间非常长,或者等待很长时间后提示你内存不足,执行失败. 二. 数据倾斜的原因 常见于各种shuffle操作,例如reduceByKey,groupByKey,join等操作. 数据问题 key本身分布不均匀(包括大量的key为空) key的设置不合理 spark使用问题 shuffle时的并发度不够 计算方式有误 三. 数据倾斜的后果 spark中一个stage的执行时间受限于最后那个执行完的task,因此运行缓慢的任务会拖累整个

初解,Scala语言中基于Actor的并发编程的机制,并展示了在Spark中基于Scala语言的Actor而产生的消息驱动框架Akka的使用,

Scala深入浅出实战中级--进阶经典(第66讲:Scala并发编程实战初体验及其在Spark源码中应用解析)内容介绍和视频链接 2015-07-24 DT大数据梦工厂 从明天起,做一个勤奋的人 看视频.下视频,分享视频 DT大数据梦工厂-Scala深入浅出实战中级--进阶经典:第66讲:Scala并发编程实战初体验及其在Spark源码中的应用解析 本期视频通过代码实战详解了Java语言基于加锁的并发编程模型的弊端以及Scala语言中基于Actor的并发编程的机制,并展示了在Spark中基于Sc

spark中的RDD以及DAG

今天,我们就先聊一下spark中的DAG以及RDD的相关的内容 1.DAG:有向无环图:有方向,无闭环,代表着数据的流向,这个DAG的边界则是Action方法的执行 2.如何将DAG切分stage,stage切分的依据:有宽依赖的时候要进行切分(shuffle的时候, 也就是数据有网络的传递的时候),则一个wordCount有两个stage, 一个是reduceByKey之前的,一个事reduceByKey之后的(图1), 则我们可以这样的理解,当我们要进行提交上游的数据的时候, 此时我们可以认

Spark中GraphX图运算pregel详解

由于本人文字表达能力不足,还是多多以代码形式表述,首先展示测试代码,然后解释: package com.txq.spark.test import org.apache.spark.graphx.util.GraphGeneratorsimport org.apache.spark.graphx._import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext, SparkException, gra

Spark中的键值对操作-scala

1.PairRDD介绍     Spark为包含键值对类型的RDD提供了一些专有的操作.这些RDD被称为PairRDD.PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口.例如,PairRDD提供了reduceByKey()方法,可以分别规约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合在一起,合并为一个RDD. 2.创建Pair RDD 程序示例:对一个英语单词组成的文本行,提取其中的第一个单词作为key,将整个句子作为value,建立 PairR