Spark基础 --RDD算子详解

RDD算子分为两类:Transformation和Action,如下图,记住这张图,走遍天下都不怕。

Transformation:将一个RDD通过一种规则映射为另外一个RDD。

Action:返回结果或保存结果。

注意:只有action才触发程序的执行,transformation不触发执行。

RDD的操作种类有多个,分为: 单指RDD操作、Key/Value RDD操作、多个RDD联合操作,其他操作。

单值RDD

1. Map

map (f: T => U) : RDD[U] ,其中f定义了类型为T的元素到类型为U

的元素的映射,RDD[T] => RDD[U]的变换

举例:

  var rdd=sc.makeRDD(1 to 7,3)

简写为  rdd.map(_+1)   //rdd.map(x=>x+1)

2. collect

collect(): Array[T],T是RDD中元素类型,将RDD转化为数组。

举例:

val rdd = sc.makeRDD(1 to 7, 3)

rdd.collect()

注意:此算子非常危险,他会将所有RDD中的数据汇总到Drive端的JVM内存中,对Drive端压力很大。

3. take

take(num: Int): Array[T] ,其中k是整数,T是RDD中元素类型,返回RDD中前k个元素,并保存成数组

举例:

val rdd = sc.makeRDD(1 to 7, 3)

rdd.take(2)

4. glom

glom() : RDD[Array[T]],将RDD中每个partition中元素转换为数组

举例:

val rdd = sc.makeRDD(1 to 7, 3)

rdd.glom.collect

5. coalesce

coalesce(numPartitions: Int) : RDD[T],将RDD中的partition个数合并为numPartitions个

举例:

val rdd = sc.makeRDD(1 to 7,7)

rdd.coalesce(3) // 生成新的RDD,它包含三个Partition

6. repartition

repartition(numPartitions: Int) :RDD[T],将RDD中的partition个数均匀合并为numPartitions个

举例:

val list = Seq(Seq(),Seq(),Seq(),Seq(),Seq(),Seq(),

Seq(1,2,3,4,5,6,7))

val rdd = sc.makeRDD(list, 7).flatMap(x => x)

rdd.repartition(3) // 生成新的RDD,它包含三个Partition

7. filter

filter(f: T => Boolean):

RDD[T] ,其中f定义了类型为T的元素是否留下,过滤输入RDD中的元素,将f返回true的元素留下

举例:

var rdd=sc.makeRDD(1 to 7,7)

rdd.filter(_%3==0)

8. count

count(): Long,统计RDD中元素个数,并返回Long类型

val rdd = sc.makeRDD(1 to 7, 3)

rdd.count() // 统计RDD中元素总数

9. flatMap

flatMap(f: T =>TraversableOnce[U]): RDD[U],将函数f作用在RDD中每个元素上,并展开(flatten)

输出的每个结果, flatMap = flatten + map,先映射(map),再拍扁(flatten )

举例:

val rdd = sc.makeRDD(1 to 3, 3)

rdd.flatMap( x => 1 to x) // 将x映射成1~x

10. reduce

reduce(f: (T, T) => T): T, 按照函数f对RDD中元素,进行规约

举例:

val rdd = sc.makeRDD(1 to 7, 3)

rdd.reduce((x, y) => x + y)

简写为:rdd.reduce(_ + _)

11. foreach

foreach(f: T => Unit):Unit,对RDD中每个元素,调用函数f

举例:

val rdd = sc.makeRDD(1 to 7, 3)

rdd.foreach( x => println(x))

简写为:rdd.foreach(println)

Key/Value RDD

首先先来看下如何创建一个Key/Value的rdd

var seq=Seq((A,1),(B,1),(C,1))

var rdd=sc.makeRDD(seq)

1. mapValues

对vaule做map操作

举例:

val pairs = Seq((A,1), (B,2), (A,2), (C, 4), (B, 1), (B, 1), (D, 1))

val rdd = sc.makeRDD(pairs, 3)

rdd.mapValues(_ + 1)

2. reduceByKey

对Key相同的value做计算

举例:

val pairs = Seq((‘A‘,1), (‘B‘,2), (‘A‘,2), (‘C‘, 4), (‘B‘, 1), (‘B‘, 1), (‘D‘, 1))

val rdd = sc.makeRDD(pairs, 3)

rdd.reduceByKey(_ + _)

3. groupByKey

将RDD[key,value] 按照相同的key进行分组,形成RDD[key,Iterable[value]]的形式, 有点类似于sql中的groupby

举例:

val pairs = Seq((A,1), (B,2), (A,2), (C, 4), (B, 1), (B, 1), (D, 1))

val rdd = sc.makeRDD(pairs, 3)

rdd.groupByKey()

注意:能用reducebykey代替就不用groupbykey,groupbykey会将所有的元素进行聚合,消耗大量内存。

多RDD

1. union

将多个RDD合并为一个RDD

举例:

val pairs1 = Seq((A,1), (B,1), (C,1), (D, 1), (A, 2), (C, 3))

val rdd1 = sc.makeRDD(pairs1, 3)

val pairs2 = Seq((A,4), (D,1), (E, 1))

val rdd2 = sc.makeRDD(pairs2, 2)

rdd1.union(rdd2)

2. zip

zip函数用于将两个RDD组合成Key/Value形式的RDD,如果两个rdd中的partition数量不一致,会报错。

举例:

val s1 = Seq(A, B, C, D, E)

val rdd1 = sc.makeRDD(s1)

val s2 = Seq(1, 2, 3, 4, 5)

val rdd2 = sc.makeRDD(s2)

rdd1.zip(rdd2)

3. join

join相当于SQL中的内关联join,只返回两个RDD根据K可以关联上的结果,join只能用于两个RDD之间的关联,

如果要多个RDD关联,多关联几次即可

举例:

val pairs1 = Seq((A,1), (B,1), (C,1), (D, 1), (A, 2), (C, 3))

val rdd1 = sc.makeRDD(pairs1, 3)

val pairs2 = Seq((A,4), (D,1), (C,1), (E, 1))

val rdd2 = sc.makeRDD(pairs2, 2)

rdd1.join(rdd2)

还有些是是其他rdd操作符,这里就不讲解了,上述所写如有不对之处,还请各位前辈赐教。

原文地址:https://www.cnblogs.com/wuweikongjian/p/8309245.html

时间: 2024-10-28 17:23:39

Spark基础 --RDD算子详解的相关文章

【Spark】RDD操作详解3——键值型Transformation算子

Transformation处理的数据为Key-Value形式的算子大致可以分为:输入分区与输出分区一对一.聚集.连接操作. 输入分区与输出分区一对一 mapValues mapValues:针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行处理. 方框代表RDD分区.a=>a+2代表只对( V1, 1)数据中的1进行加2操作,返回结果为3. 源码: /** * Pass each value in the key-value pair RDD through a m

【Spark】RDD操作详解2——值型Transformation算子

处理数据类型为Value型的Transformation算子可以根据RDD变换算子的输入分区与输出分区关系分为以下几种类型: 1)输入分区与输出分区一对一型 2)输入分区与输出分区多对一型 3)输入分区与输出分区多对多型 4)输出分区为输入分区子集型 5)还有一种特殊的输入与输出分区一对一的算子类型:Cache型. Cache算子对RDD分区进行缓存 输入分区与输出分区一对一型 (1)map 将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素.源码中的map算子相当于初

【Spark】RDD操作详解4——Action算子

本质上在Actions算子中通过SparkContext执行提交作业的runJob操作,触发了RDD DAG的执行. 根据Action算子的输出空间将Action算子进行分类:无输出. HDFS. Scala集合和数据类型. 无输出 foreach 对RDD中的每个元素都应用f函数操作,不返回RDD和Array,而是返回Uint. 图中,foreach算子通过用户自定义函数对每个数据项进行操作. 本例中自定义函数为println,控制台打印所有数据项. 源码: /** * Applies a f

【Spark】RDD操作详解1——Transformation和Actions概况

Spark算子的作用 下图描述了Spark在运行转换中通过算子对RDD进行转换. 算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作. 输入:在Spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)输入Spark,数据进入Spark运行时数据空间,转化为Spark中的数据块,通过BlockManager进行管理. 运行:在Spark数据输入形成RDD后便可以通过变换算子,如filter等,对数据进行操

Spark Streaming 源码详解

原地址 本系列内容适用范围: * 2015.12.05 update, Spark 1.6 全系列 √ (1.6.0-preview,尚未正式发布) * 2015.11.09 update, Spark 1.5 全系列 √ (1.5.0, 1.5.1, 1.5.2) * 2015.07.15 update, Spark 1.4 全系列 √ (1.4.0, 1.4.1) * 2015.04.17 update, Spark 1.3 全系列 √ (1.3.0, 1.3.1) 概述 0.1 Spark

Linux 程序设计学习笔记----终端及串口编程基础之概念详解

转载请注明出处,谢谢! linux下的终端及串口的相关概念有: tty,控制台,虚拟终端,串口,console(控制台终端)详解 部分内容整理于网络. 终端/控制台 终端和控制台都不是个人电脑的概念,而是多人共用的小型中型大型计算机上的概念. 1.终端 一台主机,连很多终端,终端为主机提供了人机接口,每个人都通过终端使用主机的资源. 终端有字符哑终端和图形终端两种. 控制台是另一种人机接口, 不通过终端与主机相连, 而是通过显示卡-显示器和键盘接口分别与主机相连, 这是人控制主机的第一人机接口.

(赵小明RHCE笔记)linux基础之四 权限详解

一.special permissions for executables1.special permissions for executables:  -suid:command run with permissions of the owner of the command,not executor of   the command  -sgid:command runs with group affiliation of the group of the commandeg:file:us

java笔记--反射机制之基础总结与详解

一.反射之实例化Class类的5种方式: java的数据类型可以分为两类,即引用类型和原始类型(即基本数据类型). 对于每种类型的对象,java虚拟机会实例化不可变的java.lang.Class对象. 它提供了在运行时检查对象属性的方法,这些属性包括它的成员和类型信息. 更重要的是Class对象是所有反射API的入口. Class类是泛型类,可以使用@SuppressWarnings("unchecked")忽略泛型或者使用Class<V>类型. 获得Class对象的5种

基础拾遗------webservice详解

基础拾遗 基础拾遗------webservice详解 基础拾遗------redis详解 基础拾遗------反射详解 基础拾遗------委托详解 基础拾遗------接口详解 基础拾遗------泛型详解 前言 工作当中常用的服务接口有三个wcf,webservice和webapi.首先第一个接触的就是webservice,今天大致总结一下. 1.webservice概念相关 1.1.Web Service也叫XML Web Service WebService 是一种可以接收从Inter