大数据基础教程:创建RDD的二种方式

大数据基础教程:创建RDD的二种方式

1.从集合中创建RDD


val conf = new SparkConf().setAppName("Test").setMaster("local")
      val sc = new SparkContext(conf)
      //这两个方法都有第二参数是一个默认值2  分片数量(partition的数量)
      //scala集合通过makeRDD创建RDD,底层实现也是parallelize
      val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6))
     //scala集合通过parallelize创建RDD
      val rdd2 = sc.parallelize(Array(1,2,3,4,5,6))

2.从外部存储创建RDD


//从外部存储创建RDD
 val rdd3 = sc.textFile("hdfs://hadoop01:8020/word.txt")

RDD编程API

RDD支持两种操作:转化操作和行动操作。RDD 的转化操作是返回一个新的 RDD的操作,比如 map()和 filter(),而行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作。比如 count() 和 first()。

Spark采用惰性计算模式,RDD只有第一次在一个行动操作中用到时,才会真正计算。Spark可以优化整个计算过程。默认情况下,Spark 的 RDD 会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个 RDD,可以使用 RDD.persist() 让 Spark 把这个 RDD 缓存下来。

Transformation算子

RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。


转换


含义


map(func)


返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成


filter(func)


返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成


flatMap(func)


类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)


mapPartitions(func)


类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]


mapPartitionsWithIndex(func)


类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Iterator[T]) => Iterator[U]


sample(withReplacement, fraction, seed)


根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子


union(otherDataset)


对源RDD和参数RDD求并集后返回一个新的RDD


intersection(otherDataset)


对源RDD和参数RDD求交集后返回一个新的RDD


distinct([numTasks]))


对源RDD进行去重后返回一个新的RDD


groupByKey([numTasks])


在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD


reduceByKey(func, [numTasks])


在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置


aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])


相同的Key值进行聚合操作,在聚合过程中同样使用了一个中立的初始值zeroValue:中立值,定义返回value的类型,并参与运算seqOp:用来在同一个partition中合并值combOp:用来在不同partiton中合并值


sortByKey([ascending], [numTasks])


在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD


sortBy(func,[ascending], [numTasks])


与sortByKey类似,但是更灵活


join(otherDataset, [numTasks])


在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD


cogroup(otherDataset, [numTasks])


在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD


cartesian(otherDataset)


笛卡尔积


pipe(command, [envVars])


将一些shell命令用于Spark中生成新的RDD


coalesce(numPartitions)


重新分区


repartition(numPartitions)


重新分区


repartitionAndSortWithinPartitions(partitioner)


重新分区和排序

 Action算子

在RDD上运行计算,并返回结果给Driver或写入文件系统


动作


含义


reduce(func)


通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的


collect()


在驱动程序中,以数组的形式返回数据集的所有元素


count()


返回RDD的元素个数


first()


返回RDD的第一个元素(类似于take(1))


take(n)


返回一个由数据集的前n个元素组成的数组


takeSample(withReplacement,num, [seed])


返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子


takeOrdered(n, [ordering])


takeOrdered和top类似,只不过以和top相反的顺序返回元素


saveAsTextFile(path)


将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本


saveAsSequenceFile(path)


将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。


saveAsObjectFile(path)


countByKey()


针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。


foreach(func)


在数据集的每一个元素上,运行函数func进行更新。

RDD支持两种操作:转化操作和行动操作。RDD 的转化操作是返回一个新的 RDD的操作,比如 map()和 filter(),而行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作。比如 count() 和 first()。

Spark采用惰性计算模式,RDD只有第一次在一个行动操作中用到时,才会真正计算。Spark可以优化整个计算过程。默认情况下,Spark 的 RDD 会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个 RDD,可以使用 RDD.persist() 让 Spark 把这个 RDD 缓存下来。

Transformation算子****

RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这种设计让Spark更加有效率地运行。

转换 含义
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Iterator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 相同的Key值进行聚合操作,在聚合过程中同样使用了一个中立的初始值zeroValue:中立值,定义返回value的类型,并参与运算seqOp:用来在同一个partition中合并值combOp:用来在不同partiton中合并值
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
cartesian(otherDataset) 笛卡尔积
pipe(command, [envVars]) 将一些shell命令用于Spark中生成新的RDD
coalesce(numPartitions) 重新分区
repartition(numPartitions) 重新分区
repartitionAndSortWithinPartitions(partitioner) 重新分区和排序

** Action算子**

在RDD上运行计算,并返回结果给Driver或写入文件系统

动作 含义
reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering]) takeOrdered和top类似,只不过以和top相反的顺序返回元素
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path)  
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func进行更新。

原文地址:https://www.cnblogs.com/gcghcxy/p/11010398.html

时间: 2024-10-07 04:52:57

大数据基础教程:创建RDD的二种方式的相关文章

Java接入Spark之创建RDD的两种方式和操作RDD

首先看看思维导图,我的spark是1.6.1版本,jdk是1.7版本 spark是什么? Spark是基于内存计算的大数据并行计算框架.Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark 部署在大量廉价硬件之上,形成集群. 下载和安装 可以看我之前发表的博客 Spark安装 安装成功后运行示例程序 在spark安装目录下examples/src/main目录中. 运行的一个Java或Scala示例程序,使用bin/run-examp

大数据实践:ODI 和 Twitter (二)

大数据实践:ODI和Twitter(二) 在前面的文章中,我们已经使用flume将数据从twitter抓取到Hive中,现在我们来看看ODI(Oracle Data Integrator)如何在HIVE表中进行逆向工程,打开HIVE模型,然后在逆向工程中选择“新的数据存储”及待逆向的对象,如下: 逆向工程完成之后,得到如下的元数据信息: 上面的操作步骤与普通的关系型数据库一样,没有特殊之处,ODI可以对HIVE的表进行逆向工程,使用RKM Hive, RKM HBase, IKM File to

Kylin大数据 实战 教程

Kylin大数据实战教程链接:https://pan.baidu.com/s/17vuLNQDjBGUirQV_IdXgfg提取码:bj4b 复制这段内容后打开百度网盘手机App,操作更方便哦课程学习地址:https://www.xuetuwuyou.com/course/316请添加链接描述课程出自学途无忧网:www.xuetuwuyou.com咨询QQ:2591905126 本课程为专题课,通过全面讲解Kylin架构原理.分布式集群搭建以及项目案例,让你快速掌握Kylin实时大数据BI技术,

好程序员大数据实用教程之面向对象进阶

好程序员大数据实用教程之面向对象进阶:包的创建与使用 是对一个程序中指定功能的部分代码进行包装 ####构造方法 是一个方法 特殊点: 构造方法没有返回值,不是指的返回值类型是void,而是根本就不写返回值类型 方法名字和类名相同 构造方法不能用static来修饰 构造方法调用的时机: 通俗来讲:是在实例化一个对象的时候调用的 一般情况下,我们在构造方法中做什么: 对对象的某一些属性进行初始化赋值操作 实例化对象的过程: Person xiaoming = new Person(); new :

区块链这些技术与h5房卡斗牛平台出售,大数据基础软件干货不容错过

在IT产业发展中,包括CPU.操作系统h5房卡斗牛平台出售 官网:h5.super-mans.com 企娥:2012035031 vx和tel:17061863513 h5房卡斗牛平台出售在内的基础软硬件地位独特,不但让美国赢得了产业发展的先机,成就了产业巨头,而且因为技术.标准和生态形成的壁垒,主宰了整个产业的发展.错失这几十年的发展机遇,对于企业和国家都是痛心的. 当大数据迎面而来,并有望成就一个巨大的应用和产业机会时,企业和国家都虎视眈眈,不想错再失这一难得的机遇.与传统的IT产业一样,大

Flink视频教程_大数据Flink教程下载

Flink视频教程_大数据Flink教程下载课程下载:https://pan.baidu.com/s/1LXm9W30jt4sufJvJakx5Dw 提取码:mazb 本课程将基于真实的电商分析系统构建,通过Flink实现真正的实时分析,该系统会从无到有一步一步带大家实现,让大家在实操中快速掌握Flink技术. 课程所涵盖的知识点包括Flink.Kafka.Flume.Sqoop.SpringMVC.Redis.HDFS.Mapreduce.Hbase.Hive.SpringBoot.Sprin

django之创建第7-5-第二种传值方式(time/1232/xiaodneg)

1.修改views文件 def foo(request,myID,myName): t = loader.get_template("foo.html") user = {"today": datetime.datetime.now(),"id":myID,"name":myName} c = Context(user) return HttpResponse(t.render(c)) 2.创建foo.html文件 <!

Spark SQL初始化和创建DataFrame的几种方式

一.前述       1.SparkSQL介绍 Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是其完全脱离了Hive的限制. SparkSQL支持查询原生的RDD. RDD是Spark平台的核心概念,是Spark能够高效的处理大数据的各种场景的基础. 能够在Scala中写SQL语句.支持简单的SQL语法检查,能够在Scala中写Hive语句访问Hive数据,并将结果取回作为RDD使用.     2.Spark on Hive和Hive on Spa

0166 DOM 之 节点操作: 删除节点,删除留言案例,复制(克隆)节点,动态生成表格案例,创建元素的三种方式,innerHTML和createElement效率对比

1.1.1 删除节点 node.removeChild(child) // 此处的node指 父节点 node.removeChild() 方法: 从 node节点中删除一个子节点,返回删除的节点. <button>删除</button> <ul> <li>熊大</li> <li>熊二</li> <li>光头强</li> </ul> <script> // 1.获取元素 va