spark2.x由浅入深深到底系列六之RDD java api详解四

学习spark任何的知识点之前,先对spark要有一个正确的理解,可以参考:正确理解spark

本文对join相关的api做了一个解释

SparkConf conf = new SparkConf().setAppName("appName").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

JavaPairRDD<Integer, Integer> javaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2<>(1, 2),
                new Tuple2<>(3, 4), new Tuple2<>(3, 6), new Tuple2<>(5, 6)));
JavaPairRDD<Integer, Integer> otherJavaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2<>(3, 9),
                new Tuple2<>(4, 5)));
//结果: [(4,([],[5])), (1,([2],[])), (3,([4, 6],[9])), (5,([6],[]))]
System.out.println(javaPairRDD.cogroup(otherJavaPairRDD).collect());

//结果: [(4,([],[5])), (1,([2],[])), (3,([4, 6],[9])), (5,([6],[]))]
// groupWith和cogroup效果是一模一样的
System.out.println(javaPairRDD.groupWith(otherJavaPairRDD).collect());

//结果: [(3,(4,9)), (3,(6,9))]
//基于cogroup实现的,就是取cogroup结果中相同key在两个RDD都有value的数据
System.out.println(javaPairRDD.join(otherJavaPairRDD).collect());

//结果: [(1,(2,Optional.empty)), (3,(4,Optional[9])), (3,(6,Optional[9])), (5,(6,Optional.empty))]
//基于cogroup实现的,结果需要出现的key以左边的RDD为准
System.out.println(javaPairRDD.leftOuterJoin(otherJavaPairRDD).collect());

//结果: [(4,(Optional.empty,5)), (3,(Optional[4],9)), (3,(Optional[6],9))]
//基于cogroup实现的,结果需要出现的key以右边的RDD为准
System.out.println(javaPairRDD.rightOuterJoin(otherJavaPairRDD).collect());

//结果: [(4,(Optional.empty,Optional[5])), (1,(Optional[2],Optional.empty)), (3,(Optional[4],Optional[9])), (3,(Optional[6],Optional[9])), (5,(Optional[6],Optional.empty))]
//基于cogroup实现的,结果需要出现的key是两个RDD中所有的key
System.out.println(javaPairRDD.fullOuterJoin(otherJavaPairRDD).collect());

从上可以看出,最基本的操作是cogroup这个操作,下面是cougroup的原理图:

如果想对cogroup原理更彻底的理解,可以参考:spark core RDD api原理详解

时间: 2024-12-12 15:41:35

spark2.x由浅入深深到底系列六之RDD java api详解四的相关文章

spark2.x由浅入深深到底系列六之RDD java api详解三

学习任何spark知识点之前请先正确理解spark,可以参考:正确理解spark 本文详细介绍了spark key-value类型的rdd java api 一.key-value类型的RDD的创建方式 1.sparkContext.parallelizePairs JavaPairRDD<String, Integer> javaPairRDD =         sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3

spark2.x由浅入深深到底系列六之RDD java api详解一

以下对RDD的三种创建方式.单类型RDD基本的transformation api.采样Api以及pipe操作进行了java api方面的阐述 一.RDD的三种创建方式 从稳定的文件存储系统中创建RDD,比如local fileSystem或者hdfs等,如下: //从hdfs文件中创建 JavaRDD<String> textFileRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt"); //从

spark2.x由浅入深深到底系列六之RDD java api详解二

package com.twq.javaapi.java7; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.funct

spark2.x由浅入深深到底系列六之RDD java api调用scala api的原理

RDD java api其实底层是调用了scala的api来实现的,所以我们有必要对java api是怎么样去调用scala api,我们先自己简单的实现一个scala版本和java版本的RDD和SparkContext 一.简单实现scala版本的RDD和SparkContext class RDD[T](value: Seq[T]) {   //RDD的map操作   def map[U](f: T => U): RDD[U] = {     new RDD(value.map(f))   

spark2.x由浅入深深到底系列六之RDD java api用JdbcRDD读取关系型数据库

学习任何的spark技术之前,请先正确理解spark,可以参考:正确理解spark 以下是用spark RDD java api实现从关系型数据库中读取数据,这里使用的是derby本地数据库,当然可以是mysql或者oracle等关系型数据库: package com.twq.javaapi.java7; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; imp

spark2.x由浅入深深到底系列七之RDD python api详解一

学习spark任何技术之前,请先正确理解spark,可以参考:正确理解spark 以下对RDD的三种创建方式.单类型RDD基本的transformation api.采样Api以及pipe操作进行了python api方面的阐述 一.RDD的三种创建方式 从稳定的文件存储系统中创建RDD,比如local fileSystem或者hdfs等,如下: """ 创建RDD的方法: 1: 从一个稳定的存储系统中,比如hdfs文件, 或者本地文件系统 """

spark2.x由浅入深深到底系列六之RDD 支持java8 lambda表达式

学习spark任何技术之前,请正确理解spark,可以参考:正确理解spark 我们在 http://7639240.blog.51cto.com/7629240/1966131 中已经知道了,一个scala函数其实就是java中的一个接口,对于java8 lambda而言,也是一样,一个lambda表达式就是java中的一个接口.接下来我们先看看spark中最简单的wordcount这个例子,分别用java8的非lambda以及lambda来实现: 一.非lambda实现的java spark

spark2.x由浅入深深到底系列五之python开发spark环境配置

学习spark任何的技术前,请先正确理解spark,可以参考: 正确理解spark 以下是在mac操作系统上配置用python开发spark的环境 一.安装python spark2.2.0需要python的版本是Python2.6+ 或者 Python3.4+ 可以参考: http://jingyan.baidu.com/article/7908e85c78c743af491ad261.html 二.下载spark编译包并配置环境变量 1.在官网中: http://spark.apache.o

reactjs入门到实战(六)---- ReactJS组件API详解

全局的api 1.React.createClass 创建一个组件类,并作出定义.组件实现了 render() 方法,该方法返回一个子级.该子级可能包含很深的子级结构.组件与标准原型类的不同之处在于,你不需要使用 new 来实例化. 组件是一种很方便的封装,可以(通过 new )为你创建后台实例. 2.React.createElement ReactElement createElement( string/ReactClass type, [object props], [children