spark2.x由浅入深深到底系列六之RDD java api详解三

学习任何spark知识点之前请先正确理解spark,可以参考:正确理解spark

本文详细介绍了spark key-value类型的rdd java api

一、key-value类型的RDD的创建方式

1、sparkContext.parallelizePairs

JavaPairRDD<String, Integer> javaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3), new Tuple2("kkk", 3)));
//结果:[(test,3), (kkk,3)]
System.out.println("javaPairRDD = " + javaPairRDD.collect());

2、keyBy的方式

public class User implements Serializable {
    private String userId;

    private Integer amount;

    public User(String userId, Integer amount) {
        this.userId = userId;
        this.amount = amount;
    }

    @Override
    public String toString() {
        return "User{" +
                "userId=‘" + userId + ‘\‘‘ +
                ", amount=" + amount +
                ‘}‘;
    }
}

JavaRDD<User> userJavaRDD = sc.parallelize(Arrays.asList(new User("u1", 20)));
JavaPairRDD<String, User> userJavaPairRDD = userJavaRDD.keyBy(new Function<User, String>() {
    @Override
    public String call(User user) throws Exception {
        return user.getUserId();
    }
});
//结果:[(u1,User{userId=‘u1‘, amount=20})]
System.out.println("userJavaPairRDD = " + userJavaPairRDD.collect());

3、zip的方式

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
//两个rdd zip也是创建key-value类型RDD的一种方式
JavaPairRDD<Integer, Integer> zipPairRDD = rdd.zip(rdd);
//结果:[(1,1), (1,1), (2,2), (3,3), (5,5), (8,8), (13,13)]
System.out.println("zipPairRDD = " + zipPairRDD.collect());

4、groupBy的方式

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Function<Integer, Boolean> isEven = new Function<Integer, Boolean>() {
    @Override
    public Boolean call(Integer x) throws Exception {
        return x % 2 == 0;
    }
};
//将偶数和奇数分组,生成key-value类型的RDD
JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isEven);
//结果:[(false,[1, 1, 3, 5, 13]), (true,[2, 8])]
System.out.println("oddsAndEvens = " + oddsAndEvens.collect());
//结果:1
System.out.println("oddsAndEvens.partitions.size = " + oddsAndEvens.partitions().size());

oddsAndEvens = rdd.groupBy(isEven, 2);
//结果:[(false,[1, 1, 3, 5, 13]), (true,[2, 8])]
System.out.println("oddsAndEvens = " + oddsAndEvens.collect());
//结果:2
System.out.println("oddsAndEvens.partitions.size = " + oddsAndEvens.partitions().size());

二、combineByKey

JavaPairRDD<String, Integer> javaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2("coffee", 1), new Tuple2("coffee", 2),
                new Tuple2("panda", 3), new Tuple2("coffee", 9)), 2);

//当在一个分区中遇到新的key的时候,对这个key对应的value应用这个函数
Function<Integer, Tuple2<Integer, Integer>> createCombiner = new Function<Integer, Tuple2<Integer, Integer>>() {
    @Override
    public Tuple2<Integer, Integer> call(Integer value) throws Exception {
        return new Tuple2<>(value, 1);
    }
};
//当在一个分区中遇到已经应用过上面createCombiner函数的key的时候,对这个key对应的value应用这个函数
Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>> mergeValue =
        new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer value) throws Exception {
                return new Tuple2<>(acc._1() + value, acc._2() + 1);
            }
        };
//当需要对不同分区的数据进行聚合的时候应用这个函数
Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergeCombiners =
        new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
                return new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
            }
        };

JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD =
        javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);
//结果:[(coffee,(12,3)), (panda,(3,1))]
System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());

combineByKey的数据流如下:

对于combineByKey的原理讲解详细见: spark core RDD api原理详解

三、aggregateByKey

JavaPairRDD<String, Tuple2<Integer, Integer>> aggregateByKeyRDD =
        javaPairRDD.aggregateByKey(new Tuple2<>(0, 0), mergeValue, mergeCombiners);
//结果:[(coffee,(12,3)), (panda,(3,1))]
System.out.println("aggregateByKeyRDD = " + aggregateByKeyRDD.collect());
//aggregateByKey是由combineByKey实现的,上面的aggregateByKey就是等于下面的combineByKeyRDD
Function<Integer, Tuple2<Integer, Integer>> createCombinerAggregateByKey =
        new Function<Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Integer value) throws Exception {
                return mergeValue.call(new Tuple2<>(0, 0), value);
            }
        };
//结果是: [(coffee,(12,3)), (panda,(3,1))]
System.out.println(javaPairRDD.combineByKey(createCombinerAggregateByKey, mergeValue, mergeCombiners).collect());

四、reduceByKey

JavaPairRDD<String, Integer> reduceByKeyRDD = javaPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});
//结果:[(coffee,12), (panda,3)]
System.out.println("reduceByKeyRDD = " + reduceByKeyRDD.collect());
//reduceByKey底层也是combineByKey实现的,上面的reduceByKey等于下面的combineByKey
Function<Integer, Integer> createCombinerReduce = new Function<Integer, Integer>() {
    @Override
    public Integer call(Integer integer) throws Exception {
        return integer;
    }
};
Function2<Integer, Integer, Integer> mergeValueReduce =
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        };
//结果:[(coffee,12), (panda,3)]
System.out.println(javaPairRDD.combineByKey(createCombinerReduce, mergeValueReduce, mergeValueReduce).collect());

五、foldByKey

JavaPairRDD<String, Integer> foldByKeyRDD = javaPairRDD.foldByKey(0, new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer integer, Integer integer2) throws Exception {
        return integer + integer2;
    }
});
//结果:[(coffee,12), (panda,3)]
System.out.println("foldByKeyRDD = " + foldByKeyRDD.collect());
//foldByKey底层也是combineByKey实现的,上面的foldByKey等于下面的combineByKey
Function2<Integer, Integer, Integer> mergeValueFold =
        new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        };

Function<Integer, Integer> createCombinerFold = new Function<Integer, Integer>() {
    @Override
    public Integer call(Integer integer) throws Exception {
        return mergeValueFold.call(0, integer);
    }
};
//结果:[(coffee,12), (panda,3)]
System.out.println(javaPairRDD.combineByKey(createCombinerFold, mergeValueFold, mergeValueFold).collect());

六、groupByKey

JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = javaPairRDD.groupByKey();
//结果:[(coffee,[1, 2, 9]), (panda,[3])]
System.out.println("groupByKeyRDD = " + groupByKeyRDD.collect());
//groupByKey底层也是combineByKey实现的,上面的groupByKey等于下面的combineByKey
Function<Integer, List<Integer>> createCombinerGroup = new Function<Integer, List<Integer>>() {
    @Override
    public List<Integer> call(Integer integer) throws Exception {
        List<Integer> list = new ArrayList<>();
        list.add(integer);
        return list;
    }
};
Function2<List<Integer>, Integer, List<Integer>> mergeValueGroup = new Function2<List<Integer>, Integer, List<Integer>>() {
    @Override
    public List<Integer> call(List<Integer> integers, Integer integer) throws Exception {
        integers.add(integer);
        return integers;
    }
};
Function2<List<Integer>, List<Integer>, List<Integer>> mergeCombinersGroup =
        new Function2<List<Integer>, List<Integer>, List<Integer>>() {
            @Override
            public List<Integer> call(List<Integer> integers, List<Integer> integers2) throws Exception {
                integers.addAll(integers2);
                return integers;
            }
        };
//结果:[(coffee,[1, 2, 9]), (panda,[3])]
System.out.println(javaPairRDD.combineByKey(createCombinerGroup, mergeValueGroup, mergeCombinersGroup).collect());

对于api原理性的东西很难用文档说明清楚,如果想更深入,更透彻的理解api的原理,可以参考: spark core RDD api原理详解

时间: 2024-12-23 18:43:04

spark2.x由浅入深深到底系列六之RDD java api详解三的相关文章

spark2.x由浅入深深到底系列六之RDD java api详解四

学习spark任何的知识点之前,先对spark要有一个正确的理解,可以参考:正确理解spark 本文对join相关的api做了一个解释 SparkConf conf = new SparkConf().setAppName("appName").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaPairRDD<Integer, Integer> javaPa

spark2.x由浅入深深到底系列六之RDD java api详解一

以下对RDD的三种创建方式.单类型RDD基本的transformation api.采样Api以及pipe操作进行了java api方面的阐述 一.RDD的三种创建方式 从稳定的文件存储系统中创建RDD,比如local fileSystem或者hdfs等,如下: //从hdfs文件中创建 JavaRDD<String> textFileRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt"); //从

spark2.x由浅入深深到底系列六之RDD java api详解二

package com.twq.javaapi.java7; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.funct

spark2.x由浅入深深到底系列六之RDD java api调用scala api的原理

RDD java api其实底层是调用了scala的api来实现的,所以我们有必要对java api是怎么样去调用scala api,我们先自己简单的实现一个scala版本和java版本的RDD和SparkContext 一.简单实现scala版本的RDD和SparkContext class RDD[T](value: Seq[T]) {   //RDD的map操作   def map[U](f: T => U): RDD[U] = {     new RDD(value.map(f))   

spark2.x由浅入深深到底系列六之RDD java api用JdbcRDD读取关系型数据库

学习任何的spark技术之前,请先正确理解spark,可以参考:正确理解spark 以下是用spark RDD java api实现从关系型数据库中读取数据,这里使用的是derby本地数据库,当然可以是mysql或者oracle等关系型数据库: package com.twq.javaapi.java7; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; imp

spark2.x由浅入深深到底系列七之RDD python api详解一

学习spark任何技术之前,请先正确理解spark,可以参考:正确理解spark 以下对RDD的三种创建方式.单类型RDD基本的transformation api.采样Api以及pipe操作进行了python api方面的阐述 一.RDD的三种创建方式 从稳定的文件存储系统中创建RDD,比如local fileSystem或者hdfs等,如下: """ 创建RDD的方法: 1: 从一个稳定的存储系统中,比如hdfs文件, 或者本地文件系统 """

spark2.x由浅入深深到底系列六之RDD 支持java8 lambda表达式

学习spark任何技术之前,请正确理解spark,可以参考:正确理解spark 我们在 http://7639240.blog.51cto.com/7629240/1966131 中已经知道了,一个scala函数其实就是java中的一个接口,对于java8 lambda而言,也是一样,一个lambda表达式就是java中的一个接口.接下来我们先看看spark中最简单的wordcount这个例子,分别用java8的非lambda以及lambda来实现: 一.非lambda实现的java spark

spark2.x由浅入深深到底系列五之python开发spark环境配置

学习spark任何的技术前,请先正确理解spark,可以参考: 正确理解spark 以下是在mac操作系统上配置用python开发spark的环境 一.安装python spark2.2.0需要python的版本是Python2.6+ 或者 Python3.4+ 可以参考: http://jingyan.baidu.com/article/7908e85c78c743af491ad261.html 二.下载spark编译包并配置环境变量 1.在官网中: http://spark.apache.o

reactjs入门到实战(六)---- ReactJS组件API详解

全局的api 1.React.createClass 创建一个组件类,并作出定义.组件实现了 render() 方法,该方法返回一个子级.该子级可能包含很深的子级结构.组件与标准原型类的不同之处在于,你不需要使用 new 来实例化. 组件是一种很方便的封装,可以(通过 new )为你创建后台实例. 2.React.createElement ReactElement createElement( string/ReactClass type, [object props], [children