Spark之RDD持久化、广播、累加器

RDD持久化、广播、累加器实质上分别涉及了RDD的数据如何保存,RDD在构建高效算法的时候涉及了persist或者checkpoint,以及广播和累加器,通过spark-shell可以试验一些小功能,spark-shell本身是spark的发行包推出的一个程序,通过这个程序可以直接写代码,spark-shell会把代码直接进行运行。

1.1.   RDD持久化实战

从2个层面考虑持久化:

1)操作RDD的时候怎么保存结果(属于Action的部分)

下面使用Spark-shell进行实战:

1.1.1.  Action级别的操作进行持久化——启动运行环境

我们使用基于Hadoop的HDFS的文件系统,所以只需启动Hadoop的HDFS即可:

查看启动是否成功:

启动Spark集群:

启动日志管理器:

启动spark-shell:

构建一个RDD:

1.1.1.1.           reduce

执行一个Action操作:

1.1.1.2.           map

1.1.1.3.           collect

把各个Executor上的结果进行收集后在集群终端显示。

我们可以看一下collect的源码:(RDD.scala 926行)

/** * Return an array that contains all of the elements in this RDD. */def collect(): Array[T] = withScope {  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)  Array.concat(results: _*)}

collect返回一个Araay,它的工作机制流程图如下图:

如果想在命令终端上看到结果必须使用collect。

凡是Action级别的操作都会触发sc.runJob:

1.1.1.4.           count

1.1.1.5.           take

1.1.1.6.           countByKey

/** * Count the number of elements for each key, collecting the results to a local Map. * * Note that this method should only be used if the resulting map is expected to be small, as * the whole thing is loaded into the driver‘s memory. * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which * returns an RDD[T, Long] instead of a map. */def countByKey(): Map[K, Long] = self.withScope {  self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap}

统计每个key出现的次数:

1.1.1.7.           saveAsTextFile

saveAsTextFile可以直接把数据写到HDFS上。

2)在实现算法的时候要进行cache、persist,另外还有一个是checkpoint

1.1.2.  通过persist进行持久化

Spark在默认情况下它的数据是放在内存中的,放在内存中适合高速的迭代。在一下情况下需要持久化:

1)在某步骤计算特别耗时

2)计算链条特别长的情况

3)checkpoint所在的RDD也一定要持久化(在checkpoint之前persist)

4)shuffle之后

5)shuffle之前(框架默认帮助我们把数据持久化到本地磁盘),如果shuffle出错的话,所有的父RDD都要重新计算,代价很大的。

如果发现内存经常不够用或出现OOM的话,一个非常好的方式就是MEMORY中的内容序列化,当然了,在使用数据的时候需要反序列化,反序列话是很消耗CPU的;

下面两个是优先放到内存,在内存放不下的情况下再放到磁盘:

val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)

上面这两个执行时间没什么区别,但是如果先cache再count

执行时间就会提高将近20倍。

由此也得出一个结论:cache后一定不能立即有其它算子!!!

cache不是一个Action,因为它并没有执行一个作业。persist是lazy级别的,unpersist是eager级别的,cache之后可以使用unpersist清除cache。cache只放在内存,而persist可以是内存也可以是磁盘。

1.2.   Spark广播实战

在构建发算法时至关重要,无论是在降低网络传输的数据量、提高内存的使用效率,还是加快程序的运行速度,广播对于我们而言都是非常重要的。

为什么需要广播?

广播是由Driver发给当前Application分配的所有Executor内存级别的全局制度变量,Executor中的线程池中的线程共享该全局变量,极大地减少了网络传输(否则的话每个Task都要纯属一次该变量)并极大地节省了内存,当然也隐形的提高了CPU的有效工作。

1.3.   Spark累加器实战

累加器在Spark集群中是一个全局的指针步减的变量,且在所有的Executor中只能修改累加器的内容,也就是说只能增加累加器的内容,在Executor中不可以读累加器的内容,在Driver中可以读累加器的内容。

备注:

资料来源于:DT_大数据梦工厂(IMF传奇行动绝密课程)-IMF

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

Life is short,you need to Spark.

时间: 2024-10-20 23:48:04

Spark之RDD持久化、广播、累加器的相关文章

Spark笔记整理(五):Spark RDD持久化、广播变量和累加器

[TOC] Spark RDD持久化 RDD持久化工作原理 Spark非常重要的一个功能特性就是可以将RDD持久化在内存中.当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition.这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD. 巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升1

Spark之RDD的定义及五大特性

RDD是分布式内存的一个抽象概念,是一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,能横跨集群所有节点并行计算,是一种基于工作集的应用抽象. RDD底层存储原理:其数据分布存储于多台机器上,事实上,每个RDD的数据都以Block的形式存储于多台机器上,每个Executor会启动一个BlockManagerSlave,并管理一部分Block:而Block的元数据由Driver节点上的BlockManagerMaster保存,BlockManagerSlave生成Block后向Block

Spark IMF传奇行动第18课:RDD持久化、广播、累加器总结

昨晚听了王家林老师的Spark IMF传奇行动第18课:RDD持久化.广播.累加器,作业是unpersist试验,阅读累加器源码看内部工作机制: scala> val rdd = sc.parallelize(1 to 1000) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21 scala> rdd.persist res0: rdd.type

RDD持久化、广播、累加器(DT大数据梦工厂)

内容: 1.RDD持久化实战: 2.Spark广播实战: 3.Spark累加器实战: 持久化实战几个方面: 1.怎么保存结果: 2.实现算法的时候cache.persist: 3.checkpoint 广播: 构建算法至关重要,降低网络传输数据量.提高内存的使用效率.加快程序的运行速度 累加器: 全局的指针部件的变量,在executor中只能修改累加器的内容,不能读累加器的内容,在driver中才能读取 ========== Action============ collect.count.sa

Spark 调优之RDD持久化级别及kryo序列化性能测试

我们上篇文章中讲了,RDD的持久化是spark优化中必须掌握的,并且,在内存不足的情况下,我们可以将持久化类型选择为MEMORY_ONLY_SER,减少内存的占用,持久化更多的partition,并且不同的序列化方法也会影响序列化性能.下面,我们就来测试下,持久化级别和序列化方法的选择对RDD持久化大小的影响.我选择了一个170.9MB的日志文件,传到了百度网盘 提取码:ffae 测试环境是windows,IDEA参数配置 MEMORY_ONLY 代码为 case class CleanedLo

Spark核心—RDD初探

本文目的 ? 最近在使用Spark进行数据清理的相关工作,初次使用Spark时,遇到了一些挑(da)战(ken).感觉需要记录点什么,才对得起自己.下面的内容主要是关于Spark核心-RDD的相关的使用经验和原理介绍,作为个人备忘,也希望对读者有用. ? 为什么选择Spark ? 原因如下 代码复用:使用Scala高级语言操作Spark,灵活方便,面向对象,函数编程的语言特性可以全部拿来.Scala基本上可以无缝集成java及其相关库.最重要的是,可以封装组件,沉淀工作,提高工作效率.之前用hi

Spark发行笔记8:解读Spark Streaming RDD的全生命周期

本节主要内容: 一.DStream与RDD关系的彻底的研究 二.StreamingRDD的生成彻底研究 Spark Streaming RDD思考三个关键的问题: RDD本身是基本对象,根据一定时间定时产生RDD的对象,随着时间的积累,不对其管理的话会导致内存会溢出,所以在BatchDuration时间内执行完RDD操作后,需对RDD进行管理. 1.DStream生成RDD的过程,DStream到底是怎么生成RDD的? 2.DStream和RDD到底什么关系? 3.运行之后怎么对RDD处理? 所

07、RDD持久化

Spark非常重要的一个功能特性就是可以将RDD持久化在内存中.当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition.这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD. 巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升10倍.对于迭代式算法和快速交互式应用来说,RDD持久化,是非

Spark和RDD模型研究

1背景介绍 现今分布式计算框架像MapReduce和Dryad都提供了高层次的原语,使用户不用操心任务分发和错误容忍,非常容易地编写出并行计算程序.然而这些框架都缺乏对分布式内存的抽象和支持,使其在某些应用场景下不够高效和强大.RDD(Resilient Distributed Datasets弹性分布式数据集)模型的产生动机主要来源于两种主流的应用场景: ?  迭代式算法:迭代式机器学习.图算法,包括PageRank.K-means聚类和逻辑回归(logistic regression) ?