Spark RDD-行动算子

2.4 Action

行动算子:触发运算,在 Executor 执行,如果想直接在 Driver 端看到结果可以使用 collect 和 foreach 都可以将数据拉取到 Driver 端。

2.4.1 reduce(func) 案例

1. 作用:通过 func 函数聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。

2. 需求:创建一个 RDD,将所有元素聚合得到结果

(1)创建一个 RDD[Int]

scala> val rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24

(2)聚合 RDD[Int]所有元素

scala> rdd1.reduce(_+_)
res50: Int = 55

(3)创建一个 RDD[String]

scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at <console>:24

(4)聚合 RDD[String]所有数据

scala> rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2))
res51: (String, Int) = (adca,12)

测试:

scala> val rdd = sc.parallelize(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:24

scala> rdd.reduce(_+_)
res17: Int = 15

2.4.2 collect() 案例

从 Executor 端拉取数据到 Driver 端操作:一般用在测试环境中。

1. 作用:在驱动程序中,以数组的形式返回数据集的所有元素。

2. 需求:创建一个 RDD,并将 RDD 内容收集到 Driver 端打印

(1)创建一个 RDD

scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

(2)将结果收集到 Driver 端

scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

2.4.3 count() 案例

1. 作用:返回 RDD 中元素的个数

2. 需求:创建一个 RDD,统计该 RDD 的条数

(1)创建一个 RDD

scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

(2)统计该 RDD 的条数

scala> rdd.count
res1: Long = 10

2.4.4 first() 案例

1. 作用:返回 RDD 中的第一个元素

2. 需求:创建一个 RDD,返回该 RDD 中的第一个元素

(1)创建一个 RDD

scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

(2)统计该 RDD 的条数

scala> rdd.first
res2: Int = 1

2.4.5 take(n) 案例

1. 作用:返回一个由 RDD 的前 n 个元素组成的数组

2. 需求:创建一个 RDD,统计该 RDD 的条数

(1)创建一个 RDD

scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

(2)统计该 RDD 的条数

scala> rdd.take(3)
res10: Array[Int] = Array(2, 5, 4)

2.4.6 takeSample(WithReplacement,num,[seed])

返回一个数组,该数组由从数据集中随机采样的 num 个元素组成,可以选择是否随机数

替换不足的部分,seed 用于指定随机数生成器种子。

2.4.7 takeOrdered(n) 案例

1. 作用:返回该 RDD 排序后的前 n 个元素组成的数组

--相当于先执行 sortBy() 再执行 take()

2. 需求:创建一个 RDD,统计该 RDD 的条数

(1)创建一个 RDD

scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

(2)统计该 RDD 的条数

scala> rdd.takeOrdered(3)
res18: Array[Int] = Array(2, 3, 4)

2.4.8 aggregate 案例

1. 参数:(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)

2. 作用:aggregate 函数将每个分区里面的元素通过 seqOp 和初始值进行聚合,然后用

combine 函数将每个分区的结果和初始值(zeroValue)进行 combine 操作。这个函数最终返回

的类型不需要和 RDD 中元素类型一致。

3. 需求:创建一个 RDD,将所有元素相加得到结果

(1)创建一个 RDD

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24

(2)将该 RDD 所有元素相加得到结果

scala> rdd.aggregate(0)(_+_,_+_)
res22: Int = 55

2.4.9 fold(num)(func) 案例

1. 作用:折叠操作,aggregate 的简化操作,seqop 和 combop 一样。

2. 需求:创建一个 RDD,将所有元素相加得到结果

(1)创建一个 RDD

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24

(2)将该 RDD 所有元素相加得到结果

scala> rdd.fold(0)(_+_)
res24: Int = 55

2.4.10 saveAsTextFile(path)

作用:将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统,

对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本

开发中常用的读入(textFile)和 写出(saveAsTextFile)

2.4.11 saveAsSequenceFile(path)

作用:将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使

HDFS 或者其他 Hadoop 支持的文件系统。

2.4.12 saveAsObjectFile(path)

作用:用于将 RDD 中的元素序列化成对象,存储到文件中。

2.4.13 countByKey() 案例

1. 作用:针对(K,V)类型的 RDD,返回一个(K,Int)的 map,表示每一个 key 对应的元素个数。

2. 需求:创建一个 PairRDD,统计每种 key 的个数

(1)创建一个 PairRDD

scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at <console>:24

(2)统计每种 key 的个数

scala> rdd.countByKey
res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)

测试:

scala> sc.parallelize(Array(1,2,1,2,3,3,3)).map((_,1)).countByKey()
res19: scala.collection.Map[Int,Long] = Map(1 -> 2, 2 -> 2, 3 -> 3) 

在开发中可以搭配 Sample 查看数据是否倾斜

2.4.14 foreach(func) 案例

1. 作用:在数据集的每一个元素上,运行函数 func 进行更新。

  源码:

  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

2. 需求:创建一个 RDD,对每个元素进行打印

(1)创建一个 RDD

scala> var rdd = sc.makeRDD(1 to 5,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[107] at makeRDD at <console>:24

(2)对该 RDD 每个元素进行打印

scala> rdd.foreach(println(_))
3
4
5
1
2

测试:

scala> rdd.foreach(println)
2
1
4
5
3

原文地址:https://www.cnblogs.com/LXL616/p/11144946.html

时间: 2024-11-12 13:10:33

Spark RDD-行动算子的相关文章

Spark RDD常用算子操作(八) 键值对关联操作 subtractByKey, join,fullOuterJoin, rightOuterJoin, leftOuterJoin

原文作者:翟开顺首发:CSDN本人仅为自己方便查阅做了摘抄,请支持原作者原文地址:https://blog.csdn.net/t1dmzks/article/details/72077428 github: https://github.com/zhaikaishun/spark_tutorial/tree/master/src/main/java/com/spark/rdd_tutorial/tutorial8先从spark-learning中的一张图大致了解其功能 subtractByKey

Spark RDD算子实战

[TOC] Spark算子概述 RDD:弹性分布式数据集,是一种特殊集合.支持多种来源.有容错机制.可以被缓存.支持并行操作,一个RDD代表多个分区里的数据集. RDD有两种操作算子: Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住了数据集的逻辑操作 Action(执行):触发Spark作业的运行,真正触发转换算子的计算 需要说明的是,下面写的scala代码,其实都是可以简写的,但是为了方便理解,我都

Java8函数式编程(二):类比Spark RDD算子的Stream流操作

1 Stream流 对集合进行迭代时,可调用其iterator方法,返回一个iterator对象,之后便可以通过该iterator对象遍历集合中的元素,这被称为外部迭代(for循环本身正是封装了其的语法糖),其示意图如下: 除此之外,还有内部迭代方法,这正是这里要说明的集合的stream()方法返回的Stream对象的一系列操作,比如,要统计一个数字列表的偶数元素个数,当使用Stream对象的操作时,如下: List<Integer> list = new ArrayList<Integ

【spark 深入学习 03】Spark RDD的蛮荒世界

RDD真的是一个很晦涩的词汇,他就是伯克利大学的博士们在论文中提出的一个概念,很抽象,很难懂:但是这是spark的核心概念,因此有必要spark rdd的知识点,用最简单.浅显易懂的词汇描述.不想用学术话的语言来阐述RDD是什么,用简单.容易理解的方式来描述. 一.什么是RDD,RDD出现的背景 Mapreduce计算模型的出现解决了分布式计算的诸多难题,但是由于MR对数据共享的解决方案比较低效,导致MR编程模型效率不高,将数据写到一个稳定的外部存储系统,如HDFS,这个会引起数据复写.磁盘IO

Spark RDD Transformation 简单用例(三)

cache和persist 将RDD数据进行存储,persist(newLevel: StorageLevel)设置了存储级别,cache()和persist()是相同的,存储级别为MEMORY_ONLY.因为RDD的transformation是lazy的,只有action算子才会触发transformain真正的执行,如果一个rdd需要进行多次的action算子操作,最好能够使用cache或persist将rdd缓存至内存中,这样除第一次action会触发transformation操作,后

Spark RDD Transformation 简单用例(二)

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine

Spark RDD使用详解1--RDD原理

RDD简介 在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Dataset,RDD).RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现.RDD是Spark最核心的东西,它表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现.RDD必须是可序列化的.RDD可以cache到内存中,每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操

RDD转换算子与操作算子

一.RDD算子分类 1. RDD算子分类及概述 RDD的算子分为Transformation和Action两类,Transformation是延迟执行,Action是立即执行.Transformation和Action本质上的区别是,Transformation是从一个RDD到一个RDD,Action是从一个RDD到一个值.由下图可知,Spark的的转换算子与操作算子的执行流程.首先可以从HDFS中使用textFile方法将数据加载到内存,然后经过转换算子对RDD进行转换,最后再通过操作算子Ac

Spark笔记整理(五):Spark RDD持久化、广播变量和累加器

[TOC] Spark RDD持久化 RDD持久化工作原理 Spark非常重要的一个功能特性就是可以将RDD持久化在内存中.当对RDD执行持久化操作时,每个节点都会将自己操作的RDD的partition持久化到内存中,并且在之后对该RDD的反复使用中,直接使用内存缓存的partition.这样的话,对于针对一个RDD反复执行多个操作的场景,就只要对RDD计算一次即可,后面直接使用该RDD,而不需要反复计算多次该RDD. 巧妙使用RDD持久化,甚至在某些场景下,可以将spark应用程序的性能提升1