大数据笔记(二十九)——RDD简介、特性及常用算子

1、什么是RDD? 最核心
(*)弹性分布式数据集,Resilent distributed DataSet
(*)Spark中数据的基本抽象
(*)结合源码,查看RDD的概念

RDD属性
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
一组分区,把数据分成了的不同的分区,每个分区可能运行在不同的worker

* - A function for computing each split
一个函数,用于计算每个分区中的数据
RDD的函数(算子)
(1)Transformation
(2)Action

* - A list of dependencies on other RDDs
RDD之间依赖关系:(1)窄依赖 (2)宽依赖
根据依赖的关系,来划分任务的Stage(阶段)

* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

如何创建一个RDD?有两种方式
(1)使用sc.parallelize方法

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)

(2)通过使用外部的数据源创建RDD:比如:HDFS

val rdd2 = sc.textFile("hdfs://bigdata11:9000/input/data.txt")
val rdd2 = sc.textFile("/root/temp/input/data.txt")

2、Transformation算子:不会触发计算、延时加载(lazy值)

RDD API网址:http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD

常见的RDD算子:

map(func):该操作是对原来的RDD进行操作后,返回一个新的RDD
filter: 过滤操作、返回一个新的RDD
flatMap:类似map
mapPartitions:对每个分区进行操作
mapPartitionsWithIndex: 对每个分区进行操作,带分区的下标
union 并集
intersection 交集
distinct 去重
groupByKey: 都是按照Key进行分组
reduceByKey: 都是按照Key进行分组、会有一个本地操作(相当于:Combiner操作)

3、Action算子:会触发计算

collect: 触发计算、打印屏幕上。以数组形式返回
count: 求个数
first: 第一个元素(take(1))
take(n)
saveAsTextFile: 会转成String的形式,会调用toString()方法
foreach: 在RDD的每个元素上进行某个操作

4、RDD的缓存机制:默认在内存中
(*)提高效率
(*)默认:缓存在Memory中
(*)调用:方法:persist或者cache

def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
def cache(): this.type = persist()

(*)缓存的位置:StorageLevel定义的

val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

(*)示例:

测试数据:Oracle数据库的订单变 sales表(大概92万)
步骤
(1)从HDFS读入数据

val rdd1 = sc.textFile("hdfs://bigdata11:9000/input/sales")

(2)计算

rdd1.count ---> Action,这一次没有缓存
rdd1.cache ---> 缓存数据,但是不会触发计算,cache是一个Transformation
rdd1.count ----> 触发计算,将结果缓存
rdd1.count ----> ???会从哪里得到数据

通过UI进行监控:

IDEA功能键:ctrl + n 查找类
ctl+alt+shit+N 在类中找方法

5、RDD的容错机制:checkpoint检查点
(1)复习检查点:HDFS中,合并元信息
                              Oracle中,会以最高优先级唤醒数据库写进程(DBWn),来内存中的脏数据---> 数据文件
(2)RDD的检查点:容错机制,辅助Lineage(血统)---> 整个计算的过程
如果lineage越长,出错的概率就越大。出错之后,从最近一次的检查点开始运行。

两种类型

(1)本地目录 : 需要将spark-shell运行在本地模式上

(2)HDFS目录: 需要将spark-shell运行在集群模式上

scala> sc.setCheckpointDir("hdfs://bigdata11:9000/spark/checkpoint")

scala> val rdd1 = sc.textFile("hdfs://bigdata11:9000/input/sales")
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://bigdata11:9000/input/sales MapPartitionsRDD[41] at textFile at <console>:24

scala> rdd1.checkpoint

scala> rdd1.count

源码中对于检查点的说明:

/**
* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
* directory set with `SparkContext#setCheckpointDir` and all references to its parent
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/    

RDD的检查点多久发出一次,是手动发出的吗?
1、不是手动
2、每个RDD计算完成后

查看源码

/**
* Performs the checkpointing of this RDD by saving this. It is called after a job using this RDD
* has completed (therefore the RDD has been materialized and potentially stored in memory).
* doCheckpoint() is called recursively on the parent RDDs.
*/
private[spark] def doCheckpoint(): Unit = {

6、RDD的依赖关系、划分Spark任务的Stage(阶段)
(*)窄依赖(Narrow Dependencies):每一个父RDD的分区最多被子RDD的一个分区使用
       比方:独生子女

  举例:map,filter,union

(*)宽依赖(Wide Dependencies):多个子RDD的分区会依赖同一个父RDD的分区
         比方:超生

  举例:groupByKey

  比如分区1和分区2都有10号部门的员工,那么在统计10号部门(key)的员工时需要依赖分区1和分区2,它们属于不同的父RDD的分区。

根据宽依赖和窄依赖的标准,我们可以划分任务的Stage(阶段)

7、RDD算子的基础例子

1、创建一个RDD(数字)
    val rdd1 = sc.parallelize(List(5,6,1,2,10,4,12,20,100,30))

    每个元素*2,然后排序
    val rdd2 = rdd1.map(_*2).sortBy(x=>x,true)

    完整
    val rdd2 = rdd1.map((x:Int)=>x*2)

    过滤出大于10的元素
    val rdd3 = rdd2.filter(_>10)
    rdd3.collect

2、创建一个RDD(字符)
   val rdd1 = sc.parallelize(Array("a b c","d e f","h i j"))
   val rdd2 = rdd1.flatMap(_.split(‘ ‘))
   rdd2.collect

3、集合运算、去重
    val rdd1 = sc.parallelize(List(5,6,7,8,1,2))
    val rdd2 = sc.parallelize(List(1,2,3,4))

    val rdd3 = rdd1.union(rdd2)
    rdd3.distinct.collect
    val rdd4 = rdd1.intersection(rdd2)
4、分组
    val rdd1 = sc.parallelize(List(("Tom",1000),("Jerry",3000),("Mary",2000)))
    val rdd2 = sc.parallelize(List(("Jerry",500),("Tom",3000),("Mike",2000)))

    并集
    val rdd3 = rdd1 union rdd2
    scala> val rdd4 = rdd3.groupByKey
    rdd4: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[27] at groupByKey at <console>:30

    scala> rdd4.collect
    res8: Array[(String, Iterable[Int])] = Array((Tom,CompactBuffer(1000, 3000)),
                                                 (Jerry,CompactBuffer(3000, 500)),
                                                 (Mike,CompactBuffer(2000)),
                                                 (Mary,CompactBuffer(2000)))

六、Spark RDD的高级算子
1、mapPartitionsWithIndex: 对RDD中的每个分区进行操作,带有分区号
定义:def mapPartitionsWithIndex[U](f: (Int, Iterator[T])=>Iterator[U], preservesPartitioning: Boolean = false)
(implicit arg0: ClassTag[U]): RDD[U]
参数说明:
f: (Int, Iterator[T])=>Iterator[U]
(*)Int: 分区号
(*)Iterator[T]: 该分区中的每个元素
(*)返回值:Iterator[U]

Demo:
(1)创建一个RDD:

val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)

(2)创建一个函数,作为f的值

def func1(index:Int,iter:Iterator[Int]):Iterator[String] ={
iter.toList.map(x=>"[PartID:" + index +",value="+x+"]").iterator
}

(3)调用

rdd1.mapPartitionsWithIndex(func1).collect

(4)结果:

Array([PartID:0,value=1], [PartID:0,value=2], [PartID:0,value=3], [PartID:0,value=4],
[PartID:1,value=5], [PartID:1,value=6], [PartID:1,value=7], [PartID:1,value=8], [PartID:1,value=9])    

2、aggregate:聚合操作
定义:def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
作用:先对局部进行操作,再对全局进行操作

举例:

val rdd1 = sc.parallelize(List(1,2,3,4,5),2)

(1)求每个分区最大值的和
先查看每个分区中的元素:

rdd1.mapPartitionsWithIndex(func1).collect

rdd1.aggregate(0)(math.max(_,_),_+_)

(2)改一下:

rdd1.aggregate(0)(_+_,_+_) ====> 15 两个分区求和并相加
rdd1.aggregate(10)(math.max(_,_),_+_) ===> 30 初始值是10,每个分区里有10,初始值10+分区一10+分区二10 = 30

(3)一个字符串的例子

3、aggregateByKey

(1)类似aggregate,也是先对局部,再对全局
(2)区别:aggregateByKey操作<key,value>
(3)测试数据:

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

每个分区中的元素(key,value)

def func3(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[partID:" + index + ", val: " + x + "]").iterator
}    

[partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: (mouse,4)],[partID:1, val: (cat,12)], [partID:1, val: (dog,12)], [partID:1, val: (mouse,2)]

(4)把每个笼子中,每种动物最多的个数进行求和

pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect

4、coalesce和repartition

(*)都是将RDD中的分区进行重分区
(*)区别:coalesce 默认:不会进行shuffle(false)
        repartition 会进行shuffle

(*)举例:
    创建一个RDD
    val rdd4 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)

    进行重分区
     val rdd5 = rdd4.repartition(3)
     val rdd6 = rdd4.coalesce(3,false)  ---> 分区的长度: 2
     val rdd6 = rdd4.coalesce(3,true)  ---> 分区的长度: 2

5、其他高级算子:参考文档
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

原文地址:https://www.cnblogs.com/lingluo2017/p/8684548.html

时间: 2024-08-07 04:13:54

大数据笔记(二十九)——RDD简介、特性及常用算子的相关文章

Android学习笔记二十九之SwipeRefreshLayout、RecyclerView和CardView

Android学习笔记二十九之SwipeRefreshLayout.RecyclerView和CardView 前面我们介绍了AlertDialog和几个常用的Dialog,ProgressDialog进度条提示框.DatePickerDialog日期选择对话框和TimePickerDialog时间选择对话框.这一节我们介绍几个新的API控件SwipeRefreshLayout.RecyclerView和CardView,这几个API控件都是google在Android5.0推出的.下面我们来学

【Unity 3D】学习笔记二十九:游戏实例——简单小地图制作

任何的学习,光看不练是学不好的.所以这次就总结回顾下怎么制作MMROPG类游戏中的小地图.在MMROPG类游戏里,主角在游戏世界里走动时,一般在屏幕右上角都会有一个区域来显示当前游戏场景的小地图.主角在游戏世界里走动,小地图里代表着主角的小标记也会随之移动.那怎么实现咧? 首先需要确定两个贴图,第一个是右上角的小地图背景贴图,应该是从Y轴俯视向下截取主角所在的位置大地图.第二个就是主角的位置大贴图.在本例中,因为没有学习unity地图制作,所以地图用一个面对象代替,主角用立方体代替,使用GUI来

[傅里叶变换及其应用学习笔记] 二十九. 高维Ш函数修改版

一维Ш函数复习 我们前面(十六课,十七课)已经学习过一维的Ш函数,标准的Ш函数表现为无数个脉冲函数分布在整数点上, 我们定义Ш为 $Ш(x) = \displaystyle{ \sum_{k=-\infty}^{\infty}\delta(x-k) }$ 而Ш函数最为深刻的一个性质就是:Ш的傅里叶变换是它自身 $\mathcal{F}Ш=Ш$ 进一步推广到脉冲间隔为$p$的函数$Ш_p$ $\displaystyle{ Ш_p(x)=\sum_{k=-\infty}^{\infty}\delta

iOS学习笔记(二十九)——图文解释XCode常用快捷键的使用

刚开始用Xcode是不是发现以前熟悉的开发环境的快捷键都不能用了?怎么快捷运行,停止,编辑等等.都不一样了.快速的掌握这些快捷键,能提供开发的效率. 其实快捷键在Xcode的工具栏里都标注有,只是有的符号和你的键盘上的符号对应不起来罢了.下面截图工具栏里的快捷键总结一下常用快捷键的用法. 一.关于运行调试 1.运行,停止,都在工具栏的Product里. Command + R  运行. Command + .  停止 2.F6单步调试.F7跳入,F8继续, 和Eclipse,VS类似 二.导航

大数据笔记(十四)——HBase的过滤器与Mapreduce

一. HBase过滤器 package demo; import javax.swing.RowFilter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; impo

大数据笔记(十八)——Pig的自定义函数

Pig的自定义函数有三种:1.自定义过滤函数:相当于where条件2.自定义运算函数:3.自定义加载函数:使用load语句加载数据,生成一个bag 默认:一行解析成一个Tuple 需要MR的jar包 一.自定义过滤函数 package demo.pig; import java.io.IOException; import org.apache.pig.FilterFunc; import org.apache.pig.data.Tuple; //实现自定义的过滤函数,实现:查询过滤薪水大于20

Linux学习笔记&lt;二十九&gt;——http服务

基础概念: HTTP:Hyper Text Transfer Protocol 超文本传输协议 versions: HTTP/0.9:只接收GET一种请求方法,只支持纯文本 HTTP/1.0:支持PUT.POST.DELETE和HEAD,支持MINE HTTP/1.1:在HTTP/1.0的基础上,增加了缓存功能,支持长连接,支持管道方式同时                  发送多个请求 HTTP请求方法:获取资源的方法 HTTP/0.9:GET HTTP/1.0:PUT(修改服务器上的内容),

Android笔记二十九. 一款简易画图板开发

一款简易画图板开发 转载请表明出处:http://blog.csdn.net/u012637501(嵌入式_小J的天空) 一.画图板原理 1.直线效果 画图板表面上看起来可以随用户在触摸屏上自由的绘制任意图形,但是实际上当用户在触摸屏上移动时,两次拖动事件发生点的距离很小,多条极短的直线连接起来我们肉眼看起来就是直接了.在触摸屏绘制图形时,每条直线都是从上一次拖动事件发生点画到本次拖动事件的发生点,可以借助于Android提供的Path类来实现.然后,如果程序每次都是从上次拖动事件的发生点绘一条

PHP学习笔记二十九【接口】

<?php //定义接口 //接口可以定义属性,但必须是常量而且是public //接口的所有方法必须是public interface Iusb{ public function start(); public function stop(); } //手机类实现接口关键字implements,必须实现这个所有方法 //类可以同时实现多个接口 //一个类可以实现多个接口 implements 接口1,接口2,接口 class Phone implements Iusb{ public func

29、蛤蟆的数据结构笔记之二十九数组之硬币抛掷模拟

29.蛤蟆的数据结构笔记之二十九数组之硬币抛掷模拟 本篇名言:"人生是各种不同的变故.循环不已的痛苦和欢乐组成的.那种永远不变的蓝天只存在于心灵中间,向现实的人生去要求未免是奢望.-- 巴尔扎克" 欢迎转载,转载请标明出处: 1.  硬币抛掷 如果抛掷硬币N次,看到头像的期望值是N/2次,但实际值也可能是0~N次,在程序中进行M次试验,M和N都在代码中定义.它使用一个数组f来跟踪出现"i次头像"的概率,其中0≤j≤N.然后打印试验结果的柱状图,每出现10次用1个星号