Spark RDD编程(二)

转载请注明出处:http://blog.csdn.net/gamer_gyt @高阳团

博主微博:http://weibo.com/234654758

Github:https://github.com/thinkgamer

============================================================

SparkRDD编程(一)

Spark 的键值对(pair RDD)操作,Scala实现


RDD的分区函数



目前Spark中实现的分区函数包括两种

  • HashPartitioner(哈希分区)

    原理图:

  • RangePartitioner(区域分区)

partitioner这个属性只存在于< K,V>类型的RDD中,对于非< K,V >类型的partitioner的值就是None,partitioner函数即决定了RDD本身的分区数量,也可作为RDD shuffle输出中每个区分进行数据切割的依据。

scala> val rdd = sc.makeRDD(1 to 10,2).map(x=>(x,x))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[1] at map at <console>:27

scala> rdd.partitioner
res0: Option[org.apache.spark.Partitioner] = None

scala> val group_rdd = rdd.groupByKey(new org.apache.spark.HashPartitioner(3))
group_rdd: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupByKey at <console>:29

scala> group_rdd.partitioner
res1: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@3)

scala> group_rdd.collect()
res4: Array[(Int, Iterable[Int])] = Array((6,CompactBuffer(6)), (3,CompactBuffer(3)), (9,CompactBuffer(9)), (4,CompactBuffer(4)), (1,CompactBuffer(1)), (7,CompactBuffer(7)), (10,CompactBuffer(10)), (8,CompactBuffer(8)), (5,CompactBuffer(5)), (2,CompactBuffer(2)))

RDD的基本转换操作


1. repartition 和 coalesce

两者都是对rdd分区进行重新划分,repartition只是coalesce接口中shuffle为true的简易实现,这里我们讨论一下coalesce合并函数该如何设置shuffle参数,这里分三种情况(假设RDD有N个分区,需要重新划分为M个分区)

  • 如果N < M

    一般情况下N个分区有数据分布不均的情况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle的参数设置为true

  • 如果N > M(两者相差不大)

    两者相差不大的情况下,就可以将N中的若干个分区合并未一个分区,最终合并未M个分区,这时可以将shuffle参数设置为false(在shuffle为false的情况下,设置M>N,coalesce是不起作用的),不进行shuffle过程,父RDD和子RDD之间是窄依赖关系。

  • 如果N>>M(N远大于M的情况)

    N,M相差悬殊的时候如果把shuffle参数设置为false,由于父子

    RDD是窄依赖,他们同处在一个Stage中,就有可能造成Spark程序运行的并行度不高,从而影响性能。比如在M为1时,由于只有一个分区,所以只会有一个任务在运行,为了使coalesce之前的操作有更好的并行度,可以将shuffle参数设置为true。

scala> val rdd = sc.makeRDD(1 to 10,100)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at makeRDD at <console>:21

scala> rdd.partitions.size
res14: Int = 100

scala> val repartitionRDD = rdd.repartition(4)
repartitionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[30] at repartition at <console>:23

scala> repartitionRDD.partitions.size
res15: Int = 4
------------------------------------------------------------------------

scala> val coalesceRDD = rdd.coalesce(3)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[31] at coalesce at <console>:23

scala> coalesceRDD.partitions.size
res16: Int = 3

scala> val coalesceRDD = rdd.coalesce(1)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[34] at coalesce at <console>:23

scala> coalesceRDD.partitions.size
res17: Int = 1

scala> val coalesceRDD = rdd.coalesce(1,shuffle=true)  #增加并行度
coalesceRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[38] at coalesce at <console>:23
res18: Int = 1            

如果第二次分区的数目大于现有的分区数,不指定参数时,分区数不改变,也就是说在不进行洗牌的情况下,是无法将RDD的分区数目进行改变的

scala> val rdd = sc.makeRDD(1 to 1000,1000)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[39] at makeRDD at <console>:21

scala> val coalesceRDD = rdd.coalesce(1)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[40] at coalesce at <console>:23

scala> coalesceRDD.partitions.size
res21: Int = 1

scala> val coalesceRDD = rdd.coalesce(100000)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[41] at coalesce at <console>:23

scala> coalesceRDD.partitions.size
res22: Int = 1000

scala> val coalesceRDD = rdd.coalesce(100000,shuffle=true)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[45] at coalesce at <console>:23

scala> coalesceRDD.partitions.size
res23: Int = 100000

2. randomSplit()和glom()

randomSplit是根绝weights权重将一个RDD切分成多个RDD,而glom函数是将RDD中每一个分区中类型为T的元素转换为数组[T],这样每一个分区就只有一个数组元素。

scala> val rdd = sc.makeRDD(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[50] at makeRDD at <console>:21

scala> rdd.collect()
res26: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val glomRDD = rdd.glom()
glomRDD: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[51] at glom at <console>:23

scala> glomRDD.collect()
res27: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))

-------------------------------------------------------------------
scala> val rdd = sc.makeRDD(1 to 10,10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at makeRDD at <console>:21

scala> rdd.collect()
res32: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val splitRDD = rdd.randomSplit(Array(1.0,3.0,6.0))
splitRDD: Array[org.apache.spark.rdd.RDD[Int]] = Array(MapPartitionsRDD[57] at randomSplit at <console>:23, MapPartitionsRDD[58] at randomSplit at <console>:23, MapPartitionsRDD[59] at randomSplit at <console>:23)

scala> splitRDD(0).collect()
res33: Array[Int] = Array()

scala> splitRDD(1).collect()
res34: Array[Int] = Array(6)

scala> splitRDD(2).collect()
res36: Array[Int] = Array(1, 2, 3, 4, 5, 7, 8, 9, 10)

3. mapPartitions和mapPartitionsWithIndex

mapPartitions与map转换操作类似,只不过映射函数的输入参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器,该操作有一个参数perservesPartitioning指明mapPartitions是否保留父RDD的partitions的分区信息。mapPartitionWithIndex和mapPartitions功能类似,只是输入参数时多了一个分区的ID

scala> val rdd = sc.makeRDD(1 to 5,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at <console>:27

scala> val mapRDD = rdd.map(x=>(x,x))
mapRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at map at <console>:29

scala> val groupRDD = mapRDD.groupByKey(3)
groupRDD: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[6] at groupByKey at <console>:31

scala> val mapPartitionsRDD = groupRDD.mapPartitions(iter=>iter.filter(_._1>3))
mapPartitionsRDD: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = MapPartitionsRDD[7] at mapPartitions at <console>:33

scala> mapPartitionsRDD.collect()
res3: Array[(Int, Iterable[Int])] = Array((4,CompactBuffer(4)), (5,CompactBuffer(5)))

scala> mapPartitionsRDD.partitioner
res4: Option[org.apache.spark.Partitioner] = None

scala> val mapPartitionsRDD = groupRDD.mapPartitions(iterator => iterator.filter(_._1>3),true)
mapPartitionsRDD: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = MapPartitionsRDD[8] at mapPartitions at <console>:33

scala> mapPartitionsRDD.partitioner
res5: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@3)

3. zip和zipPartitions

zip是将两个RDD组成key/value(键/值)形式的RDD,这里认为两个rdd的partitioner数量以及元素数量都相等。

zipPartitions是将多个RDD,按照partition组合成新的RDD,zipPartitions需要相互组合的RDD具有相同的分区数,但是对于每个分区中的元素数量是没有限制的

scala> val rdd = sc.makeRDD(1 to 5,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:27

scala> val mapRDD=rdd.map(x=>(x+1.0))
mapRDD: org.apache.spark.rdd.RDD[Double] = MapPartitionsRDD[1] at map at <console>:29

scala> val zipRDD = rdd.zip(mapRDD)
zipRDD: org.apache.spark.rdd.RDD[(Int, Double)] = ZippedPartitionsRDD2[2] at zip at <console>:31

scala> zipRDD.collect
res0: Array[(Int, Double)] = Array((1,2.0), (2,3.0), (3,4.0), (4,5.0), (5,6.0))
scala> val rdd1=sc.makeRDD(Array("1","2","3","4","5","6"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at makeRDD at <console>:21

scala> val zipPartitionsRDD = rdd.zipPartitions(rdd1)((i:Iterator[Int],s:Iterator[String])=>{Iterator(i.toArray.size,s.toArray.size)})
zipPartitionsRDD: org.apache.spark.rdd.RDD[Int] = ZippedPartitionsRDD2[7] at zipPartitions at <console>:25

scala> zipPartitionsRDD.collect()
res3: Array[Int] = Array(2, 3, 3, 3)

4. zipWithIndex和zinWithUniqueId

zipWithIndex是将RDD中的元素和这个元素的ID组合成键/值对,比如说第一个分区的第一个元素是0,第一个分区的第二个元素是1,依次类推

zipWithUniqueID是将RDD中的元素和一个唯一ID组合成键/值对,假设RDD共有N个分区,那么第一个分区的第一个元素唯一ID是1,第一个分区的第二个元素就是1+N,第一个分区的第三个元素就是1+2N,依次类推

scala> val rdd = sc.makeRDD(1 to 6,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:21

scala> val zipWithIndex = rdd.zipWithIndex()
zipWithIndex: org.apache.spark.rdd.RDD[(Int, Long)] = ZippedWithIndexRDD[1] at zipWithIndex at <console>:23

scala> zipWithIndex.collect()
res0: Array[(Int, Long)] = Array((1,0), (2,1), (3,2), (4,3), (5,4), (6,5))

scala> val zipWithUniqueID = rdd.zipWithUniqueId()
zipWithUniqueID: org.apache.spark.rdd.RDD[(Int, Long)] = MapPartitionsRDD[2] at zipWithUniqueId at <console>:23

scala> zipWithUniqueID.collect()
res1: Array[(Int, Long)] = Array((1,0), (2,2), (3,4), (4,1), (5,3), (6,5))

控制操作



在Spark中对RDD持久化操作时一项非常重要的功能,可以将RDD持久化在不同层次的存储介质中,以便后续的操作能够重复使用

  • checkpoint

    将RDD持久化在HDFS上,与persist的一个区别是会切断此RDD之前的依赖关系,而persist依然保留着RDD的依赖关系。

    checkpoint的主要作用

    1、如果一个spark程序会很长时间驻留运行(如spark streaming 一般会7*2小时运行),过长的依赖将会占用很多系统资源,那么定期的将RDD进行checkpoint操作,能够有效节省系统资源

    2、维护过长的依赖关系还会出现一些小问题,如果Spark在运行过程中出现节点失败的情况,那么RDD进行容错重算的成本会非常高


scala> val rdd = sc.makeRDD(1 to 4,1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[60] at makeRDD at <console>:21

scala> val flatMapRDD = rdd.flatMap(x=>Seq(x,x))
flatMapRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[61] at flatMap at <console>:23

scala> sc.setCheckpointDir("temp")
16/09/14 10:56:08 WARN spark.SparkContext: Checkpoint directory must be non-local if Spark is running on a cluster: temp

scala> flatMapRDD.checkpoint()

scala> flatMapRDD.dependencies.head.rdd
res40: org.apache.spark.rdd.RDD[_] = ParallelCollectionRDD[60] at makeRDD at <console>:21

scala> flatMapRDD.collect()
res41: Array[Int] = Array(1, 1, 2, 2, 3, 3, 4, 4)

scala> flatMapRDD.dependencies.head.rdd
res42: org.apache.spark.rdd.RDD[_] = ReliableCheckpointRDD[62] at collect at <console>:26

在hdfs上查看具体信息
![checkpoint保存](http://img.blog.csdn.net/20160914110209481)
                            ![这里写图片描述](http://img.blog.csdn.net/20160914110324415)


行动操作



这里我们不具体列出使用案例,在Spark RDD编程(一)中已经详细说明,这里只做一个回顾与总结

  • 集合标量行动操作
函数名称 功能
first 返回rdd中的第一个元素
count 返回RDD中元素的个数
reduce 对rdd中的元素进行二元运算,返回计算结果
collect()/toArray() 以集合形式返回RDD的元素
take(num:Int) 将RDD作为集合,返回集合中[0,num-1]下标的元素
top(num:Int) 按照默认的或者是指定的排序规则,返回前num个元素
takeOrdered(num:Int) 以与top相反的排序规则,返回前num个元素
aggregate 比较麻烦参考Spark RDD编程(一)
fold 是aggregate的便利接口
lookup(Key:K):Seq[v] 针对(K,V)类型的RDD行动操作,对于给定的键值,返回与此键值相对应的所有值
  • 存储行动操作
函数名称 功能
saveAsTextFile() 保存到hdfs
saveAsObjectFile() 用于将RDD中的元素序列化成对象,存储到文件中。对于HDFS,默认采用SequenceFile保存。
saveAsHadoopFile() 保存为hadoop的一种格式,比如说TextFileOutputFormat,SequenceFileOutputFormat,OutputFormat…
saveAsHadoopDataset() 保存到数据库如hbase,mongodb,Cassandra

END!

时间: 2024-08-25 09:06:59

Spark RDD编程(二)的相关文章

Java8函数式编程(二):类比Spark RDD算子的Stream流操作

1 Stream流 对集合进行迭代时,可调用其iterator方法,返回一个iterator对象,之后便可以通过该iterator对象遍历集合中的元素,这被称为外部迭代(for循环本身正是封装了其的语法糖),其示意图如下: 除此之外,还有内部迭代方法,这正是这里要说明的集合的stream()方法返回的Stream对象的一系列操作,比如,要统计一个数字列表的偶数元素个数,当使用Stream对象的操作时,如下: List<Integer> list = new ArrayList<Integ

Spark学习(二):RDD编程

介绍: RDD--Resilient Distributed Dataset Spark中RDD是一个不可变的分布式对象集合.每个RDD被分为多个分区,这些分区运行在集群的不同的节点上.RDD可以包含Python.Java.Scala中的任意类型的对象,以及自定义的对象. 创建RDD的两种方法: 1 读取一个数据集(SparkContext.textFile()) : lines = sc.textFile("README.md") 2 读取一个集合(SparkContext.para

【spark 深入学习 05】RDD编程之旅基础篇02-Spaek shell

--------------------- 本节内容: · Spark转换 RDD操作实例 · Spark行动 RDD操作实例 · 参考资料 --------------------- 关于学习编程方式的,每个人都有自己的方式.对我个人来说,最好的方法还是多动手写demo,要多写代码,才能理解的更加深刻,本节以例子的形式讲解各个Spark RDD的使用方法和注意事项,本文一共讲解了20个RDD的使用demo. 一.Spark转换 RDD操作实例 RDD转换操作返回的是RDD,而行动操作返回的是其

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio

spark 中的RDD编程 -以下基于Java api

1.RDD介绍:     RDD,弹性分布式数据集,即分布式的元素集合.在spark中,对所有数据的操作不外乎是创建RDD.转化已有的RDD以及调用RDD操作进行求值.在这一切的背后,Spark会自动将RDD中的数据分发到集群中,并将操作并行化. Spark中的RDD就是一个不可变的分布式对象集合.每个RDD都被分为多个分区,这些分区运行在集群中的不同节点上.RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以包含用户自定义的对象. 用户可以使用两种方法创建RDD:读取一个

Spark RDD Transformation 简单用例(二)

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine

Spark RDD Action 简单用例(二)

foreach(f: T => Unit) 对RDD的所有元素应用f函数进行处理,f无返回值./** * Applies a function f to all elements of this RDD. */def foreach(f: T => Unit): Unit scala> val rdd = sc.parallelize(1 to 9, 2) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at p

spark core之RDD编程

  spark提供了对数据的核心抽象--弹性分布式数据集(Resilient Distributed Dataset,简称RDD).RDD是一个分布式的数据集合,数据可以跨越集群中的多个机器节点,被分区并行执行.  在spark中,对数据的所有操作不外乎创建RDD.转化已有RDD及调用RDD操作进行求值.spark会自动地将RDD中的数据分发到集群中并行执行. 五大特性 a list of partitions  RDD是一个由多个partition(某个节点里的某一片连续的数据)组成的的lis

【spark 深入学习 03】Spark RDD的蛮荒世界

RDD真的是一个很晦涩的词汇,他就是伯克利大学的博士们在论文中提出的一个概念,很抽象,很难懂:但是这是spark的核心概念,因此有必要spark rdd的知识点,用最简单.浅显易懂的词汇描述.不想用学术话的语言来阐述RDD是什么,用简单.容易理解的方式来描述. 一.什么是RDD,RDD出现的背景 Mapreduce计算模型的出现解决了分布式计算的诸多难题,但是由于MR对数据共享的解决方案比较低效,导致MR编程模型效率不高,将数据写到一个稳定的外部存储系统,如HDFS,这个会引起数据复写.磁盘IO