Spark RDD常用算子操作(八) 键值对关联操作 subtractByKey, join,fullOuterJoin, rightOuterJoin, leftOuterJoin

原文作者:翟开顺
首发:CSDN
本人仅为自己方便查阅做了摘抄,请支持原作者
原文地址:https://blog.csdn.net/t1dmzks/article/details/72077428

github: https://github.com/zhaikaishun/spark_tutorial/tree/master/src/main/java/com/spark/rdd_tutorial/tutorial8
先从spark-learning中的一张图大致了解其功能

subtractByKey

函数定义

12345
def [W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)]

def [W](other: RDD[(K, W)], numPartitions: Int)(implicit arg0: ClassTag[W]): RDD[(K, V)]

def [W](other: RDD[(K, W)], p: Partitioner)(implicit arg0: ClassTag[W]): RDD[(K, V)]

类似于subtrac,删掉 RDD 中键与 other RDD 中的键相同的元素

join

函数定义

12345
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]

RDD1.join(RDD2)
可以把RDD1,RDD2中的相同的key给连接起来,类似于sql中的join操作

fullOuterJoin

和join类似,不过这是全连接

leftOuterJoin

12345
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]

def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]

直接看图即可
对两个 RDD 进行连接操作,类似于sql中的左外连接

rightOuterJoin

对两个 RDD 进行连接操作,类似于sql中的右外连接,存在的话,value用的Some, 不存在用的None,具体的看上面的图和下面的代码即可

代码示例

scala语言

1234567891011121314
scala> val rdd = sc.makeRDD(Array((1,2),(3,4),(3,6)))scala> val other = sc.makeRDD(Array((3,9)))

scala>  rdd.subtractByKey(other).collect()res0: Array[(Int, Int)] = Array((1,2))

scala> rdd.join(other).collect()res1: Array[(Int, (Int, Int))] = Array((3,(4,9)), (3,(6,9)))

scala> rdd.leftOuterJoin(other).collect()res2: Array[(Int, (Int, Option[Int]))] = Array((1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9))))

scala> rdd.rightOuterJoin(other).collect()res3: Array[(Int, (Option[Int], Int))] = Array((3,(Some(4),9)), (3,(Some(6),9)))

java语言

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.Optional;import scala.Tuple2;import java.util.Arrays;import java.util.Map;

public class JoinRDD {    public static void main(String[] args) {        SparkConf sparkConf = new SparkConf().setAppName("ReduceByKey").setMaster("local");        JavaSparkContext sc = new JavaSparkContext(sparkConf);        sc.setLogLevel("WARN");

        JavaRDD<Tuple2<Integer,Integer>> rddPre = sc.parallelize(Arrays.asList(new Tuple2(1,2)                , new Tuple2(3,4)                , new Tuple2(3,6)));        JavaRDD<Tuple2<Integer,Integer>> otherPre = sc.parallelize(Arrays.asList(new Tuple2(3,10),new Tuple2(4,8)));

        JavaPairRDD<Integer, Integer> rdd = JavaPairRDD.fromJavaRDD(rddPre);        JavaPairRDD<Integer, Integer> other = JavaPairRDD.fromJavaRDD(otherPre);        //subtractByKey        JavaPairRDD<Integer, Integer> subRDD = rdd.subtractByKey(other);

        //join        JavaPairRDD<Integer, Tuple2<Integer, Integer>> joinRDD =  rdd.join(other);        //fullOutJoin        JavaPairRDD<Integer, Tuple2<Optional<Integer>, Optional<Integer>>> fullOutJoinRDD = rdd.fullOuterJoin(other);        //leftOuterJoin        JavaPairRDD<Integer, Tuple2<Integer, Optional<Integer>>> leftOutJoinRDD = rdd.leftOuterJoin(other);

        //rightOutJoin        JavaPairRDD<Integer, Tuple2<Optional<Integer>, Integer>> rightOutJoinRDD = rdd.rightOuterJoin(other);        //输出看效果        Map<Integer, Integer> subMap = subRDD.collectAsMap();        System.out.println("-------------subRDD-------------");        for (Integer key : subMap.keySet()) {            System.out.println("subRDD: "+key+", "+subMap.get(key));        }        Map<Integer, Tuple2<Integer, Integer>> joinMap = joinRDD.collectAsMap();        System.out.println("-------------joinRDD-------------");        for (Integer key : joinMap.keySet()) {            System.out.println("join: "+key+", Tuple("+joinMap.get(key)._1+","+joinMap.get(key)._2+")");        }        Map<Integer, Tuple2<Optional<Integer>, Optional<Integer>>> fullOutJoinMap = fullOutJoinRDD.collectAsMap();        System.out.println("-------------fullOutJoinRDD-------------");        for (Integer key : fullOutJoinMap.keySet()) {            System.out.println("fullOutJoinRDD: "+key+", Tuple("+fullOutJoinMap.get(key)._1+","+fullOutJoinMap.get(key)._2+")");        }

        Map<Integer, Tuple2<Integer, Optional<Integer>>> leftOutJoinMap = leftOutJoinRDD.collectAsMap();        System.out.println("-------------leftOutJoinRDD-------------");        for (Integer key : leftOutJoinMap.keySet()) {            System.out.println("leftOutJoinRDD: "+key+", Tuple("+leftOutJoinMap.get(key)._1+","+leftOutJoinMap.get(key)._2+")");        }

        Map<Integer, Tuple2<Optional<Integer>, Integer>> rightOutJoinMap = rightOutJoinRDD.collectAsMap();        System.out.println("-------------rightOutJoinRDD-------------");        for (Integer key : rightOutJoinMap.keySet()) {            System.out.println("rightOutJoinRDD: "+key+", Tuple("+rightOutJoinMap.get(key)._1+","+rightOutJoinMap.get(key)._2+")");        }

    }}

运行后显示

1234567891011121314
-------------subRDD-------------subRDD: 1, 2-------------joinRDD-------------join: 3, Tuple(6,10)-------------fullOutJoinRDD-------------fullOutJoinRDD: 4, Tuple(Optional.empty,Optional[8])fullOutJoinRDD: 1, Tuple(Optional[2],Optional.empty)fullOutJoinRDD: 3, Tuple(Optional[6],Optional[10])-------------leftOutJoinRDD-------------leftOutJoinRDD: 1, Tuple(2,Optional.empty)leftOutJoinRDD: 3, Tuple(6,Optional[10])-------------rightOutJoinRDD-------------rightOutJoinRDD: 4, Tuple(Optional.empty,8)rightOutJoinRDD: 3, Tuple(Optional[6],10)

原文:大专栏  Spark RDD常用算子操作(八) 键值对关联操作 subtractByKey, join,fullOuterJoin, rightOuterJoin, leftOuterJoin

原文地址:https://www.cnblogs.com/petewell/p/11615193.html

时间: 2024-08-06 16:04:56

Spark RDD常用算子操作(八) 键值对关联操作 subtractByKey, join,fullOuterJoin, rightOuterJoin, leftOuterJoin的相关文章

Learning Spark中文版--第四章--使用键值对(2)

Actions Available on Pair RDDs (键值对RDD可用的action) ??和transformation(转换)一样,键值对RDD也可以使用基础RDD上的action(开工),并且键值对RDD有一些利用键值对数据特性的的action,如下表: 表4-3 键值对RDD上的action 函数名 描述 例子 结果 countByKey() 计算每个键元素的总数 rdd.countByKey() {(1,1),(3,2)} collectAsMap() 结果收集成一个map便

Spark RDD基本概念、宽窄依赖、转换行为操作

目录 RDD概述 RDD的内部代码 案例 小总结 转换.行动算子 宽.窄依赖 Reference 本文介绍一下rdd的基本属性概念.rdd的转换/行动操作.rdd的宽/窄依赖. RDD:Resilient Distributed Dataset 弹性分布式数据集,是Spark中的基本抽象. RDD表示可以并行操作的元素的不变分区集合. RDD提供了许多基本的函数(map.filter.reduce等)供我们进行数据处理. RDD概述 通常来说,每个RDD有5个主要的属性组成: 分区列表 RDD是

Redis常用命令手册:键值相关命令

Redis提供了丰富的命令(command)对数据库和各种数据类型进行操作,这些command可以在Linux终端使用.在编程时,比如各类语言包,这些命令都有对应的方法.下面将Redis提供的命令做一总结. 键值相关命令 1.keys 返回满足给定pattern的所有key: redis 127.0.0.1:6379> keys * 1) "myzset2" 2) "myzset3" 3) "mylist" 4) "myset2&

Spark学习笔记3:键值对操作

键值对RDD通常用来进行聚合计算,Spark为包含键值对类型的RDD提供了一些专有的操作.这些RDD被称为pair RDD.pair RDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口. Spark中创建pair RDD的方法:存储键值对的数据格式会在读取时直接返回由其键值对数据组成的pair RDD,还可以使用map()函数将一个普通的RDD转为pair RDD. Pair RDD的转化操作 reduceByKey()  与reduce类似 ,接收一个函数,并使用该函数对值进行合并,

redis常用操作和操作键值

string.list常用操作 string数据创建.覆盖及设置过期时间 127.0.0.1:6379> get key1 ? ? ? ? 查看原来的key1的值 "b" 127.0.0.1:6379> set key1 123 ? ? 覆盖创建一个新的key1的string的值 OK 127.0.0.1:6379> get key1 ? ? ? ? 查看 "123" 127.0.0.1:6379> setnx key1 456 ? ? 使用

Spark RDD编程(二)

转载请注明出处:http://blog.csdn.net/gamer_gyt @高阳团 博主微博:http://weibo.com/234654758 Github:https://github.com/thinkgamer ============================================================ SparkRDD编程(一) Spark 的键值对(pair RDD)操作,Scala实现 RDD的分区函数 目前Spark中实现的分区函数包括两种 Ha

Dictionary读取键值的快捷方法

对泛型集合Dictionary<T,T> 进行读取键值是经常的操作,一般情况下,都是通过keys 和values进行键值的读取操作: eg: foreach (var item in dic.Keys)            {                Console.WriteLine(item);            } foreach (var item in dic.Values)            {                Console.WriteLine(it

【Spark】RDD操作详解3——键值型Transformation算子

Transformation处理的数据为Key-Value形式的算子大致可以分为:输入分区与输出分区一对一.聚集.连接操作. 输入分区与输出分区一对一 mapValues mapValues:针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行处理. 方框代表RDD分区.a=>a+2代表只对( V1, 1)数据中的1进行加2操作,返回结果为3. 源码: /** * Pass each value in the key-value pair RDD through a m

Spark常用函数讲解--键值RDD转换

摘要: RDD:弹性分布式数据集,是一种特殊集合 ‚ 支持多种来源 ‚ 有容错机制 ‚ 可以被缓存 ‚ 支持并行操作,一个RDD代表一个分区里的数据集RDD有两种操作算子:         Transformation(转换):Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住       了数据集的逻辑操作         Ation(执行):触发Spark作业的运行,真正触发转换算子的计算 本系列主要讲解Spark中常用的函数操作: