Spark学习摘记 —— Spark转化操作API归纳

本文参考

在阅读了《Spark快速大数据分析》动物书后,大概了解到了spark常用的api,不过书中并没有给予所有api详细的案例,而且现在spark的最新版本已经上升到了2.4.5,动物书中的spark版本还停留在1.2.0版本,所以就有了这篇文章,在最新的2.4.5版本下测试常用的api

由于spark的惰性计算特性,RDD只有在第一次行动操作中被用到时才会真正进行计算,因此我打算将文章内容分为"转化操作API"和"行动操作API"两部分,同时因为pair RDD(RDD中的元素是键值对)的部分api较为特殊,所以我打算单独再写一篇文章

本文仅介绍转化操作API,前5个api —— map()、flatMap()、filter()、distinct()、sample()是针对一个RDD的转化操作,后续的api —— union()、intersection()、subtract()、cartesion()是针对两个RDD的转化操作

环境

idea + spark 2.4.5 + scala 2.11.12

RDD均通过SparkContext的parallelize()函数创建

map()函数

目的:

将函数应用于RDD中的每个元素,将返回值构成新的RDD

转化前后的RDD的元素类型可以不同(比如经典的WordCount示例中转化为了键值对元素)

代码:

val testList = List(1, 2, 3, 3)
val testRdd = sc.parallelize(testList)
testRdd.map(ele => ele * ele).foreach(ele => print(s"$ele "))
																								

输出:

1 4 9 9

更高效的操作:

每个RDD被分为多个分区,这些分区在集群的不同节点上运行,可以使用mapPartitions()函数,将转化操作作用于每个分区的元素上,这种方法还可以为每个分区创建一个JDBC连接,而不是为每一个元素创建一个连接(此处不做示例)

mapPartitions()函数有两个参数,第一个参数接收一个函数,和map()函数相同,第二个参数为preservesPartitioning,默认值为false,仅当我们对pair RDD进行转化操作,并且没有修改键时设置为true

val testList = List(1, 2, 3, 3)
val testRdd = sc.parallelize(testList)
testRdd.mapPartitions(partition =>

					partition.map(

					ele => {

					ele * ele

					}
)).foreach(ele => print(s"$ele "))

?

flatMap()函数

目的:

将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD,我们也常常说成是"压扁"

"压扁"这个词可能听上去不大好理解,我们提供给flatMap()的函数分别应用到RDD的那个元素上,不过返回的不是一个元素,而是一个返回值序列的迭代器,但输出的RDD不是由迭代器组成,得到的是一个包含各个迭代器可以访问的所有元素的RDD

转化前后的RDD的元素类型不变

代码:

val testList = List(1, 2, 3, 3)
val testRdd = sc.parallelize(testList)
testRdd.flatMap(ele => {

					ele.to(5)
}).foreach(ele => print(s"$ele "))
																	

我们也可以手动返回迭代器,这段代码也类似于

val testList = List(Range(1, 6), Range(2, 6), Range(3, 6), Range(3, 6))
val testRdd = sc.parallelize(testList)
testRdd.flatMap(_.iterator).foreach(ele => print(s"$ele "))
																								

输出:

1 2 3 4 5 2 3 4 5 3 4 5 3 4 5

?

filter()函数

目的:

返回一个由传给filter()函数的元素组成的RDD,当函数返回值为true时,保留该元素,可以理解为 "被过滤"出来

代码:

val testList = List(1, 2, 3, 3)
val testRdd = sc.parallelize(testList)
testRdd.filter(ele => ele > 2).foreach(ele => print(s"$ele "))
																									

输出:

3 3

更高效的操作:

通过过滤操作后,RDD中的元素减少,可以在filter()操作后执行coalesce()函数进行分区合并,第一个参数指定分区数,当指定的分区数大于当前RDD的分区数时不会进行合并,当前分区数不变(除非指定第二参数shuffle为true,默认为false),当指定的分区数小于当前的RDD的分区数时会进行合并,并且不会进行shuffle(尽量不要指定极端的情况,如指定合并后的分区数为1)

val testList = List(1, 2, 3, 3)
val testRdd = sc.parallelize(testList)
testRdd.filter(ele => ele > 2).coalesce(5).foreach(ele => print(s"$ele "))

?

distinct()函数

目的:

去重,因为会进行shuffle,所以不推荐此操作

代码:

val testList = List(1, 2, 3, 3)
val testRdd = sc.parallelize(testList)
testRdd.distinct().foreach(ele => print(s"$ele "))
																				

输出:

1 2 3

?

sample()函数

目的:

对RDD进行采样

代码:

val testList = List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
val testRdd = sc.parallelize(testList)
testRdd.sample(false, 0.9).foreach(ele => print(s"$ele "))
																								

第一个参数withReplacement指定false时,第二个参数fraction必须为 [ 0 , 1 ] 之间,表示每个元素被选中的可能性

按照该示例,也有人将该函数理解为,从所有元素中抽取90%返回,但是在源码中已经我们可以看到"without replacement: probability that each element is chosen; fraction must be [0, 1]",并且也指明"This is NOT guaranteed to provide exactly the fraction of the count of the given [[RDD]]",因此这种理解方式我认为是错误的

输出:

0 1 2 3 5 6 8 9(不一定)

疑点:

当第一个参数withReplacement指定true时,第二个参数fraction并不要求一定小于1,源码中注释为"with replacement: expected number of times each element is chosen; fraction must be greater than or equal to 0"

val testList = List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
val testRdd = sc.parallelize(testList)
testRdd.sample(true, 3).foreach(ele => print(s"$ele "))
																								

输出:0 0 1 1 1 1 2 2 2 3 3 3 3 4 4 4 4 4 5 5 5 5 5 5 6 6 7 7 8 9 9

目前不大理解是如何在采样的,希望各位看官大大能在评论区发表看法哈

===============暂时更新到这儿哈==============

?

?

原文地址:https://www.cnblogs.com/kuluo/p/12545374.html

时间: 2024-08-06 00:55:49

Spark学习摘记 —— Spark转化操作API归纳的相关文章

Spark学习笔记——Spark Streaming

许多应用需要即时处理收到的数据,例如用来实时追踪页面访问统计的应用.训练机器学习模型的应用, 还有自动检测异常的应用.Spark Streaming 是 Spark 为这些应用而设计的模型.它允许用户使用一套和批处理非常接近的 API 来编写流式计算应用,这样就可以大量重用批处理应用的技术甚至代码. Spark Streaming 使用离散化流( discretized stream)作为抽象表示, 叫作 DStream. DStream 是随时间推移而收到的数据的序列.在内部,每个时间区间收到

Spark学习9 Spark Streaming流式数据处理组件学习

目录 SparkStreaming相关概念 概述 SparkStreaming的基本数据抽象DStream 处理模式 操作流程中细节 StreamingContext StreamingContext对象的创建 StreamingContext主要用法 输入源 DStream两种转化 无状态转化操作 有状态转化操作 输出操作 实践(最简单的wordCount) 创建StreamingContext对象 创建DStream对象 对DStream对象操纵 SparkStreaming相关概念 概述

Spark学习(一) Spark初识

一.官网介绍 1.什么是Spark 官网地址:http://spark.apache.org/ Apache Spark™是用于大规模数据处理的统一分析引擎. 从右侧最后一条新闻看,Spark也用于AI人工智能 spark是一个实现快速通用的集群计算平台.它是由加州大学伯克利分校AMP实验室 开发的通用内存并行计算框架,用来构建大型的.低延迟的数据分析应用程序.它扩展了广泛使用的MapReduce计算 模型.高效的支撑更多计算模式,包括交互式查询和流处理.spark的一个主要特点是能够在内存中进

Spark学习之Spark安装

Spark安装 spark运行环境 spark是Scala写的,运行在jvm上,运行环境为java7+ 如果使用Python的API ,需要使用Python2.6+或者Python3.4+ Spark1.6.2  -  Scala 2.10    Spark 2.0.0  -  Scala  2.11 Spark下载 下载地址:http://spark.apache.org/downloads.html 搭建spark,不需要Hadoop,如有Hadoop集群,可下载对应版本解压 Spark目录

【Spark学习】Spark 1.1.0 with CDH5.2 安装部署

[时间]2014年11月18日 [平台]Centos 6.5 [工具]scp [软件]jdk-7u67-linux-x64.rpm spark-worker-1.1.0+cdh5.2.0+56-1.cdh5.2.0.p0.35.el6.noarch.rpm spark-core-1.1.0+cdh5.2.0+56-1.cdh5.2.0.p0.35.el6.noarch.rpm spark-history-server-1.1.0+cdh5.2.0+56-1.cdh5.2.0.p0.35.el6.

Spark学习笔记-Spark Streaming

http://spark.apache.org/docs/1.2.1/streaming-programming-guide.html 在SparkStreaming中如何对数据进行分片 Level of Parallelism in Data Processing Cluster resources can be under-utilized if the number of parallel tasks used in any stage of the computation is not

Spark学习笔记——Spark上数据的获取、处理和准备

数据获得的方式多种多样,常用的公开数据集包括: 1.UCL机器学习知识库:包括近300个不同大小和类型的数据集,可用于分类.回归.聚类和推荐系统任务.数据集列表位于:http://archive.ics.uci.edu/ml/ 2.Amazon AWS公开数据集:包含的通常是大型数据集,可通过Amazon S3访问.这些数据集包括人类基因组项目.Common Crawl网页语料库.维基百科数据和Google Books Ngrams.相关信息可参见:http://aws.amazon.com/p

spark学习笔记-spark集群搭建(7)

安装spark包 1 1.将spark-1.3.0-bin-hadoop2.4.tgz使用WinSCP上传到/usr/local目录下. 2 2.解压缩spark包:tar zxvf spark-1.3.0-bin-hadoop2.4.tgz. 3 3.更改spark目录名:mv spark-1.3.0-bin-hadoop2.4 spark 4 4.设置spark环境变量 5 vi .bashrc 6 export SPARK_HOME=/usr/local/spark 7 export PA

spark学习笔记总结-spark入门资料精化

Spark学习笔记 Spark简介 spark 可以很容易和yarn结合,直接调用HDFS.Hbase上面的数据,和hadoop结合.配置很容易. spark发展迅猛,框架比hadoop更加灵活实用.减少了延时处理,提高性能效率实用灵活性.也可以与hadoop切实相互结合. spark核心部分分为RDD.Spark SQL.Spark Streaming.MLlib.GraphX.Spark R等核心组件解决了很多的大数据问题,其完美的框架日受欢迎.其相应的生态环境包括zepplin等可视化方面