spark 中的RDD编程 -以下基于Java api

1.RDD介绍:

    RDD,弹性分布式数据集,即分布式的元素集合。在spark中,对所有数据的操作不外乎是创建RDD、转化已有的RDD以及调用RDD操作进行求值。在这一切的背后,Spark会自动将RDD中的数据分发到集群中,并将操作并行化。

Spark中的RDD就是一个不可变的分布式对象集合。每个RDD都被分为多个分区,这些分区运行在集群中的不同节点上。RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以包含用户自定义的对象。

用户可以使用两种方法创建RDD:读取一个外部数据集,或在驱动器程序中分发驱动器程序中的对象集合,比如list或者set。

RDD的转化操作都是惰性求值的,这意味着我们对RDD调用转化操作,操作不会立即执行。相反,Spark会在内部记录下所要求执行的操作的相关信息。我们不应该把RDD看做存放着特定数据的数据集,而最好把每个RDD当做我们通过转化操作构建出来的、记录如何计算数据的指令列表。数据读取到RDD中的操作也是惰性的,数据只会在必要时读取。转化操作和读取操作都有可能多次执行。

2.创建RDD数据集

(1)读取一个外部数据集

JavaRDD<String> lines=sc.textFile(inputFile);

(2)分发对象集合,这里以list为例

List<String> list=new ArrayList<String>();list.add("a");list.add("b");list.add("c");JavaRDD<String> temp=sc.parallelize(list);//上述方式等价于JavaRDD<String> temp2=sc.parallelize(Arrays.asList("a","b","c"));

3.RDD操作

(1)转化操作

用java实现过滤器转化操作:

List<String> list=new ArrayList<String>();//建立列表,列表中包含以下自定义表项list.add("error:a");list.add("error:b");list.add("error:c");list.add("warning:d");list.add("hadppy ending!");//将列表转换为RDD对象JavaRDD<String> lines = sc.parallelize(list);//将RDD对象lines中有error的表项过滤出来,放在RDD对象errorLines中JavaRDD<String> errorLines = lines.filter(        new Function<String, Boolean>() {            public Boolean call(String v1) throws Exception {                return v1.contains("error");            }        });//遍历过滤出来的列表项List<String> errorList = errorLines.collect();for (String line : errorList)    System.out.println(line);

       

输出:

error:a

error:b

error:c

可见,列表list中包含词语error的表项都被正确的过滤出来了。

(2)合并操作

将两个RDD数据集合并为一个RDD数据集

接上述程序示例:

  1. JavaRDD<String> warningLines=lines.filter(
  2. new Function<String, Boolean>() {
  3. public Boolean call(String v1) throws Exception {
  4. return v1.contains("warning");
  5. }
  6. }
  7. );
  8. JavaRDD<String> unionLines=errorLines.union(warningLines);
  9. for(String line :unionLines.collect())
  10. System.out.println(line);

输出:

error:a

error:b

error:c

warning:d

可见,将原始列表项中的所有error项和warning项都过滤出来了。

(3)获取RDD数据集中的部分或者全部元素

①获取RDD数据集中的部分元素   .take(int num)  返回值List<T>

获取RDD数据集中的前num项。

/** * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so * it will be slow if a lot of partitions are required. In that case, use collect() to get the * whole RDD instead. */def take(num: Int): JList[T] 

程序示例:接上

JavaRDD<String> unionLines=errorLines.union(warningLines);

for(String line :unionLines.take(2))    System.out.println(line);

输出:

error:a

error:b

可见,输出了RDD数据集unionLines的前2项

②获取RDD数据集中的全部元素 .collect() 返回值 List<T>

程序示例:

List<String> unions=unionLines.collect();for(String line :unions)    System.out.println(line);

遍历输出RDD数据集unions的每一项

4.向spark传递函数


函数名

实现的方法

用途

Function<T,R>

R call(T)
接收一个输入值并返回一个输出值,用于类似map()和filter()的操作中
Function<T1,T2,R>
R call(T1,T2)

接收两个输入值并返回一个输出值,用于类似aggregate()和fold()等操作中
FlatMapFunction<T,R>
Iterable <R> call(T)

接收一个输入值并返回任意个输出,用于类似flatMap()这样的操作中

①Function<T,R>

JavaRDD<String> errorLines=lines.filter(        new Function<String, Boolean>() {            public Boolean call(String v1)throws Exception {                return v1.contains("error");            }        });

过滤RDD数据集中包含error的表项,新建RDD数据集errorLines

②FlatMapFunction<T,R>

List<String> strLine=new ArrayList<String>();strLine.add("how are you");strLine.add("I am ok");strLine.add("do you love me")JavaRDD<String> input=sc.parallelize(strLine);JavaRDD<String> words=input.flatMap(        new FlatMapFunction<String, String>() {            public Iterable<String> call(String s) throws Exception {                return Arrays.asList(s.split(" "));            }        });

将文本行的单词过滤出来,并将所有的单词保存在RDD数据集words中。

③ Function<T1,T2,R>

List<String> strLine=new ArrayList<String>();strLine.add("how are you");strLine.add("I am ok");strLine.add("do you love me");JavaRDD<String> input=sc.parallelize(strLine);JavaRDD<String> words=input.flatMap(        new FlatMapFunction<String, String>() {            public Iterable<String> call(String s) throws Exception {                return Arrays.asList(s.split(" "));            }        });JavaPairRDD<String,Integer> counts=words.mapToPair(        new PairFunction<String, String, Integer>() {            public Tuple2<String, Integer> call(String s) throws Exception {                return new Tuple2(s, 1);            }        });JavaPairRDD <String,Integer> results=counts.reduceByKey(        new org.apache.spark.api.java.function.Function2<Integer, Integer, Integer>() {            public Integer call(Integer v1, Integer v2) throws Exception {                return v1 + v2;            }        }) ;

上述程序是spark中的wordcount实现方式,其中的reduceByKey操作的Function2函数定义了遇到相同的key时,value是如何reduce的->直接将两者的value相加。

*注意:

可以将我们的函数类定义为使用匿名内部类,就像上述程序实现的那样,也可以创建一个具名类,就像这样:

class ContainError implements Function<String,Boolean>{    public Boolean call(String v1) throws Exception {        return v1.contains("error");    }}JavaRDD<String> errorLines=lines.filter(new ContainError());for(String line :errorLines.collect())    System.out.println(line);

具名类也可以有参数,就像上述过滤出含有”error“的表项,我们可以自定义到底含有哪个词语,就像这样,程序就更有普适性了。

5.针对每个元素的转化操作:

转化操作map()接收一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果RDD中对应的元素。关键词:转化

转化操作filter()接受一个函数,并将RDD中满足该函数的元素放入新的RDD中返回。关键词:过滤

示例图如下所示:

①map()

计算RDD中各值的平方

JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4));JavaRDD<Integer> result=rdd.map(    new Function<Integer, Integer>() {        public Integer call(Integer v1) throwsException {        return v1*v1;        }    });System.out.println( StringUtils.join(result.collect(),","));

输出:

1,4,9,16

filter()

② 去除RDD集合中值为1的元素:

JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4));JavaRDD<Integer> results=rdd.filter(new Function<Integer, Boolean>() {    public Boolean call(Integer v1) throws Exception {        return v1!=1;        }    });System.out.println(StringUtils.join(results.collect(),","));

结果:

2,3,4

③ 有时候,我们希望对每个输入元素生成多个输出元素。实现该功能的操作叫做flatMap()。和map()类似,我们提供给flatMap()的函数被分别应用到了输入的RDD的每个元素上。不过返回的不是一个元素,而是一个返回值序列的迭代器。输出的RDD倒不是由迭代器组成的。我们得到的是一个包含各个迭代器可以访问的所有元素的RDD。flatMap()的一个简单用途是将输入的字符串切分成单词,如下所示:

JavaRDD<String> rdd =sc.parallelize(Arrays.asList("hello world","hello you","world i love you"));JavaRDD<String> words=rdd.flatMap(    new FlatMapFunction<String, String>() {        public Iterable<String> call(String s) throws Exception {            return Arrays.asList(s.split(" "));        }    });System.out.println(StringUtils.join(words.collect(),‘\n‘));

输出:

hello

world

hello

you

world

i

love

you

6.集合操作

RDD中的集合操作


函数

用途

RDD1.distinct()
生成一个只包含不同元素的新RDD。需要数据混洗。

RDD1.union(RDD2)
返回一个包含两个RDD中所有元素的RDD

RDD1.intersection(RDD2)
只返回两个RDD中都有的元素

RDD1.substr(RDD2)
返回一个只存在于第一个RDD而不存在于第二个RDD中的所有元素组成的RDD。需要数据混洗。

集合操作对笛卡尔集的处理:


RDD1.cartesian(RDD2)
返回两个RDD数据集的笛卡尔集

程序示例:生成RDD集合{1,2} 和{1,2}的笛卡尔集

JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2));JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1,2));JavaPairRDD<Integer ,Integer> rdd=rdd1.cartesian(rdd2);for(Tuple2<Integer,Integer> tuple:rdd.collect())    System.out.println(tuple._1()+"->"+tuple._2());

输出:

1->1

1->2

2->1

2->2

7.行动操作

(1)reduce操作

    reduce()接收一个函数作为参数,这个函数要操作两个RDD的元素类型的数据并返回一个同样类型的新元素。一个简单的例子就是函数+,可以用它来对我们的RDD进行累加。使用reduce(),可以很方便地计算出RDD中所有元素的总和,元素的个数,以及其他类型的聚合操作。

以下是求RDD数据集所有元素和的程序示例:

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));Integer sum =rdd.reduce(    new Function2<Integer, Integer, Integer>() {        public Integercall(Integer v1, Integer v2) throws Exception {            return v1+v2;        }    });System.out.println(sum.intValue());

输出:55

(2)fold()操作

接收一个与reduce()接收的函数签名相同的函数,再加上一个初始值来作为每个分区第一次调用时的结果。你所提供的初始值应当是你提供的操作的单位元素,也就是说,使用你的函数对这个初始值进行多次计算不会改变结果(例如+对应的0,*对应的1,或者拼接操作对应的空列表)。

程序实例:

①计算RDD数据集中所有元素的和:

zeroValue=0;//求和时,初始值为0。

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));Integer sum =rdd.fold(0,        new Function2<Integer, Integer, Integer>() {            public Integer call(Integer v1, Integer v2) throws Exception {                return v1+v2;            }        });System.out.println(sum);

②计算RDD数据集中所有元素的积:

zeroValue=1;//求积时,初始值为1。

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));Integer result =rdd.fold(1,        new Function2<Integer, Integer, Integer>() {            public Integer call(Integer v1, Integer v2) throws Exception {                return v1*v2;            }        });System.out.println(result);

(3)aggregate()操作

aggregate()函数返回值类型不必与所操作的RDD类型相同。

与fold()类似,使用aggregate()时,需要提供我们期待返回的类型的初始值。然后通过一个函数把RDD中的元素合并起来放入累加器。考虑到每个节点是在本地进行累加的,最终,还需要提供第二个函数来将累加器两两合并。

以下是程序实例:

public class AvgCount implements Serializable{public int total;    public int num;    public AvgCount(int total,int num){    this.total=total;    this.num=num;}public double avg(){    return total/(double)num;}static Function2<AvgCount,Integer,AvgCount> addAndCount=new Function2<AvgCount, Integer, AvgCount>() {    public AvgCount call(AvgCount a, Integer x) throws Exception {        a.total+=x;        a.num+=1;        return a;        }};static Function2<AvgCount,AvgCount,AvgCount> combine=    new Function2<AvgCount, AvgCount, AvgCount>() {        public AvgCount call(AvgCount a, AvgCount b) throws Exception {            a.total+=b.total;            a.num+=b.num;            return a;        } };    public static void main(String args[]){

SparkConf conf = new SparkConf().setMaster("local").setAppName("my app");        JavaSparkContext sc = new JavaSparkContext(conf);

AvgCount intial =new AvgCount(0,0);        JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));        AvgCount result=rdd.aggregate(intial,addAndCount,combine);        System.out.println(result.avg());

}

}

这个程序示例可以实现求出RDD对象集的平均数的功能。其中addAndCount将RDD对象集中的元素合并起来放入AvgCount对象之中,combine提供两个AvgCount对象的合并的实现。我们初始化AvgCount(0,0),表示有0个对象,对象的和为0,最终返回的result对象中total中储存了所有元素的和,num储存了元素的个数,这样调用result对象的函数avg()就能够返回最终所需的平均数,即avg=tatal/(double)num。

8.持久化缓存

因为Spark RDD是惰性求值的,而有时我们希望能多次使用同一个RDD。如果简单地对RDD调用行动操作,Spark每次都会重算RDD以及它的所有依赖。这在迭代算法中消耗格外大,因为迭代算法常常会多次使用同一组数据。

为了避免多次计算同一个RDD,可以让Spark对数据进行持久化。当我们让Spark持久化存储一个RDD时,计算出RDD的节点会分别保存它们所求出的分区数据。

出于不同的目的,我们可以为RDD选择不同的持久化级别。默认情况下persist()会把数据以序列化的形式缓存在JVM的堆空间中

不同关键字对应的存储级别表

级别
使用的空间

cpu时间

是否在内存

是否在磁盘

备注

MEMORY_ONLY



直接储存在内存
MEMORY_ONLY_SER




序列化后储存在内存里

MEMORY_AND_DISK

中等

部分

部分
如果数据在内存中放不下,溢写在磁盘上

MEMORY_AND_DISK_SER


部分

部分
数据在内存中放不下,溢写在磁盘中。内存中存放序列化的数据。

DISK_ONLY





直接储存在硬盘里面

程序示例:将RDD数据集持久化在内存中。

JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4,5));rdd.persist(StorageLevel.MEMORY_ONLY());System.out.println(rdd.count());System.out.println(StringUtils.join(rdd.collect(),‘,‘));

RDD还有unpersist()方法,调用该方法可以手动把持久化的RDD从缓存中移除。

9.不同的RDD类型

Java中有两个专门的类JavaDoubleRDD和JavaPairRDD,来处理特殊类型的RDD,这两个类还针对这些类型提供了额外的函数,折让你可以更加了解所发生的一切,但是也显得有些累赘。

要构建这些特殊类型的RDD,需要使用特殊版本的类来替代一般使用的Function类。如果要从T类型的RDD创建出一个DoubleRDD,我们就应当在映射操作中使用DoubleFunction<T>来替代Function<T,Double>。

程序实例:以下是一个求RDD每个对象的平方值的程序实例,将普通的RDD对象转化为DoubleRDD对象,最后调用DoubleRDD对象的max()方法,返回生成的平方值中的最大值。

JavaRDD<Integer> rdd =sc.parallelize(Arrays.asList(1,2,3,4,5));JavaDoubleRDD result=rdd.mapToDouble(    new DoubleFunction<Integer>() {        public double call(Integer integer) throws Exception {            return (double) integer*integer;        }    });System.out.println(result.max());

来自为知笔记(Wiz)

时间: 2024-10-26 03:11:51

spark 中的RDD编程 -以下基于Java api的相关文章

spark core之RDD编程

  spark提供了对数据的核心抽象--弹性分布式数据集(Resilient Distributed Dataset,简称RDD).RDD是一个分布式的数据集合,数据可以跨越集群中的多个机器节点,被分区并行执行.  在spark中,对数据的所有操作不外乎创建RDD.转化已有RDD及调用RDD操作进行求值.spark会自动地将RDD中的数据分发到集群中并行执行. 五大特性 a list of partitions  RDD是一个由多个partition(某个节点里的某一片连续的数据)组成的的lis

Spark中的RDD和DataFrame

什么是DataFrame 在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格. RDD和DataFrame的区别 DataFrame与RDD的主要区别在于,DataFrame带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型.使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标. RDD,

spark中的RDD以及DAG

今天,我们就先聊一下spark中的DAG以及RDD的相关的内容 1.DAG:有向无环图:有方向,无闭环,代表着数据的流向,这个DAG的边界则是Action方法的执行 2.如何将DAG切分stage,stage切分的依据:有宽依赖的时候要进行切分(shuffle的时候, 也就是数据有网络的传递的时候),则一个wordCount有两个stage, 一个是reduceByKey之前的,一个事reduceByKey之后的(图1), 则我们可以这样的理解,当我们要进行提交上游的数据的时候, 此时我们可以认

.Net中的并行编程-7.基于BlockingCollection实现高性能异步队列

三年前写过基于ConcurrentQueue的异步队列,今天在整理代码的时候发现当时另外一种实现方式-使用BlockingCollection实现,这种方式目前依然在实际项目中使用.关于BlockingCollection的基本使用请查阅MSDN.源码实现 下面直接上代码:(代码已经放到了我的github上) using System; using System.Collections.Concurrent; using System.Collections.Generic; using Sys

结对编程——paperOne基于java web的简易四则运算出题网站

项目成员:张金生     张政 需求分析: 1.要进行四则运算: 2.运算题目随机: 3.进行对错判断: 4.整数运算. 程序概要: 1.用JSP实现: 2.用户可选择题目数量: 3.答题页用表格列出: 4.包含用来填写答案的输入框: 5.答完后点击提交会直接显示相应题目的对错. 实现过程: 数据结构主要用到了数组 题目生成: 1 public String generateQuestion(int numOfOperand, int rangeMin, int rangMax, boolean

深入浅出VC++串口编程之基于Win32 API

1.API描述 在WIN32 API中,串口使用文件方式进行访问,其操作的API基本上与文件操作的API一致. 打开串口 Win32 中用于打开串口的API 函数为CreateFile,其原型为: HANDLE CreateFile ( LPCTSTR lpFileName, //将要打开的串口逻辑名,如COM1 或COM2 DWORD dwAccess, //指定串口访问的类型,可以是读取.写入或两者并列 DWORD dwShareMode, //指定共享属性,由于串口不能共享,该参数必须置为

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio

Learning Spark中文版--第三章--RDD编程(1)

? ?本章介绍了Spark用于数据处理的核心抽象概念,具有弹性的分布式数据集(RDD).一个RDD仅仅是一个分布式的元素集合.在Spark中,所有工作都表示为创建新的RDDs.转换现有的RDDs,或者调用RDDs上的操作来计算结果.在底层,Spark自动将数据中包含的数据分发到你的集群中,并将你对它们执行的操作进行并行化.数据科学家和工程师都应该阅读这一章,因为RDDs是Spark的核心概念.我们强烈建议你在这些例子中尝试一些 交互式shell(参见"Spark的Python和Scala she

Java中的网络编程

Java中的网路编程主要是Java的Socket编程,属于JavaEE中的高级的部分,以下内容是对java网路编程的一个小结,代码都是经过编译调试的 C/S程序应用:客户/服务器模式,如QQ客户端,客户端连到服务器上,一个C/S模式的应用必须有两套程序,一个是客户端的程序,一个是服务器程序. B/S程序应用:浏览器/服务器模式,如当下的各种网站都是B/S模式,所有的程序代码都在服务器上,用户通过浏览器去访问. C/S程序分为两种: 基于TCP协议:Socket(套接字), 可靠的编程: A->B