【spark】RDD操作

RDD操作分为转换操作和行动操作。

对于RDD而言,每一次的转化操作都会产生不同的RDD,供一个操作使用。

我们每次转换得到的RDD是惰性求值的

也就是说,整个转换过程并不是会真正的去计算,而是只记录了转换的轨迹。

当遇到行动操作的时候,才会发生真正的计算,从DAG图的源头开始进行“从头到尾”的计算。

常见的操作


操作类型


函数名


作用


转化操作


map()


参数是函数,函数应用于RDD每一个元素,返回值是新的RDD


flatMap()


参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD


filter()


参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD


distinct()


没有参数,将RDD里的元素进行去重操作


union()


参数是RDD,生成包含两个RDD所有元素的新RDD


intersection()


参数是RDD,求出两个RDD的共同元素


subtract()


参数是RDD,将原RDD里和参数RDD里相同的元素去掉


cartesian()


参数是RDD,求两个RDD的笛卡儿积


行动操作


collect()


返回RDD所有元素


count()


RDD里元素个数


countByValue()


各元素在RDD中出现次数


reduce()


并行整合所有RDD数据,例如求和操作


fold(0)(func)


和reduce功能一样,不过fold带有初始值


aggregate(0)(seqOp,combop)


和reduce功能一样,但是返回的RDD数据类型和原RDD不一样


foreach(func)


对RDD每个元素都是使用特定函数

除此之外我们还用到过的转换操作还有

1.groupByKey():应用于(K,V)键值对的数据集,返回一个新的(K,Iterable)形式的数据集

2.reduceByKey(func):应用于(K,V)键值对的数据集,返回一个新的(K,V)形式的数据集

其中每个值是将每个Key传入到func中进行聚合。

除此之外我们还用到过的行动操作还有

1.first():返回数据集的第一个元素

2.take(n):以数组形式返回数据集的前n个元素。

示例

转化操作

val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1)
val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
val rddFile:RDD[String] = sc.textFile(path, 1)
val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))
/* map操作 */
    println("======map操作======")
    println(rddInt.map(x => x + 1).collect().mkString(","))
    println("======map操作======")

/* filter操作 */
    println("======filter操作======")
    println(rddInt.filter(x => x > 4).collect().mkString(","))
    println("======filter操作======")

/* flatMap操作 */
    println("======flatMap操作======")
    println(rddFile.flatMap { x => x.split(",") }.first())
    println("======flatMap操作======")

/* distinct去重操作 */
    println("======distinct去重======")
    println(rddInt.distinct().collect().mkString(","))
    println(rddStr.distinct().collect().mkString(","))
    println("======distinct去重======")

/* union操作 */
    println("======union操作======")
    println(rdd01.union(rdd02).collect().mkString(","))
    println("======union操作======")

/* intersection操作 */
    println("======intersection操作======")
    println(rdd01.intersection(rdd02).collect().mkString(","))
    println("======intersection操作======")

/* subtract操作 */
    println("======subtract操作======")
    println(rdd01.subtract(rdd02).collect().mkString(","))
    println("======subtract操作======")

/* cartesian操作 */
    println("======cartesian操作======")
    println(rdd01.cartesian(rdd02).collect().mkString(","))
    println("======cartesian操作======")

行动操作

val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)

  

/* count操作 */
    println("======count操作======")
    println(rddInt.count())
    println("======count操作======")

/* countByValue操作 */
    println("======countByValue操作======")
    println(rddInt.countByValue())
    println("======countByValue操作======")

/* reduce操作 */
    println("======countByValue操作======")
    println(rddInt.reduce((x, y) => x + y))
    println("======countByValue操作======")

/* fold操作 */
    println("======fold操作======")
    println(rddInt.fold(0)((x, y) => x + y))
    println("======fold操作======")

/* aggregate操作 */
    println("======aggregate操作======")
    val res: (Int, Int) = rddInt.aggregate((0, 0))((x, y) => (x._1 + x._2, y),
                                                               (x, y) => (x._1 + x._2, y._1 + y._2))
    println(res._1 + "," + res._2)
    println("======aggregate操作======")

/* foreach操作 */
    println("======foeach操作======")
    println(rddStr.foreach { x => println(x) })
    println("======foeach操作======")

原文地址:https://www.cnblogs.com/zzhangyuhang/p/8989894.html

时间: 2024-10-29 06:59:54

【spark】RDD操作的相关文章

Spark RDD操作(1)

https://www.zybuluo.com/jewes/note/35032 RDD是什么? RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD.从编程的角度来看,RDD可以简单看成是一个数组.和普通数组的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理.因此,Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果.本文为第一部分,将介绍Spark RDD中与Ma

Spark RDD操作记录(总结)

创建ListintRDD = sc.parallelize(List(1,2,3,4,5)) 过滤包含stringRDD.filter(_.contains("a")).collect() 去重stringRDD.distinct.collect() RDD拆分val sRDD = intRDD.randomSplit(Array(0.4,0.6)) 显示拆分后的RDDsRDD(0).collect() 奇偶区分intRDD.groupBy(x => {if (x % 2 ==

Spark RDD 操作实战之文件读取

/1.本地文件读取 val local_file_1 = sc.textFile("/home/hadoop/sp.txt") val local_file_2 = sc.textFile("file://home/hadoop/sp.txt") //2.当前目录下的文件 val file1 = sc.textFile("sp.txt") //3.HDFS文件 val hdfs_file1 = sc.textFile("hdfs://1

Spark编程模型及RDD操作

转载自:http://blog.csdn.net/liuwenbo0920/article/details/45243775 1. Spark中的基本概念 在Spark中,有下面的基本概念.Application:基于Spark的用户程序,包含了一个driver program和集群中多个executorDriver Program:运行Application的main()函数并创建SparkContext.通常SparkContext代表driver programExecutor:为某App

Spark性能优化(2)——广播变量、本地缓存目录、RDD操作、数据倾斜

广播变量 背景 一般Task大小超过10K时(Spark官方建议是20K),需要考虑使用广播变量进行优化.大表小表Join,小表使用广播的方式,减少Join操作. 参考:Spark广播变量与累加器 Local Dir 背景 shuffle过程中,临时数据需要写入本地磁盘.本地磁盘的临时目录通过参数spark.local.dir配置. 性能优化点 spark.local.dir支持配置多个目录.配置spark.local.dir有多个目录,每个目录对应不同的磁盘,这样可以提升IO效率.另外,可以采

Java8函数式编程(二):类比Spark RDD算子的Stream流操作

1 Stream流 对集合进行迭代时,可调用其iterator方法,返回一个iterator对象,之后便可以通过该iterator对象遍历集合中的元素,这被称为外部迭代(for循环本身正是封装了其的语法糖),其示意图如下: 除此之外,还有内部迭代方法,这正是这里要说明的集合的stream()方法返回的Stream对象的一系列操作,比如,要统计一个数字列表的偶数元素个数,当使用Stream对象的操作时,如下: List<Integer> list = new ArrayList<Integ

Spark RDD常用算子操作(八) 键值对关联操作 subtractByKey, join,fullOuterJoin, rightOuterJoin, leftOuterJoin

原文作者:翟开顺首发:CSDN本人仅为自己方便查阅做了摘抄,请支持原作者原文地址:https://blog.csdn.net/t1dmzks/article/details/72077428 github: https://github.com/zhaikaishun/spark_tutorial/tree/master/src/main/java/com/spark/rdd_tutorial/tutorial8先从spark-learning中的一张图大致了解其功能 subtractByKey

Spark RDD Action操作

reduce def reduce(f: (T, T) => T): T通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的 1 2 3 4 5 6 7 8 9 10 11 scala> val rdd1 = sc.makeRDD(1 to 10,2) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24 scala> rdd1.r

【Spark】RDD操作具体解释4——Action算子

本质上在Actions算子中通过SparkContext运行提交作业的runJob操作,触发了RDD DAG的运行. 依据Action算子的输出空间将Action算子进行分类:无输出. HDFS. Scala集合和数据类型. 无输出 foreach 对RDD中的每一个元素都应用f函数操作,不返回RDD和Array,而是返回Uint. 图中.foreach算子通过用户自己定义函数对每一个数据项进行操作. 本例中自己定义函数为println,控制台打印全部数据项. 源代码: /** * Applie

【Spark】RDD操作详解4——Action算子

本质上在Actions算子中通过SparkContext执行提交作业的runJob操作,触发了RDD DAG的执行. 根据Action算子的输出空间将Action算子进行分类:无输出. HDFS. Scala集合和数据类型. 无输出 foreach 对RDD中的每个元素都应用f函数操作,不返回RDD和Array,而是返回Uint. 图中,foreach算子通过用户自定义函数对每个数据项进行操作. 本例中自定义函数为println,控制台打印所有数据项. 源码: /** * Applies a f