RDD弹性分布式数据集的基本操作

RDD的中文解释是弹性分布式数据集。构造的数据集的时候用的是List(链表)或者Array数组类型/* 使用makeRDD创建RDD */
/* List */
val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
val r01 = rdd01.map { x => x * x }
println(r01.collect().mkString(","))
/* Array */
val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))
val r02 = rdd02.filter { x => x < 5}
println(r02.collect().mkString(","))

val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)
val r03 = rdd03.map { x => x + 1 }
println(r03.collect().mkString(","))
/* Array */
val rdd04 = sc.parallelize(List(1,2,3,4,5,6), 1)
val r04 = rdd04.filter { x => x > 3 }
println(r04.collect().mkString(","))

也可以直接用文件系统来构造

1 val rdd:RDD[String] = sc.textFile("file:///D:/sparkdata.txt", 1)
2 val r:RDD[String] = rdd.flatMap { x => x.split(",") }
3 println(r.collect().mkString(","))

RDD的操作分为转化操作(transformation)和行为操作(action),

转化操作和行为操作的本质区别

转化操作使一个RDD转化为另一个RDD而行动操作就是进行实际的计算

 1 val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
 2 val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
 3 val rddFile:RDD[String] = sc.textFile(path, 1)
 4
 5 val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
 6 val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))
 7
 8 /* map操作 */参数是函数,函数应用于RDD每一个元素,返回值是新的RDD
 9 println("======map操作======")
10 println(rddInt.map(x => x + 1).collect().mkString(","))
11 println("======map操作======")
12 /* filter操作 */参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD
13 println("======filter操作======")
14 println(rddInt.filter(x => x > 4).collect().mkString(","))
15 println("======filter操作======")
16 /* flatMap操作 */参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD
17 println("======flatMap操作======")
18 println(rddFile.flatMap { x => x.split(",") }.first())
19 println("======flatMap操作======")
20 /* distinct去重操作 */没有参数,将RDD里的元素进行去重操作方法转换操作生成一个只包含不同元素的一个新的RDD。开销很大。 
21 println("======distinct去重======")
22 println(rddInt.distinct().collect().mkString(","))
23 println(rddStr.distinct().collect().mkString(","))
24 println("======distinct去重======")
25 /* union操作 */会返回一个包含两个RDD中所有元素的RDD,包含重复数据。
26 println("======union操作======")
27 println(rdd01.union(rdd02).collect().mkString(","))
28 println("======union操作======")
29 /* intersection操作 */只返回两个RDD中都有的元素。可能会去掉所有的重复元素。通过网络混洗来发现共有元素
30 println("======intersection操作======")
31 println(rdd01.intersection(rdd02).collect().mkString(","))
32 println("======intersection操作======")
33 /* subtract操作 */返回只存在第一个RDD中而不存在第二个RDD中的所有的元素组成的RDD。也需要网络混洗
34 println("======subtract操作======")
35 println(rdd01.subtract(rdd02).collect().mkString(","))
36 println("======subtract操作======")
37 /* cartesian操作 */计算两个RDD的笛卡尔积,转化操作会返回所有可能的(a,b)对,其中a是源RDD中的元素,而b则来自于另一个RDD。 
38 println("======cartesian操作======")
39 println(rdd01.cartesian(rdd02).collect().mkString(","))
40 println("======cartesian操作======")

以下是行动操作代码

 1 val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
 2 val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
 3
 4 /* count操作 */返回RDD所有元素的个数
 5 println("======count操作======")
 6 println(rddInt.count())
 7 println("======count操作======")
 8 /* countByValue操作 */各元素在RDD中出现次数
 9 println("======countByValue操作======")
10 println(rddInt.countByValue())
11 println("======countByValue操作======")
12 /* reduce操作 */并行整合所有RDD数据,例如求和操作
13 println("======reduce操作======")
14 println(rddInt.reduce((x ,y) => x + y))
15 println("======reduce操作======")
16 /* fold操作 */和reduce功能一样,不过fold带有初始值
17 println("======fold操作======")
18 println(rddInt.fold(0)((x ,y) => x + y))
19 println("======fold操作======")
20 /* aggregate操作 */和reduce功能一样,不过fold带有初始值
21 println("======aggregate操作======")
22 val res:(Int,Int) = rddInt.aggregate((0,0))((x,y) => (x._1 + x._2,y),(x,y) => (x._1 + x._2,y._1 + y._2))
23 println(res._1 + "," + res._2)
24 println("======aggregate操作======")
25 /* foeach操作 */对RDD每个元素都是使用特定函数就是遍历
26 println("======foeach操作======")
27 println(rddStr.foreach { x => println(x) })
28 println("======foeach操作======")
.mapValues(x=>(x,1)).//mapValues是对值的操作,不操作key使数据变成(Tom,(26,1))

map()指的是对key进行操作

mapValues()指的是对Values进行操作

first()返回的是dataset中的第一个元素

take(n)返回前n个elements,这个是driverprogram返回的

takeSample(withReplacementnum,seed)抽样返回一个dataset中的num个元素,随机种子seed

saveAsTextFile(path)把dataset写到一个textfile中,或者HDFS支持的文件系统中,spark把每条记录都转换为一行记录,然后写到file中

saveAsTextFile(path)只能用在key-value对上,然后生成SequenceFile写到本地或者hadoop文件系统

saveAsObjectFile(path)把dataset写到一个java序列化的文件中,用sparkContext,objectFile()装载

countByKey()返回的是key对应的个数的一个map.,作用与一个RDD

参考https://www.cnblogs.com/sharpxiajun/p/5506822.html加上自己的理解

transformation和action的主要区别



接口定义方式不同

1.Transformation:RDD[X]->RDD[Y]

2.Action:RDD[X]->Z(Z不是一个RDD,可能是基本类型,数组等)

执行方式也不同

Transformation只会记录RDD转化关系,并不会产生计算(惰性执行,LazyExecution)

Action是触发程序执行(分布式)的算子

原文地址:https://www.cnblogs.com/zpsblog/p/10534024.html

时间: 2024-10-09 19:30:35

RDD弹性分布式数据集的基本操作的相关文章

Spark弹性分布式数据集RDD

RDD(Resilient Distributed Dataset)是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现.RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现.RDD必须是可序列化的.RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作.这对于迭代运算比

[Berkeley]弹性分布式数据集RDD的介绍(RDD: A Fault-Tolerant Abstraction for In-Memory Cluster Computing 论文翻译)

摘要: 本文提出了分布式内存抽象的概念--弹性分布式数据集(RDD,Resilient Distributed Datasets).它同意开发者在大型集群上运行基于内存的计算.RDD适用于两种应用,而现有的数据流系统对这两种应用的处理并不高效:一是迭代式算法,这在图应用和机器学习领域非经常见.二是交互式数据挖掘工具.这两种情况下.将数据保存在内存中可以极大地提高性能.为了有效地实现容错,RDD提供了一种高度受限的共享内存,即RDD在共享状态的时候是基于粗粒度的转换而不是细粒度的更新(换句话说就是

Spark - RDD(弹性分布式数据集)

org.apache.spark.rddRDDabstract class RDD[T] extends Serializable with Logging A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel. Thi

Spark的核心RDD(Resilient Distributed Datasets弹性分布式数据集)

Spark的核心RDD (Resilient Distributed Datasets弹性分布式数据集)  原文链接:http://www.cnblogs.com/yjd_hycf_space/p/7681585.html 铺垫 在hadoop中一个独立的计算,例如在一个迭代过程中,除可复制的文件系统(HDFS)外没有提供其他存储的概念,这就导致在网络上进行数据复制而增加了大量的消耗,而对于两个的MapReduce作业之间数据共享只有一个办法,就是将其写到一个稳定的外部存储系统,如分布式文件系统

SparkRDD之弹性分布式数据集RDD

2.RDD概述2.1什么是RDDRDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合.RDD具有数据流模型的特点:自动容错.位置感知性调度和可伸缩性.RDD允许用户在执行多个查询时显式地将数据缓存在内存中,后续的查询能够重用这些数据,这极大地提升了查询速度.Dataset:一个数据集合,用于存放数据的.Distributed:RDD中的数据是分布式存储的,可用于分布式计算.R

大数据技术学习:弹性分布式数据集RDD

今天给大家分享的技术学习是:浅谈弹性分布式数据集RDD. 一.RDD定义 RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中基本的数据抽象,它代表一个不可变(数据和元数据).可分区.里面的元素可并行计算的集合.其特点在于自动容错,位置感知性调度和可伸缩性. 二.RDD的属性 1.一组分片.即数据集的基本组成单位.对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度.用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会

弹性分布式数据集(RDD)

spark围绕弹性分布式数据集(RDD)的概念展开的,RDD是一个可以并行操作的容错集合. 创建RDD的方法: 1.并行化集合(并行化驱动程序中现有的集合) 调用SparkContext的parallelize收集方法 2.外部数据集操作(引用外部系统存储的数据集) RDD操作 1.Transformations 是从将一个以有的RDD生成另外一个RDD.Transformation具有延迟加载的特性(lazy特性),Transformation算子的代码不会真正的被执行,只有当我们的程序中遇到

弹性分布式数据集:一个支持容错的集群内存计算的抽象

注:本文章是翻译自:Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing 概要     我们提出了弹性分布式数据集(Resilient Distributed Datasets,简称RDDs)的概念,这是一个分布式内存的抽象,允许编程在大规模集群 上编写出以内存计算为基础的程序,并且该模型支持容错.RDD概念的提出主要启发于这样一种现象:有两种类型的应用程序,使用现

Spark弹性分布式数据集RDD概述

弹性分布数据集RDD RDD(Resilient Distributed Dataset)是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现.RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现.RDD必须是可序列化的.RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO