处理键值对RDD

保存Key/Value对的RDD叫做Pair RDD。

1.创建Pair RDD:

1.1 创建Pair RDD的方式:

很多数据格式在导入RDD时,会直接生成Pair RDD。我们也可以使用map()来将之前讲到的普通RDD转化为Pair RDD。

1.2 Pair RDD转化实例:

下面例子中,把原始RDD,修改成首单词做Key,整行做Value的Pair RDD。

Java中没有tuple类型,所以使用scala的scala.Tuple2类来创建tuple。创建tuple:  new Tuple2(elem1,elem2)  ; 访问tuple的元素:  使用._1()和._2()方法来访问。

而且,在Python和Scala实现中使用基本的map()函数即可,java需要使用函数mapToPair():

	/**
	 * 将普通的基本RDD转化成一个Pair RDD,业务逻辑: 将每一行的首单词作为Key,整个句子作为Value 返回Key/Value PairRDD。
	 * @param JavaRDD<String>
	 * @return JavaPairRDD<String,String>
	 */
	public JavaPairRDD<String,String> firstWordKeyRdd(JavaRDD<String> input){
		JavaPairRDD<String,String> pair_rdd = input.mapToPair(
				new PairFunction<String,String,String>(){
					@Override
					public Tuple2<String, String> call(String arg0) throws Exception {
						// TODO Auto-generated method stub
						return new Tuple2<String,String>(arg0.split(" ")[0],arg0);
					}
				}
		);
		return pair_rdd;
	}

当从内存中的集合创建PairRDD时,Python和Scala需要使用函数SparkContext.parallelize();而Java使用函数SparkContext.parallelizePairs()。

2.Pair RDD的转化操作:

2.1 Pair RDD常见的转化操作列表:

基础RDD使用的转化操作也可以在Pair RDD中使用。因为Pair RDD中使用tuple,所以需要传递操作tuple的函数给Pair RDD.

下表列出Pair RDD常用的转化操作(事例RDD内容:{(1, 2), (3, 4), (3, 6)})

函数名 作用 调用例子 返回结果
reduceByKey(func) Combine values with the same key. rdd.reduceByKey((x, y) => x + y) {(1,2),(3,10)}
groupByKey() Group values with the same key. rdd.groupByKey() {(1,[2]),(3,[4,6])}
combineByKey(createCombiner,mergeValue, mergeCombiners,partitioner) Combine values with the same key using a different result type.
mapValues(func) Apply a function to each value of a pair RDD without changing the key. rdd.mapValues(x =>x+1) {(1,3),(3,5),(3,7)}
flatMapValues(func)
Apply a function that returns an iterator to each value of a pair RDD, and for each element returned, produce a key/value entry with the old key. Often used for tokenization.

rdd.flatMapValues(x=> (x to 5) {(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)}
keys() Return an RDD of just the keys. rdd.keys() {1, 3, 3}
values() Return an RDD of just the values. rdd.values() {2, 4, 6}
sortByKey() Return an RDD sorted by the key. rdd.sortByKey() {(1,2),(3,4),(3,6)}

下表列举2个RDD之间的转化操作(rdd = {(1, 2), (3, 4), (3, 6)} other = {(3,9)}):

函数名 作用 调用例子 返回结果
subtractByKey Remove elements with a key present in the other RDD. rdd.subtractByKey(other) {(1, 2)}
join Perform an inner join between two RDDs. rdd.join(other) {(3, (4, 9)),(3, (6, 9))}
rightOuterJoin Perform a join between two RDDs where the key must be present in the first RDD. rdd.rightOuterJoin(other) {(3,(Some(4),9)), (3,(Some(6),9))}
leftOuterJoin Perform a join between two RDDs where the key must be present in the other RDD. rdd.leftOuterJoin(other) {(1,(2,None)),(3,(4,Some(9))),(3,(6,Some(9)))}
cogroup Group data from both RDDs sharing the same key. rdd.cogroup(other) {(1,([2],[])),(3,([4, 6],[9]))}

2.2 Pair RDD筛选操作:

Pair RDD也还是RDD,所以之前介绍的操作(例如filter)也同样适用于PairRDD。下面程序,筛选长度大于20的行:

	/**
	 * PairRDD筛选长度大于20的行。
	 * @param JavaPairRDD<String,String>
	 * @return JavaPairRDD<String,String>
	 */
	public JavaPairRDD<String,String> filterMoreThanTwentyLines
			(JavaPairRDD<String,String> input){
		JavaPairRDD<String,String> filter_rdd = input.filter(
				new Function<Tuple2<String, String>,Boolean>(){
					@Override
					public Boolean call(Tuple2<String, String> arg0) throws Exception {
						// TODO Auto-generated method stub
						return (arg0._2.length()>20);
					}
				}
				);
		return filter_rdd;
	}

2.3 聚合操作:



时间: 2024-10-10 17:47:36

处理键值对RDD的相关文章

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio

5.2 键值对RDD

一.键值对RDD的创建 1.从文件中加载 2.通过并行集合(数组)创建RDD 二.常用的键值对RDD转换操作 三.综合实例 原文地址:https://www.cnblogs.com/nxf-rabbit75/p/11811930.html

Spark中的键值对操作-scala

1.PairRDD介绍     Spark为包含键值对类型的RDD提供了一些专有的操作.这些RDD被称为PairRDD.PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口.例如,PairRDD提供了reduceByKey()方法,可以分别规约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合在一起,合并为一个RDD. 2.创建Pair RDD 程序示例:对一个英语单词组成的文本行,提取其中的第一个单词作为key,将整个句子作为value,建立 PairR

Spark学习笔记3:键值对操作

键值对RDD通常用来进行聚合计算,Spark为包含键值对类型的RDD提供了一些专有的操作.这些RDD被称为pair RDD.pair RDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口. Spark中创建pair RDD的方法:存储键值对的数据格式会在读取时直接返回由其键值对数据组成的pair RDD,还可以使用map()函数将一个普通的RDD转为pair RDD. Pair RDD的转化操作 reduceByKey()  与reduce类似 ,接收一个函数,并使用该函数对值进行合并,

Learning Spark中文版--第四章--使用键值对(2)

Actions Available on Pair RDDs (键值对RDD可用的action) ??和transformation(转换)一样,键值对RDD也可以使用基础RDD上的action(开工),并且键值对RDD有一些利用键值对数据特性的的action,如下表: 表4-3 键值对RDD上的action 函数名 描述 例子 结果 countByKey() 计算每个键元素的总数 rdd.countByKey() {(1,1),(3,2)} collectAsMap() 结果收集成一个map便

Spark常用函数讲解--键值RDD转换

摘要: RDD:弹性分布式数据集,是一种特殊集合 ‚ 支持多种来源 ‚ 有容错机制 ‚ 可以被缓存 ‚ 支持并行操作,一个RDD代表一个分区里的数据集RDD有两种操作算子:         Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住       了数据集的逻辑操作         Ation(执行):触发Spark作业的运行,真正触发转换算子的计算 本系列主要讲解Spark中常用的函数操作:   

Spark之键值RDD转换(转载)

1.mapValus(fun):对[K,V]型数据中的V值map操作(例1):对每个的的年龄加2 object MapValues { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local").setAppName("map") val sc = new SparkContext(conf) val list = List(("mobin",22),

Spark RDD常用算子操作(八) 键值对关联操作 subtractByKey, join,fullOuterJoin, rightOuterJoin, leftOuterJoin

原文作者:翟开顺首发:CSDN本人仅为自己方便查阅做了摘抄,请支持原作者原文地址:https://blog.csdn.net/t1dmzks/article/details/72077428 github: https://github.com/zhaikaishun/spark_tutorial/tree/master/src/main/java/com/spark/rdd_tutorial/tutorial8先从spark-learning中的一张图大致了解其功能 subtractByKey

20150218【改进】IMX257实现GPIO-IRQ中断按键获取键值驱动程序

[改进]IMX257实现GPIO-IRQ中断按键获取键值驱动程序 2015-02-18 李海沿 一.使用struct pin_desc 管理按键的值 1.定义结构体 2.将前面我们申请中断时写的(void *)1修改为 &pins_desc[n] 在ioctl中,设置中断中修改 在key_release中释放中修改 3.在中断程序中利用我们定义的struc pins_desc判断并得到按键的值 4.得到按键键值后,唤醒程序,在read函数中返回键值 附上驱动源程序: 1 /***********