Original author: 翟开顺
First published on: CSDN
This is a personal excerpt made for convenient reference; please support the original author.
Original post: https://blog.csdn.net/t1dmzks/article/details/72077428
github: https://github.com/zhaikaishun/spark_tutorial/tree/master/src/main/java/com/spark/rdd_tutorial/tutorial8
First, a diagram from spark-learning gives a rough overview of what each of these operators does (see the original post for the image).
subtractByKey
Function signatures

```scala
def subtractByKey[W](other: RDD[(K, W)])(implicit arg0: ClassTag[W]): RDD[(K, V)]
def subtractByKey[W](other: RDD[(K, W)], numPartitions: Int)(implicit arg0: ClassTag[W]): RDD[(K, V)]
def subtractByKey[W](other: RDD[(K, W)], p: Partitioner)(implicit arg0: ClassTag[W]): RDD[(K, V)]
```
Similar to subtract: removes from the source RDD every element whose key also appears in the other RDD.
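The difference from subtract is worth spelling out: subtract removes only exact (key, value) matches, while subtractByKey matches on the key alone. A minimal sketch, assuming a live SparkContext named `sc`:

```scala
// subtract matches whole (key, value) pairs; subtractByKey matches keys only.
val left  = sc.makeRDD(Array((1, 2), (3, 4), (3, 6)))
val right = sc.makeRDD(Array((3, 4)))

left.subtract(right).collect()      // Array((1,2), (3,6)) -- only the exact pair (3,4) is removed
left.subtractByKey(right).collect() // Array((1,2))        -- every pair with key 3 is removed
```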
join
Function signatures

```scala
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
```
RDD1.join(RDD2) joins the pairs in RDD1 and RDD2 that share the same key, much like an inner join in SQL; keys that appear in only one of the RDDs are dropped.
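One behavior worth noting, shown here in a small sketch on made-up data: when a key is duplicated on both sides, join emits every combination of the matching values, just as SQL's inner join does:

```scala
// With duplicate keys on both sides, join produces the per-key Cartesian product.
val a = sc.makeRDD(Array((3, "a1"), (3, "a2")))
val b = sc.makeRDD(Array((3, "b1"), (3, "b2")))
a.join(b).collect()
// Array((3,(a1,b1)), (3,(a1,b2)), (3,(a2,b1)), (3,(a2,b2))) -- order may vary
```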
fullOuterJoin
Similar to join, but a full outer join: every key from either RDD appears in the result, with both sides wrapped in Option so that a missing side shows up as None.
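The Scala session further down does not exercise fullOuterJoin, so here is a hedged sketch using the same data as that session; note the result type wraps both sides in Option:

```scala
val rdd   = sc.makeRDD(Array((1, 2), (3, 4), (3, 6)))
val other = sc.makeRDD(Array((3, 9)))

rdd.fullOuterJoin(other).collect()
// Array((1,(Some(2),None)), (3,(Some(4),Some(9))), (3,(Some(6),Some(9)))) -- order may vary
```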
leftOuterJoin
Function signatures

```scala
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))]
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))]
```

Joins the two RDDs like a left outer join in SQL: every key of the left RDD is kept, and the right-hand value is Some(w) when the key exists in other and None when it does not.
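In practice the Option in the result is usually unwrapped right away. A hypothetical usage sketch, reusing rdd and other from the snippet above and assuming 0 as the default for missing values:

```scala
val withDefaults = rdd.leftOuterJoin(other).mapValues {
  case (v, wOpt) => (v, wOpt.getOrElse(0)) // 0 is an assumed default, not from the original post
}
withDefaults.collect() // Array((1,(2,0)), (3,(4,9)), (3,(6,9))) -- order may vary
```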
rightOuterJoin
Joins the two RDDs like a right outer join in SQL: every key of the other RDD is kept, and the left-hand value is wrapped in Option (Some when the key is present, None when it is not); see the code below for the concrete results.
Code examples
Scala
```scala
scala> val rdd = sc.makeRDD(Array((1,2),(3,4),(3,6)))
scala> val other = sc.makeRDD(Array((3,9)))

scala> rdd.subtractByKey(other).collect()
res0: Array[(Int, Int)] = Array((1,2))

scala> rdd.join(other).collect()
res1: Array[(Int, (Int, Int))] = Array((3,(4,9)), (3,(6,9)))

scala> rdd.leftOuterJoin(other).collect()
res2: Array[(Int, (Int, Option[Int]))] = Array((1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9))))

scala> rdd.rightOuterJoin(other).collect()
res3: Array[(Int, (Option[Int], Int))] = Array((3,(Some(4),9)), (3,(Some(6),9)))
```
Java
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Map;

public class JoinRDD {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("JoinRDD").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        sc.setLogLevel("WARN");

        JavaRDD<Tuple2<Integer, Integer>> rddPre = sc.parallelize(Arrays.asList(
                new Tuple2<>(1, 2), new Tuple2<>(3, 4), new Tuple2<>(3, 6)));
        JavaRDD<Tuple2<Integer, Integer>> otherPre = sc.parallelize(Arrays.asList(
                new Tuple2<>(3, 10), new Tuple2<>(4, 8)));

        JavaPairRDD<Integer, Integer> rdd = JavaPairRDD.fromJavaRDD(rddPre);
        JavaPairRDD<Integer, Integer> other = JavaPairRDD.fromJavaRDD(otherPre);

        // subtractByKey
        JavaPairRDD<Integer, Integer> subRDD = rdd.subtractByKey(other);
        // join
        JavaPairRDD<Integer, Tuple2<Integer, Integer>> joinRDD = rdd.join(other);
        // fullOuterJoin
        JavaPairRDD<Integer, Tuple2<Optional<Integer>, Optional<Integer>>> fullOutJoinRDD = rdd.fullOuterJoin(other);
        // leftOuterJoin
        JavaPairRDD<Integer, Tuple2<Integer, Optional<Integer>>> leftOutJoinRDD = rdd.leftOuterJoin(other);
        // rightOuterJoin
        JavaPairRDD<Integer, Tuple2<Optional<Integer>, Integer>> rightOutJoinRDD = rdd.rightOuterJoin(other);

        // Print the results to inspect the output
        Map<Integer, Integer> subMap = subRDD.collectAsMap();
        System.out.println("-------------subRDD-------------");
        for (Integer key : subMap.keySet()) {
            System.out.println("subRDD: " + key + ", " + subMap.get(key));
        }

        Map<Integer, Tuple2<Integer, Integer>> joinMap = joinRDD.collectAsMap();
        System.out.println("-------------joinRDD-------------");
        for (Integer key : joinMap.keySet()) {
            System.out.println("join: " + key + ", Tuple(" + joinMap.get(key)._1 + "," + joinMap.get(key)._2 + ")");
        }

        Map<Integer, Tuple2<Optional<Integer>, Optional<Integer>>> fullOutJoinMap = fullOutJoinRDD.collectAsMap();
        System.out.println("-------------fullOutJoinRDD-------------");
        for (Integer key : fullOutJoinMap.keySet()) {
            System.out.println("fullOutJoinRDD: " + key + ", Tuple(" + fullOutJoinMap.get(key)._1 + "," + fullOutJoinMap.get(key)._2 + ")");
        }

        Map<Integer, Tuple2<Integer, Optional<Integer>>> leftOutJoinMap = leftOutJoinRDD.collectAsMap();
        System.out.println("-------------leftOutJoinRDD-------------");
        for (Integer key : leftOutJoinMap.keySet()) {
            System.out.println("leftOutJoinRDD: " + key + ", Tuple(" + leftOutJoinMap.get(key)._1 + "," + leftOutJoinMap.get(key)._2 + ")");
        }

        Map<Integer, Tuple2<Optional<Integer>, Integer>> rightOutJoinMap = rightOutJoinRDD.collectAsMap();
        System.out.println("-------------rightOutJoinRDD-------------");
        for (Integer key : rightOutJoinMap.keySet()) {
            System.out.println("rightOutJoinRDD: " + key + ", Tuple(" + rightOutJoinMap.get(key)._1 + "," + rightOutJoinMap.get(key)._2 + ")");
        }
    }
}
```
Running the program prints the following. Note that collectAsMap keeps only one value per key, which is why the join section shows a single entry for key 3 even though the join itself produced both (3,(4,10)) and (3,(6,10)).
```
-------------subRDD-------------
subRDD: 1, 2
-------------joinRDD-------------
join: 3, Tuple(6,10)
-------------fullOutJoinRDD-------------
fullOutJoinRDD: 4, Tuple(Optional.empty,Optional[8])
fullOutJoinRDD: 1, Tuple(Optional[2],Optional.empty)
fullOutJoinRDD: 3, Tuple(Optional[6],Optional[10])
-------------leftOutJoinRDD-------------
leftOutJoinRDD: 1, Tuple(2,Optional.empty)
leftOutJoinRDD: 3, Tuple(6,Optional[10])
-------------rightOutJoinRDD-------------
rightOutJoinRDD: 4, Tuple(Optional.empty,8)
rightOutJoinRDD: 3, Tuple(Optional[6],10)
```