Spark笔记整理(五):Spark RDD持久化、广播变量和累加器

[TOC]


Spark RDD持久化

RDD持久化工作原理

Spark非常重要的一个功能特性就是可以将RDD持久化在内存中。当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition。这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD。

巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升10倍。对于迭代式算法和快速交互式应用来说,RDD持久化,是非常重要的。

要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。而且Spark的持久化机制还是自动容错的,如果持久化的RDD的任何partition丢失了,那么Spark会自动通过其源RDD,使用transformation操作重新计算该partition。

cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用的persist()的无参版本,同时就是调用persist(MEMORY_ONLY),将数据持久化到内存中。如果需要从内存中去除缓存,那么可以使用unpersist()方法。

RDD持久化使用场景

1、第一次加载大量的数据到RDD中

2、频繁的动态更新RDD Cache数据,不适合使用Spark Cache、Spark lineage

RDD持久化策略

持久化策略的选择

? 默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。

? 如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。

? 如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

? 通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。

测试案例

测试代码如下:

package cn.xpleaf.bigdata.spark.scala.core.p3

import org.apache.log4j.{Level, Logger}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark RDD的持久化
  */
object _01SparkPersistOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
        val sc = new SparkContext(conf)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)

        var start = System.currentTimeMillis()
        val linesRDD = sc.textFile("D:/data/spark/sequences.txt")
        // linesRDD.cache()
        // linesRDD.persist(StorageLevel.MEMORY_ONLY)

        // 执行第一次RDD的计算
        val retRDD = linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        // retRDD.cache()
        // retRDD.persist(StorageLevel.DISK_ONLY)
        retRDD.count()
        println("第一次计算消耗的时间:" + (System.currentTimeMillis() - start) + "ms")

        // 执行第二次RDD的计算
        start = System.currentTimeMillis()
        // linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).count()
        retRDD.count()
        println("第二次计算消耗的时间:" + (System.currentTimeMillis() - start) + "ms")

        // 持久化使用结束之后,要想卸载数据
        // linesRDD.unpersist()

        sc.stop()

    }
}

设置相关的持久化策略,再观察执行时间就可以有一个较为直观的理解。

共享变量

提供了两种有限类型的共享变量,广播变量和累加器。

介绍之前,先直接看下面一个例子:

package cn.xpleaf.bigdata.spark.scala.core.p3

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 共享变量
  *     我们在dirver中声明的这些局部变量或者成员变量,可以直接在transformation中使用,
  *     但是经过transformation操作之后,是不会将最终的结果重新赋值给dirver中的对应的变量。
  *     因为通过action,触发了transformation的操作,transformation的操作,都是通过
  *     DAGScheduler将代码打包 序列化 交由TaskScheduler传送到各个Worker节点中的Executor去执行,
  *     在transformation中执行的这些变量,是自己节点上的变量,不是dirver上最初的变量,我们只不过是将
  *     driver上的对应的变量拷贝了一份而已。
  *
  *
  *     这个案例也反映出,我们需要有一些操作对应的变量,在driver和executor上面共享
  *
  *     spark给我们提供了两种解决方案——两种共享变量
  *         广播变量
  *         累加器
  */
object _02SparkShareVariableOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
        val sc = new SparkContext(conf)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)

        val linesRDD = sc.textFile("D:/data/spark/hello.txt")
        val wordsRDD = linesRDD.flatMap(_.split(" "))
        var num = 0
        val parisRDD = wordsRDD.map(word => {
            num += 1
            println("map--->num = " + num)
            (word, 1)
        })
        val retRDD = parisRDD.reduceByKey(_ + _)

        println("num = " + num)
        retRDD.foreach(println)
        println("num = " + num)
        sc.stop()
    }
}

输出结果如下:

num = 0
map--->num = 1
map--->num = 1
map--->num = 2
map--->num = 2
map--->num = 3
map--->num = 4
(hello,3)
(you,1)
(me,1)
(he,1)
num = 0

广播变量

Spark的另一种共享变量是广播变量。通常情况下,当一个RDD的很多操作都需要使用driver中定义的变量时,每次操作,driver都要把变量发送给worker节点一次,如果这个变量中的数据很大的话,会产生很高的传输负载,导致执行效率降低。使用广播变量可以使程序高效地将一个很大的只读数据发送给多个worker节点,而且对每个worker节点只需要传输一次,每次操作时executor可以直接获取本地保存的数据副本,不需要多次传输。

这样理解, 一个worker中的executor,有5个task运行,假如5个task都需要这从份共享数据,就需要向5个task都传递这一份数据,那就十分浪费网络资源和内存资源了。使用了广播变量后,只需要向该worker传递一次就可以了。

创建并使用广播变量的过程如下:

在一个类型T的对象obj上使用SparkContext.brodcast(obj)方法,创建一个Broadcast[T]类型的广播变量,obj必须满足Serializable。 通过广播变量的.value()方法访问其值。 另外,广播过程可能由于变量的序列化时间过程或者序列化变量的传输过程过程而成为瓶颈,而Spark Scala中使用的默认的Java序列化方法通常是低效的,因此可以通过spark.serializer属性为不同的数据类型实现特定的序列化方法(如Kryo)来优化这一过程。

测试代码如下:

package cn.xpleaf.bigdata.spark.scala.core.p3

import org.apache.log4j.{Level, Logger}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}

/**
  * 使用Spark广播变量
  *
  * 需求:
  *     用户表:
  *         id name age gender(0|1)
  *
  *     要求,输出用户信息,gender必须为男或者女,不能为0,1
  */
object _03SparkBroadcastOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
        val sc = new SparkContext(conf)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)

        val userList = List(
            "001,刘向前,18,0",
            "002,冯  剑,28,1",
            "003,李志杰,38,0",
            "004,郭  鹏,48,2"
        )

        val genderMap = Map("0" -> "女", "1" -> "男")

        val genderMapBC:Broadcast[Map[String, String]] = sc.broadcast(genderMap)

        val userRDD = sc.parallelize(userList)
        val retRDD = userRDD.map(info => {
            val prefix = info.substring(0, info.lastIndexOf(","))   // "001,刘向前,18"
            val gender = info.substring(info.lastIndexOf(",") + 1)
            val genderMapValue = genderMapBC.value
            val newGender = genderMapValue.getOrElse(gender, "男")
            prefix + "," + newGender
        })
        retRDD.foreach(println)
        sc.stop()
    }
}

输出结果如下:

001,刘向前,18,女
003,李志杰,38,女
002,冯  剑,28,男
004,郭  鹏,48,男

当然这个案例只是演示一下代码的使用,并不能看出其运行的机制。

累加器

Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能。但是确给我们提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。

非常类似于在MR中的一个Counter计数器,主要用于统计各个程序片段被调用的次数,和整体进行比较,来对数据进行一个评估。

测试代码如下:

package cn.xpleaf.bigdata.spark.scala.core.p3

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark共享变量之累加器Accumulator
  *
  * 需要注意的是,累加器的执行必须需要Action触发
  */
object _04SparkAccumulatorOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
        val sc = new SparkContext(conf)
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)

        // 要对这些变量都*7,同时统计能够被3整除的数字的个数
        val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13)

        val listRDD:RDD[Int] = sc.parallelize(list)
        var counter = 0
        val counterAcc = sc.accumulator[Int](0)
        val mapRDD = listRDD.map(num =>  {
            counter += 1
            if(num % 3 == 0) {
                counterAcc.add(1)
            }
            num * 7
        })
        // 下面这种操作又执行了一次RDD计算,所以可以考虑上面的方案,减少一次RDD的计算
        // val ret = mapRDD.filter(num => num % 3 == 0).count()
        mapRDD.foreach(println)
        println("counter===" + counter)
        println("counterAcc===" + counterAcc.value)
        sc.stop()
    }
}

输出结果如下:

49
56
7
63
14
70
21
77
28
84
35
91
42
counter===0
counterAcc===4

原文地址:http://blog.51cto.com/xpleaf/2108614

时间: 2024-08-09 02:13:49

Spark笔记整理(五):Spark RDD持久化、广播变量和累加器的相关文章

【Spark篇】---Spark中广播变量和累加器

一.前述 Spark中因为算子中的真正逻辑是发送到Executor中去运行的,所以当Executor中需要引用外部变量时,需要使用广播变量. 累机器相当于统筹大变量,常用于计数,统计. 二.具体原理 1.广播变量 广播变量理解图 注意事项 1.能不能将一个RDD使用广播变量广播出去? 不能,因为RDD是不存储数据的.可以将RDD的结果广播出去. 2. 广播变量只能在Driver端定义,不能在Executor端定义. 3. 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量

Spark学习之路 (四)Spark的广播变量和累加器

一.概述 在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本.这些变量会被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序.通常跨任务的读写变量是低效的,但是,Spark还是为两种常见的使用模式提供了两种有限的共享变量:广播变(broadcast variable)和累加器(accumulator) 二.广播变量broadcast variable 2.1 为什么

Spark(四)Spark的广播变量和累加器

一.概述 在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本.这些变量会被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序.通常跨任务的读写变量是低效的,但是,Spark还是为两种常见的使用模式提供了两种有限的共享变量:广播变量(broadcast variable)和累加器(accumulator) 二.广播变量broadcast variable 2.1 为什

Spark学习之路 (四)Spark的广播变量和累加器[转]

概述 在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本.这些变量会被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序.通常跨任务的读写变量是低效的,但是,Spark还是为两种常见的使用模式提供了两种有限的共享变量:广播变量(broadcast variable)和累加器(accumulator) 广播变量broadcast variable 为什么要将变量定义成

广播变量、累加器、collect

广播变量.累加器.collect spark集群由两类集群构成:一个驱动程序,多个执行程序. 1.广播变量 broadcast 广播变量为只读变量,它由运行sparkContext的驱动程序创建后发送给会参与计算     的节点.也可被非驱动程序所在节点(即工作节点)访问,访问是调用该变量的value方法. 广播变量是存储在内存中. sc.parallelize(List("1","2","3")).map(x => broadcastAL

广播变量与累加器

1.广播变量机制 2.累加器介绍

五、RDD持久化

Spark最重要的一个功能是它可以通过各种操作(operations)持久化(或者缓存)一个集合到内存中.当你持久化一个RDD的时候,每一个节点都将参与计算的所有分区数据存储到内存中,并且这些数据可以被这个集合(以及这个集合衍生的其他集合)的动作(action)重复利用.这个能力使后续的动作速度更快(通常快10倍以上).对应迭代算法和快速的交互使用来说,缓存是一个关键的工具. 你能通过persist()或者cache()方法持久化一个rdd.首先,在action中计算得到rdd:然后,将其保存在

Python学习笔记整理(五)Python中的列表.

列表和字段,这两种类型几乎是Python所有脚本的主要工作组件.他们都可以在原处进行修改,可以按需求增加或缩短,而且包含任何种类的对象或者被嵌套. 一.列表 列表的主要属性: *任意对象的有序集合 从功能上看,列表就是收集其他对象的地方,可以把它看作组.列表所包含每一项都保持了从左到右的位置顺序(它们是序列) *通过偏移读取 和字符串一样,可以通过列表对象的偏移对其进行索引,从而读取对象的某一部分内容.可以自行分片和合并之类的任务. *可变长度,异构以及任意嵌套 列表可以实地增长或者缩短,并且可

老笔记整理五:C实现10阶内通过展开代数余子式求行列式的值

这个分为两部分,先是写出了C实现计算三阶行列式,然后过了一段时间突然有了思路才写下了10阶内这段代码.真怀念那段写代码的日子. 一:C实现计算三阶行列式 最近高数课在上线性代数,二阶的还能口算,三阶的有点麻烦,想陆陆续续地把公式都用C来实现.因为二阶的行列式如果用C来写就是一句话:val=det[0][0]*det[1][1]-det[0][1]*det[1][0];太简单了这里就不写了,主要写关于三阶的.只要把这个三阶行列式里每一个元素打进去就能算出值来了.过两天再写余子式的展开. 1 #in