spark2.x由浅入深深到底系列六之RDD java api详解一

以下对RDD的三种创建方式、单类型RDD基本的transformation api、采样Api以及pipe操作进行了java api方面的阐述

一、RDD的三种创建方式

  1. 从稳定的文件存储系统中创建RDD,比如local fileSystem或者hdfs等,如下:
//从hdfs文件中创建
JavaRDD<String> textFileRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt");

//从本地文件系统的文件中,注意file:后面肯定是至少三个///,四个也行,不能是两个
//如果指定第二个参数的话,表示创建的RDD的最小的分区数,如果文件分块的数量大于指定的分区
//数的话则已文件的分块数量为准
JavaRDD<String> textFileRDD = sc.textFile(" 2 );

2.  可以经过transformation api从一个已经存在的RDD上创建一个新的RDD,以下是map这个转换api

JavaRDD<String> mapRDD = textFileRDD.map(new Function<String, String>() {
    @Override
    public String call(String s) throws Exception {
        return s + "test";
    }
});
System.out.println("mapRDD = " + mapRDD.collect());

3.  从一个内存中的列表数据创建一个RDD,可以指定RDD的分区数,如果不指定的话,则取所有Executor的所有cores数量

//创建一个单类型的JavaRDD
JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3, 4), 2);
System.out.println("integerJavaRDD = " + integerJavaRDD.glom().collect());

//创建一个单类型且类型为Double的JavaRDD
JavaDoubleRDD doubleJavaDoubleRDD = sc.parallelizeDoubles(Arrays.asList(2.0, 3.3, 5.6));
System.out.println("doubleJavaDoubleRDD = " + doubleJavaDoubleRDD.collect());

//创建一个key-value类型的RDD
import scala.Tuple2;
JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3), new Tuple2("kkk", 3)));
System.out.println("javaPairRDD = " + javaPairRDD.collect());

注:对于第三种情况,scala中还提供了makeRDD api,这个api可以指定创建RDD每一个分区所在的机器,这个api的原理详见spark core RDD scala api

二、单类型RDD基本的transformation api

先基于内存中的数据创建一个RDD

JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3), 2);
  1. map操作,表示对integerJavaRDD的每一个元素应用我们自定义的函数接口,如下是将每一个元素加1:
JavaRDD<Integer> mapRDD = integerJavaRDD.map(new Function<Integer, Integer>() {
    @Override
    public Integer call(Integer element) throws Exception {
        return element + 1;
    }
});
//结果:[2, 3, 4, 4]
System.out.println("mapRDD = " + mapRDD.collect());

需要注意的是,map操作可以返回与RDD不同类型的数据,如下,返回一个自定义的User对象:

public class User implements Serializable {
    private String userId;

    private Integer amount;

    public User(String userId, Integer amount) {
        this.userId = userId;
        this.amount = amount;
    }
    //getter setter....
    @Override
    public String toString() {
        return "User{" +
                "userId=‘" + userId + ‘\‘‘ +
                ", amount=" + amount +
                ‘}‘;
    }
}
JavaRDD<User> userJavaRDD = integerJavaRDD.map(new Function<Integer, User>() {
    @Override
    public User call(Integer element) throws Exception {
        if (element < 3) {
            return new User("小于3", 22);
        } else {
            return new User("大于3", 23);
        }
    }
});
//结果:[User{userId=‘小于3‘, amount=22}, User{userId=‘小于3‘, amount=22}, User{userId=‘大于3‘, amount=23}, User{userId=‘大于3‘, amount=23}]
System.out.println("userJavaRDD = " + userJavaRDD.collect());

2.  flatMap操作,对integerJavaRDD的每一个元素应用我们自定义的FlatMapFunction,这个函数的输出是一个数据列表,flatMap会对这些输出的数据列表进行展平

JavaRDD<Integer> flatMapJavaRDD = integerJavaRDD.flatMap(new FlatMapFunction<Integer, Integer>() {
    @Override
    public Iterator<Integer> call(Integer element) throws Exception {
        //输出一个list,这个list里的元素是0到element
        List<Integer> list = new ArrayList<>();
        int i = 0;
        while (i <= element) {
            list.add(i);
            i++;
        }
        return list.iterator();
    }
});
//结果: [0, 1, 0, 1, 2, 0, 1, 2, 3, 0, 1, 2, 3]
System.out.println("flatMapJavaRDD = " + flatMapJavaRDD.collect());

3.  filter操作,对integerJavaRDD的每一个元素应用我们自定义的过滤函数,过滤掉我们不需要的元素,如下,过滤掉不等于1的元素:

JavaRDD<Integer> filterJavaRDD = integerJavaRDD.filter(new Function<Integer, Boolean>() {
    @Override
    public Boolean call(Integer integer) throws Exception {
        return integer != 1;
    }
});
//结果为:[2, 3, 3]
System.out.println("filterJavaRDD = " + filterJavaRDD.collect());

4.  glom操作,查看integerJavaRDD每一个分区对应的元素数据

JavaRDD<List<Integer>> glomRDD = integerJavaRDD.glom();
//结果: [[1, 2], [3, 3]], 说明integerJavaRDD有两个分区,第一个分区中有数据1和2,第二个分区中有数据3和3
System.out.println("glomRDD = " + glomRDD.collect());

5.  mapPartitions操作,对integerJavaRDD的每一个分区的数据应用我们自定义的函数接口方法,假设我们需要为每一个元素加上一个初始值,而这个初始值的获取又是非常耗时的,这个时候用mapPartitions会有非常大的优势,如下:

//这是一个初始值获取的方法,是一个比较耗时的方法
public static Integer getInitNumber(String source) {
    System.out.println("get init number from " + source + ", may be take much time........");
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return 1;
}

JavaRDD<Integer> mapPartitionTestRDD = integerJavaRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Integer>() {
    @Override
    public Iterator<Integer> call(Iterator<Integer> integerIterator) throws Exception {
        //每一个分区获取一次初始值,integerJavaRDD有两个分区,那么会调用两次getInitNumber方法
        //所以对应需要初始化的比较耗时的操作,比如初始化数据库的连接等,一般都是用mapPartitions来为对每一个分区初始化一次,而不要去使用map操作
        Integer initNumber = getInitNumber("mapPartitions");

        List<Integer> list = new ArrayList<>();
        while (integerIterator.hasNext()) {
            list.add(integerIterator.next() + initNumber);
        }
        return list.iterator();
    }
});
//结果为: [2, 3, 4, 4]
System.out.println("mapPartitionTestRDD = " + mapPartitionTestRDD.collect());

JavaRDD<Integer> mapInitNumberRDD = integerJavaRDD.map(new Function<Integer, Integer>() {
    @Override
    public Integer call(Integer integer) throws Exception {
        //遍历每一个元素的时候都会去获取初始值,这个integerJavaRDD含有4个元素,那么这个getInitNumber方法会被调用4次,严重的影响了时间,不如mapPartitions性能好
        Integer initNumber = getInitNumber("map");
        return integer + initNumber;
    }
});
//结果为:[2, 3, 4, 4]
System.out.println("mapInitNumberRDD = " + mapInitNumberRDD.collect());

6.  mapPartitionsWithIndex操作,对integerJavaRDD的每一个分区的数据应用我们自定义的函数接口方法,在应用函数接口方法的时候带上了分区信息,即知道你当前处理的是第几个分区的数据

JavaRDD<Integer> mapPartitionWithIndex = integerJavaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<Integer>>() {
    @Override
    public Iterator<Integer> call(Integer partitionId, Iterator<Integer> integerIterator) throws Exception {
        //partitionId表示当前处理的第几个分区的信息
        System.out.println("partition id = " + partitionId);
        List<Integer> list = new ArrayList<>();
        while (integerIterator.hasNext()) {
            list.add(integerIterator.next() + partitionId);
        }
        return list.iterator();
    }
}, false);
//结果 [1, 2, 4, 4]
System.out.println("mapPartitionWithIndex = " + mapPartitionWithIndex.collect());

三、采样Api

先基于内存中的数据创建一个RDD

JavaRDD<Integer> listRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3), 2);
  1. sample
//第一个参数为withReplacement
//如果withReplacement=true的话表示有放回的抽样,采用泊松抽样算法实现
//如果withReplacement=false的话表示无放回的抽样,采用伯努利抽样算法实现

//第二个参数为:fraction,表示每一个元素被抽取为样本的概率,并不是表示需要抽取的数据量的因子
//比如从100个数据中抽样,fraction=0.2,并不是表示需要抽取100 * 0.2 = 20个数据,
//而是表示100个元素的被抽取为样本概率为0.2;样本的大小并不是固定的,而是服从二项分布
//当withReplacement=true的时候fraction>=0
//当withReplacement=false的时候 0 < fraction < 1

//第三个参数为:reed表示生成随机数的种子,即根据这个reed为rdd的每一个分区生成一个随机种子
JavaRDD<Integer> sampleRDD = listRDD.sample(false, 0.5, 100);
//结果: [1, 3]
System.out.println("sampleRDD = " + sampleRDD.collect());

2.  randomSplit

//按照权重对RDD进行随机抽样切分,有几个权重就切分成几个RDD
//随机抽样采用伯努利抽样算法实现, 以下是有两个权重,则会切成两个RDD
JavaRDD<Integer>[] splitRDDs = listRDD.randomSplit(new double[]{0.4, 0.6});
//结果为2
System.out.println("splitRDDs.length = " + splitRDDs.length);
//结果为[2, 3] 结果是不定的
System.out.println("splitRDD(0) = " + splitRDDs[0].collect());
//结果为[1, 3] 结果是不定的 
System.out.println("splitRDD(1) = " + splitRDDs[1].collect());

3.  takeSample

//随机抽样指定数量的样本数据
//第一个参数为withReplacement
//如果withReplacement=true的话表示有放回的抽样,采用泊松抽样算法实现
//如果withReplacement=false的话表示无放回的抽样,采用伯努利抽样算法实现
//第二个参数指定多少,则返回多少个样本数
结果为[2, 3]
System.out.println(listRDD.takeSample(false, 2));

4. 分层采样,对key-value类型的RDD进行采样

//创建一个key value类型的RDD
import scala.Tuple2;
JavaPairRDD<String, Integer> javaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3),
                new Tuple2("kkk", 3), new Tuple2("kkk", 3)));
//定义每一个key的采样因子
Map<String, Double> fractions = new HashMap<>();
fractions.put("test", 0.5);
fractions.put("kkk", 0.4);
//对每一个key进行采样
//结果为 [(test,3), (kkk,3)]
//sampleByKey 并不对过滤全量数据,因此只得到近似值
System.out.println(javaPairRDD.sampleByKey(true, fractions).collect());
//结果为 [(test,3), (kkk,3)]
//sampleByKeyExtra 会对全量数据做采样计算,因此耗费大量的计算资源,但是结果会更准确。
System.out.println(javaPairRDD.sampleByKeyExact(true, fractions).collect());

抽样的原理详细可以参考:spark core RDD api。这些原理性的东西用文字不太好表述

四、pipe,表示在RDD执行流中的某一步执行其他的脚本,比如python或者shell脚本等

JavaRDD<String> dataRDD = sc.parallelize(Arrays.asList("hi", "hello", "how", "are", "you"), 2);

//启动echo.py需要的环境变量
Map<String, String> env = new HashMap<>();
env.put("env", "envtest");

List<String> commands = new ArrayList<>();
commands.add("python");
//如果是在真实的spark集群中,那么要求echo.py在集群的每一台机器的同一个目录下面都要有
commands.add("/Users/tangweiqun/spark/source/spark-course/spark-rdd-java/src/main/resources/echo.py");

JavaRDD<String> result = dataRDD.pipe(commands, env);
//结果为: [slave1-hi-envtest, slave1-hello-envtest, slave1-how-envtest, slave1-are-envtest, slave1-you-envtest]
System.out.println(result.collect());

echo.py的内容如下:

import sys
import os

#input = "test"
input = sys.stdin
env_keys = os.environ.keys()
env = ""
if "env" in env_keys:
   env = os.environ["env"]
for ele in input:
   output = "slave1-" + ele.strip(‘\n‘) + "-" + env
       print (output)

input.close

对于pipe的原理,以及怎么实现的,参考spark core RDD api,这个里面还清楚的讲述了怎么消除手工将脚本拷贝到每一台机器中的工作

时间: 2024-08-07 01:52:26

spark2.x由浅入深深到底系列六之RDD java api详解一的相关文章

spark2.x由浅入深深到底系列六之RDD java api详解三

学习任何spark知识点之前请先正确理解spark,可以参考:正确理解spark 本文详细介绍了spark key-value类型的rdd java api 一.key-value类型的RDD的创建方式 1.sparkContext.parallelizePairs JavaPairRDD<String, Integer> javaPairRDD =         sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3

spark2.x由浅入深深到底系列六之RDD java api详解四

学习spark任何的知识点之前,先对spark要有一个正确的理解,可以参考:正确理解spark 本文对join相关的api做了一个解释 SparkConf conf = new SparkConf().setAppName("appName").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaPairRDD<Integer, Integer> javaPa

spark2.x由浅入深深到底系列六之RDD java api详解二

package com.twq.javaapi.java7; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.funct

spark2.x由浅入深深到底系列六之RDD java api调用scala api的原理

RDD java api其实底层是调用了scala的api来实现的,所以我们有必要对java api是怎么样去调用scala api,我们先自己简单的实现一个scala版本和java版本的RDD和SparkContext 一.简单实现scala版本的RDD和SparkContext class RDD[T](value: Seq[T]) {   //RDD的map操作   def map[U](f: T => U): RDD[U] = {     new RDD(value.map(f))   

spark2.x由浅入深深到底系列六之RDD java api用JdbcRDD读取关系型数据库

学习任何的spark技术之前,请先正确理解spark,可以参考:正确理解spark 以下是用spark RDD java api实现从关系型数据库中读取数据,这里使用的是derby本地数据库,当然可以是mysql或者oracle等关系型数据库: package com.twq.javaapi.java7; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; imp

spark2.x由浅入深深到底系列七之RDD python api详解一

学习spark任何技术之前,请先正确理解spark,可以参考:正确理解spark 以下对RDD的三种创建方式.单类型RDD基本的transformation api.采样Api以及pipe操作进行了python api方面的阐述 一.RDD的三种创建方式 从稳定的文件存储系统中创建RDD,比如local fileSystem或者hdfs等,如下: """ 创建RDD的方法: 1: 从一个稳定的存储系统中,比如hdfs文件, 或者本地文件系统 """

spark2.x由浅入深深到底系列六之RDD 支持java8 lambda表达式

学习spark任何技术之前,请正确理解spark,可以参考:正确理解spark 我们在 http://7639240.blog.51cto.com/7629240/1966131 中已经知道了,一个scala函数其实就是java中的一个接口,对于java8 lambda而言,也是一样,一个lambda表达式就是java中的一个接口.接下来我们先看看spark中最简单的wordcount这个例子,分别用java8的非lambda以及lambda来实现: 一.非lambda实现的java spark

spark2.x由浅入深深到底系列五之python开发spark环境配置

学习spark任何的技术前,请先正确理解spark,可以参考: 正确理解spark 以下是在mac操作系统上配置用python开发spark的环境 一.安装python spark2.2.0需要python的版本是Python2.6+ 或者 Python3.4+ 可以参考: http://jingyan.baidu.com/article/7908e85c78c743af491ad261.html 二.下载spark编译包并配置环境变量 1.在官网中: http://spark.apache.o

reactjs入门到实战(六)---- ReactJS组件API详解

全局的api 1.React.createClass 创建一个组件类,并作出定义.组件实现了 render() 方法,该方法返回一个子级.该子级可能包含很深的子级结构.组件与标准原型类的不同之处在于,你不需要使用 new 来实例化. 组件是一种很方便的封装,可以(通过 new )为你创建后台实例. 2.React.createElement ReactElement createElement( string/ReactClass type, [object props], [children