RDD的操作
1.1 概述
RDD整体包含两大类操作
transformation 从现有中创建一个新的数据集
action 在对数据集做一定程度的计算后将结果返回
以MapReduce来说,Map就是一个transformation ,它是从每个文件块上执行一个方法来抽取转换,最终形成一个新的数据集.而Reduce就是一个action,它在对数据集执行一个函数进行计算后返回一个结果
对于所有的transformation,都是Lazy的,也就是说它不会立即执行,只是单纯的记住怎么样从原来的数据集进行转换的逻辑而已,它仅在某一个计算需要的情况下,才会被真正执行.
因为transformation 的Lazy性,RDD支持在每次计算时都进行重新计算,当然你可以将这个RDD保存下来 (persist or cache方法)避免每次重计算
这种保存既可以是硬盘,也可以是内存,甚至可以选择同步多个副本到多个节点中
1.2 集群环境下的操作
集群环境,所有操作最终会交给executors去执行.而变量,会以数据副本的形式交给executors.很多时候,这与我们非集群环境下的开发思维有非常大的不同.
1.2.1 集群下的闭包
RDD是支持闭包操作的.但务必注意的是Spark不保证对闭包之外的对象引用进行的变化.
原因是闭包的会被序列化发生给每一个executor,对于闭包的之外的对象引用会拷贝一个副本给executor.这时多个executor执行至少是跨JVM的
这时对这个副本对象的变更没有任何意义,因为每个JVM(executor)的副本都是独立的.
1.2.2 集群下的print
集群环境下,print不会在driver端有任何输出.
原因也是一样,print最终是在每个executor执行,其输出也是在每个executor的stdout上,在driver端,是不会有这些输出的.
如果想在driver输出,一个比较简单的办法是调用collect()将结果发送到driver端在进行print,但这样可能会造成driver内存爆掉(所有executor的数据涌入).比较推荐的做法是rdd.take(100).foreach(println)
1.2.3 共享变量
因为集群下,变量只会以副本方式交给executor,而不会将变量的值变化传回driver,所以一个可读写共享的变量是非常有用的.
Spark提供了两种共享变量 broadcast(广播变量) 和 accumulators(累加器)
1.2.3.1 广播变量(broadcast)
广播变量允许将一个只读变量的副本发送到每个机器上(executor机器),而不是对每一个任务发送一个副本.这样在同一机器上的多个任务,就可以反复使用这个变量了.
注意:
广播变量只会对每个节点分发一次,所以一般来说,广播变量不应该再被修改了.以保证每个广播变量的副本的值都是一致的
如果广播变量被修改,则需要将广播变量重新分发
另:
举个例子:Spark的action操作本身是通过一系列的stage来完成的,这些Stage是通过分布式的shuffle操作来进行切分的.而在每个Stage之间,Spark自动使用广播变量.
这里用法说明,只有数据本身会在多个Stage的多个任务中反复使用,或者说缓存这个数据是非常重要且非常必要的情况下,使用广播变量才有意义.
广播变量的使用如下:
// SparkContext.broadcast(v)进行创建,返回的是广播变量的封装器,以.value读取广播变量的的值 val broadcastVar = sc.broadcast(Array(1, 2, 3)) val v = broadcastVar.value
1.2.3.2 累加器(accumulators)
累加器变量仅支持累加操作,因为可以在并行计算执行一些特殊的计算(比计数或者求和).并且累加器的变化是可以在UI的Task界面上看见的(注,不支持Python)
累加器操作,依然遵循RDD的Lazy原则:
累加器更新操作是在Action中,并且在每个任务中只会执行一次(如果任务失败重启了,累加器更新不会执行)
而在transformation中,累加器依然不会立即执行更新,如果transformation被重新执行了,则累加器操作会重复执行
对于累加器变量,Spark原生支持数值类型.一个使用例子如下
val accum = sc.longAccumulator("My Accumulator") sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) println(accum.value)
也可以创建继承AccumulatorV2的类型,来实现自定义类型的累加器,例子如下:
//两个泛型参数->累加的元素类型和结果类型可以不同的 class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] { private val myVector: MyVector = MyVector.createZeroVector def reset(): Unit = { myVector.reset() } def add(v: MyVector): Unit = { myVector.add(v) } ... } // 创建一个自定义的累加器类型: val myVectorAcc = new VectorAccumulatorV2 //将这个触发器注册到SparkContext: sc.register(myVectorAcc, "MyVectorAcc1")
1.3 RDD的一些基本操作
1.3.1 Transformations 操作
map 将原来RDD中的每个项,用自定义的map函数进行映射转变为新的元素,并返回一个新的RDD
filter 对原来RDD进行过滤,将过滤的结果返回为一个新的RDD
flatMap 与map类似
1.3.2 Action 操作
1.4 Shuffle过程
Spark的某些操作,会引起一个Shuffle过程.Shuffle是指不同节点上的不同分区数据整合重新分区分组的机制.
所以Shuffle是一个代价很高的操作,因为它会导致executor和不同的机器节点之间进行数据复制.
1.4.1 Shuffle简述
以reduceByKey为例,将原始数据中key相同的记录聚合为一个组.这里挑战是原始数据很可能是存在不同分区不同机器的(参考MapReduce执行过程)
Spark-Shuffle与MapReduce-Shuffle的区别
MapReduce-Shuffle结果是分区有序,分区内再按Key排序
Spark-Shuffle结果是分区有序,但分区内Key无序.
要对Spark-Shuffle的分区内再排序,有以下方法:
mapPartitions 在已有的每个分区上再使用.sort排序
repartitionAndSortWithinPartitions 重建分区,并排序
sortBy提前对RDD本身做一个全范围排序
1.4.2 RDD中引起Shuffle的操作
repartition类
操作 例如:repartition
、coalesce
_ByKey
操作(除了counting相关操作)例如:groupByKey
、reduceByKey
join 例如:cogroup
、join
1.4.3 Shuffle的性能影响
Shuffle本身是同时高耗内存,高耗磁盘IO,高耗网络IO的昂贵操作.
Spark会启动一系列的MapReduce(Hadoop MapReduce),产生大量的数据缓冲区与归并排序,大量的pill文件与归并Merge等等
原文地址:https://www.cnblogs.com/NightPxy/p/9245707.html