Spark (Python版) 零基础学习笔记(二)—— Spark Transformations总结及举例

1. map(func) 
func函数作用到数据集的每个元素,生成一个新的分布式的数据集并返回

1 >>> a = sc.parallelize((‘a‘, ‘b‘, ‘c‘))
2 >>> a.map(lambda x: x+‘1‘).collect()
3 [‘a1‘, ‘b1‘, ‘c1‘]

2. filter(func) 
选出所有func返回值为true的元素,作为一个新的数据集返回

1 >>> a = sc.parallelize(range(10))
2 >>> a.filter(lambda x: x%2==0).collect()  # 选出0-9的偶数
3 [0, 2, 4, 6, 8]

3. flatMap(func) 
与map相似,但是每个输入的item能够被map到0个或者更多的items输出,也就是说func的返回值应当是一个Sequence,而不是一个单独的item

1 >>> l = [‘I am Tom‘, ‘She is Jenny‘, ‘He is Ben‘]
2 >>> a = sc.parallelize(l,3)
3 >>> a.flatMap(lambda line: line.split()).collect()  # 将每个字符串中的单词划分出来
4 [‘I‘, ‘am‘, ‘Tom‘, ‘She‘, ‘is‘, ‘Jenny‘, ‘He‘, ‘is‘, ‘Ben‘]

4. mapPartitions(func) 
与map相似,但是mapPartitions的输入函数单独作用于RDD的每个分区(block)上,因此func的输入和返回值都必须是迭代器iterator。 
例如:假设RDD有十个元素0~9,分成三个区,使用mapPartitions返回每个元素的平方。如果使用map方法,map中的输入函数会被调用10次,而使用mapPartitions方法,输入函数只会被调用3次,每个分区被调用1次。

1 >>> def squareFunc(a):
2 . . .     for i in a:
3 . . .         yield i*i
4 . . .
5 >>> a = sc.parallelize(range(10), 3)
6 PythonRDD[1] at RDD at PythonRDD.scala:48
7 >>> a.mapPartitions(squareFunc).collect()
8 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

5. mapPartitionsWithIndex(func) 
与mapPartitions相似,但是输入函数func提供了一个正式的参数,可以用来表示分区的编号。

 1 >>> def func(index, iterator):  # 返回每个分区的编号和数值
 2 . . .     yield (‘index ‘ + str(index) + ’ is: ‘ + str(list(iterator)))
 3 . . .
 4 >>> a = sc.parallelize(range(10),3)
 5 >>> a.mapPartitionsWithIndex(func).collect()
 6 [‘index 0 is: [0, 1, 2]‘, ‘index 1 is: [3, 4, 5]‘, ‘index 2 is: [6, 7, 8, 9]‘]
 7 >>> def squareIndex(index, iterator):  # 返回每个数值所属分区的编号和数值的平方
 8 ...     for i in iterator:
 9 ...         yield ("The index is: " + str(index) + ", and the square is: " + str(i*i))
10 ...
11 >>> a.mapPartitionsWithIndex(squareIndex).collect()
12 [‘The index is: 0, and the square is: 0‘, ‘The index is: 0, and the square is: 1‘, ‘The index is: 1, and the square is: 4‘, ‘The index is: 1, and the square is: 9‘, ‘The index is: 1, and the square is: 16‘, ‘The index is: 2, and the square is: 25‘, ‘The index is: 2, and the square is: 36‘, ‘The index is: 3, and the square is: 49‘, ‘The index is: 3, and the square is: 64‘, ‘The index is: 3, and the square is: 81‘]

6. sample(withReplacementfractionseed) 
从数据中抽样,withReplacement表示是否有放回,withReplacement=true表示有放回抽样,fraction为抽样的概率(0<=fraction<=1),seed为随机种子。 
例如:从1-100之间抽取样本,被抽取为样本的概率为0.2

1 >>> data = sc.parallelize(range(1,101),2)
2 >>> sample = data.sample(True, 0.2)
3 >>> sampleData.count()
4 19
5 >>> sampleData.collect()
6 [16, 19, 24, 29, 32, 33, 44, 45, 55, 56, 56, 57, 65, 65, 73, 83, 84, 92, 96]

!!!注意,Spark中的sample抽样,当withReplacement=True时,相当于采用的是泊松抽样;当withReplacement=False时,相当于采用伯努利抽样,fraction并不是表示抽样得到的样本占原来数据总量的百分比,而是一个元素被抽取为样本的概率。fraction=0.2并不是说明要抽出100个数字中20%的数据作为样本,而是每个数字被抽取为样本的概率为0.2,这些数字被认为来自同一总体,样本的大小并不是固定的,而是服从二项分布。

7. union(otherDataset) 
并集操作,将源数据集与union中的输入数据集取并集,默认保留重复元素(如果不保留重复元素,可以利用distinct操作去除,下边介绍distinct时会介绍)。

1 >>> data1 = sc.parallelize(range(10))
2 >>> data2 = sc.parallelize(range(6,15))
3 >>> data1.union(data2).collect()
4 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 6, 7, 8, 9, 10, 11, 12, 13, 14]

8. intersection(otherDataset) 
交集操作,将源数据集与union中的输入数据集取交集,并返回新的数据集。

1 >>> data1 = sc.parallelize(range(10))
2 >>> data2 = sc.parallelize(range(6,15))
3 >>> data1.intersection(data2).collect()
4 [8, 9, 6, 7]

9. distinct([numTasks]) 
去除数据集中的重复元素。

1 >>> data1 = sc.parallelize(range(10))
2 >>> data2 = sc.parallelize(range(6,15))
3 >>> data1.union(data2).distinct().collect()
4 [0, 8, 1, 9, 2, 10, 11, 3, 12, 4, 5, 13, 14, 6, 7]

下边的一系列transactions会用的键(Key)这一概念,在进行下列有关Key操作时使用的数据集为记录伦敦各个片区(英文称为ward)中学校和学生人数相关信息的表格,下载地址: 
https://data.london.gov.uk/dataset/london-schools-atlas/resource/64f771ee-38b1-4eff-8cd2-e9ba31b90685# 
下载后将其中命名为WardtoSecSchool_LDS_2015的sheet里边的数据保存为csv格式,删除第一行的表头,并重新命名为school.csv 
数据格式为: 
(Ward_CODE, Ward_NAME, TotalWardPupils, Ward2Sec_Flow_No., Secondary_School_URN, Secondary_School_Name, Pupil_count) 
首先对数据进行一些预处理:

1 >>> school = sc.textFile("file:///home/yang/下载/school.csv")
2 Data = sc.textFile("file:///home/yang/下载/school.csv")
3 >>> school.count()  # 共有16796行数据
4 16796
5 >>> import re  # 引入python的正则表达式包
6 >>> rows = school.map(lambda line: re.subn(‘,[\s]+‘,‘: ‘, line))

注意:1. 从本地读取数据时,代码中要通过 “file://” 前缀指定读取本地文件。Spark shell 默认是读取 HDFS 中的文件,需要先上传文件到 HDFS 中,否则会有“org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/school.csv”的错误。 
2. 对数据集进行了一下预处理,利用正则匹配替换字符串,由于一些学校的名字的字符串中本身含有逗号,比如“The City Academy, Hackney”, 此时如果利用csv的分隔符’,’进行分割,并不能将名字分割为“The City Academy”和“Hackney”。我们注意到csv的分隔符逗号后边是没有空格的,而名字里边的逗号后边都会有空格(英语书写习惯),因此,先利用re.subn语句对逗号后边含有至少一个空格(正则表达式为’,[\s]+’)的子字符串进行替换,替换为’: ’,然后再进行后续操作。以上即为对这一数据集的预处理过程。

10. groupByKey([numTasks]) 
作用于由键值对(K, V)组成的数据集上,将Key相同的数据放在一起,返回一个由键值对(K, Iterable)组成的数据集。 
注意:1. 如果这一操作是为了后续在每个键上进行聚集(aggregation),比如sum或者average,此时使用reduceByKey或者aggregateByKey的效率更高。2. 默认情况下,输出的并行程度取决于RDD分区的数量,但也可以通过给可选参数numTasks赋值来调整并发任务的数量。

1 >>> newRows = rows.map(lambda r: r[0].split(‘,‘))
2 >>> ward_schoolname = newRows .map(lambda r: (r[1], r[5])).groupByKey()  # r[1]为ward的名字,r[5]为学校的名字
3 >>> ward_schoolname.map(lambda x: {x[0]: list(x[1])}).collect()  # 列出每个ward区域内所有的学校的名字
4 [{‘Stifford Clays‘: [‘William Edwards School‘, ‘Brentwood County High School‘, "The Coopers‘ Company and Coborn School", ‘Becket Keys Church of England Free School‘, ...] # 输出结果为在Stifford Clays这个ward里的学校有William Edwards School,Brentwood County High School,The Coopers‘ Company and Coborn School等等...

11. reduceByKey(func, [numTasks]) 
作用于键值对(K, V)上,按Key分组,然后将Key相同的键值对的Value都执行func操作,得到一个值,注意func的类型必须满足

1 >>> pupils = newRows.map(lambda r: (r[1], int(r[6])))  # r[1]为ward的名字,r[6]为每个学校的学生数
2 >>> ward_pupils = pupils.reduceByKey(lambda x, y: x+y)   # 计算各个ward中的学生数
3 >>> ward_pupils.collect()  # 输出各个ward中的学生数
4 [(‘Stifford Clays‘, 1566), (‘Shenley‘, 1625), (‘Southbury‘, 3526), (‘Rainham and Wennington‘, 769), (‘Bromley Town‘, 574), (‘Waltham Abbey Honey Lane‘, 835), (‘Telegraph Hill‘, 1238), (‘Chigwell Village‘, 1506), (‘Gooshays‘, 2097), (‘Edgware‘, 2585), (‘Camberwell Green‘, 1374), (‘Glyndon‘, 4633),...]

12. aggregateByKey(zeroValueseqOpcomOp, [numTasks]) 
在于键值对(K, V)的RDD中,按key将value进行分组合并,合并时,将每个value和初始值作为seqOp函数的参数,进行计算,返回的结果作为一个新的键值对(K, V),然后再将结果按照key进行合并,最后将每个分组的value传递给comOp函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给comOp函数,以此类推),将key与计算结果作为一个新的键值对(K, V)输出。 
例子: 上述统计ward内学生人数的操作也可以通过aggregateByKey实现,此时,seqOpcomOp都是进行加法操作,代码如下:

1 >>> ward_pupils = pupils.aggregateByKey(0, lambda x, y: x+y, lambda x, y: x+y)
2 >>> ward_pupils.collect()
3 [(‘Stifford Clays‘, 1566), (‘Shenley‘, 1625), (‘Southbury‘, 3526), (‘Rainham and Wennington‘, 769), (‘Bromley Town‘, 574), (‘Waltham Abbey Honey Lane‘, 835), (‘Telegraph Hill‘, 1238), (‘Chigwell Village‘, 1506), (‘Gooshays‘, 2097), (‘Edgware‘, 2585), (‘Camberwell Green‘, 1374), (‘Glyndon‘, 4633),...]

13. sortByKey([ascending=True], [numTasks]) 
按照Key进行排序,ascending的值默认为True,True/False表示升序还是降序 
例如:将上述ward按照ward名字降序排列,打印出前十个

1 >>> ward_pupils.sortByKey(False, 4).take(10)
2 [(‘Yiewsley‘, 2560), (‘Wormholt and White City‘, 1455), (‘Woodside‘, 1204), (‘Woodhouse‘, 2930), (‘Woodcote‘, 1214), (‘Winchmore Hill‘, 1116), (‘Wilmington‘, 2243), (‘Willesden Green‘, 1896), (‘Whitefoot‘, 676), (‘Whalebone‘, 2294)]

14. join(otherDataset, [numTasks]) 
类似于SQL中的连接操作,即作用于键值对(K, V)和(K, W)上,返回元组 (K, (V, W)),spark也支持外连接,包括leftOuterJoin,rightOuterJoin和fullOuterJoin。例子:

 1 >>> class1 = sc.parallelize((‘Tom‘, ‘Jenny‘, ‘Bob‘)).map(lambda a: (a, ‘attended‘))
 2 >>> class2 = sc.parallelize((‘Tom‘, ‘Amy‘, ‘Alice‘, ‘John‘)).map(lambda a: (a, ‘attended‘))
 3 >>> class1.join(class2).collect()
 4 [(‘Tom‘, (‘attended‘, ‘attended‘))]
 5 >>> class1.leftOuterJoin(class2).collect()
 6 [(‘Tom‘, (‘attended‘, ‘attended‘)), (‘Jenny‘, (‘attended‘, None)), (‘Bob‘, (‘attended‘, None))]
 7 >>> class1.rightOuterJoin(class2).collect()
 8 [(‘John‘, (None, ‘attended‘)), (‘Tom‘, (‘attended‘, ‘attended‘)), (‘Amy‘, (None, ‘attended‘)), (‘Alice‘, (None, ‘attended‘))]
 9 >>> class1.fullOuterJoin(class2).collect()
10 [(‘John‘, (None, ‘attended‘)), (‘Tom‘, (‘attended‘, ‘attended‘)), (‘Jenny‘, (‘attended‘, None)), (‘Bob‘, (‘attended‘, None)), (‘Amy‘, (None, ‘attended‘)), (‘Alice‘, (None, ‘attended‘))]

15. cogroup(otherDataset, [numTasks]) 
作用于键值对(K, V)和(K, W)上,返回元组 (K, (Iterable, Iterable))。这一操作可叫做groupWith。

1 >>> class1 = sc.parallelize((‘Tom‘, ‘Jenny‘, ‘Bob‘)).map(lambda a: (a, ‘attended‘))
2 >>> class2 = sc.parallelize((‘Tom‘, ‘Amy‘, ‘Alice‘, ‘John‘)).map(lambda a: (a, ‘attended‘))
3 >>> group = class1.cogroup(class2)
4 >>> group.collect()
5 [(‘John‘, (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808afd0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a1d0>)), (‘Tom‘, (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808a7f0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a048>)), (‘Jenny‘, (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808a9b0>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e808a208>)), (‘Bob‘, (<pyspark.resultiterable.ResultIterable object at 0x7fb7e808ae80>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b448d0>)), (‘Amy‘, (<pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44c88>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44588>)), (‘Alice‘, (<pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44748>, <pyspark.resultiterable.ResultIterable object at 0x7fb7e8b44f98>))]
6 >>> group.map(lambda x: {x[0]: [list(x[1][0]), list(x[1][1])]}).collect()
7 [{‘John‘: [[], [‘attended‘]]}, {‘Tom‘: [[‘attended‘], [‘attended‘]]}, {‘Jenny‘: [[‘attended‘], []]}, {‘Bob‘: [[‘attended‘], []]}, {‘Amy‘: [[], [‘attended‘]]}, {‘Alice‘: [[], [‘attended‘]]}]

16. cartesian(otherDataset) 
笛卡尔乘积,作用于数据集T和U上,返回(T, U),即数据集中每个元素的两两组合

1 >>> a = sc.parallelize((‘a‘, ‘b‘, ‘c‘))
2 >>> b = sc.parallelize((‘d‘, ‘e‘, ‘f‘))
3 >>> a.cartesian(b).collect()
4 [(‘a‘, ‘d‘), (‘a‘, ‘e‘), (‘a‘, ‘f‘), (‘b‘, ‘d‘), (‘b‘, ‘e‘), (‘b‘, ‘f‘), (‘c‘, ‘d‘), (‘c‘, ‘e‘), (‘c‘, ‘f‘)]

17. pipe(command, [envVars]) 
将驱动程序中的RDD交给shell处理(外部进程),例如Perl或bash脚本。RDD元素作为标准输入传给脚本,脚本处理之后的标准输出会作为新的RDD返回给驱动程序。

18. coalesce(numPartitions) 
将RDD的分区数减小到numPartitions个。当数据集通过过滤规模减小时,使用这个操作可以提升性能。

19. repartition(numPartitions) 
重组数据,数据被重新随机分区为numPartitions个,numPartitions可以比原来大,也可以比原来小,平衡各个分区。这一操作会将整个数据集在网络中重新洗牌。

20. repartitionAndSortWithinPartitions(partitioner) 
根据给定的partitioner函数重新将RDD分区,并在分区内排序。这比先repartition然后在分区内sort高效,原因是这样迫使排序操作被移到了shuffle阶段。

时间: 2024-10-12 10:42:42

Spark (Python版) 零基础学习笔记(二)—— Spark Transformations总结及举例的相关文章

python之数据类型(学习笔记二)

python之数据类型(学习笔记二) 在Python中,能够直接处理的数据类型有以下几种: (1)整数 Python可以处理任意大小的整数,当然包括负整数,在程序中的表示方法和数学上的写法一模一样,例 如: 1 , 100 , ‐8080 , 0 ,等等. 计算机由于使用二进制,所以,有时候用十六进制表示整数比较方便,十六进制用 0x 前缀和0-9,a-f表示,例 如: 0xff00 , 0xa5b4c3d2 ,等等. (2)浮点数 浮点数也就是小数,之所以称为浮点数,是因为按照科学记数法表示时

Spark零基础学习笔记(一)——Python版

由于Scala才刚刚开始学习,还是对python更为熟悉,因此在这记录一下自己的学习过程,主要内容来自于spark的官方帮助文档,这一节的地址为: http://spark.apache.org/docs/latest/quick-start.html 文章主要是翻译了文档的内容,但也在里边加入了一些自己在实际操作中遇到的问题及解决的方案,和一些补充的小知识,一起学习. 环境:Ubuntu 16.04 LTS,Spark 2.0.1, Hadoop 2.7.3, Python 3.5.2, 利用

Java基础学习笔记二十 IO流

转换流 在学习字符流(FileReader.FileWriter)的时候,其中说如果需要指定编码和缓冲区大小时,可以在字节流的基础上,构造一个InputStreamReader或者OutputStreamWriter,这又是什么意思呢? OutputStreamWriter类 查阅OutputStreamWriter的API介绍,OutputStreamWriter 是字符流通向字节流的桥梁:可使用指定的字符编码表,将要写入流中的字符编码成字节.它的作用的就是,将字符串按照指定的编码表转成字节,

Java基础学习笔记二十八 管家婆综合项目

本项目为JAVA基础综合项目,主要包括: 熟练View层.Service层.Dao层之间的方法相互调用操作.熟练dbutils操作数据库表完成增删改查. 项目功能分析 查询账务 多条件组合查询账务 添加账务 编辑账务 删除账务 项目环境搭建 技术选型和jar包介绍 每个项目都要使用一些已经成熟的技术,它们通常是由一些专业组织或团队所提供的开源免费技术.在今后的学习过程中,我们会逐渐对这些专业组织有所了解.本项目中使用的技术如下: apache的commons组件: commons-dbutils

使用JSP实现输出(web基础学习笔记二)

Jsp:Java Server Page 服务器端的Java页面,动态网页技术 jsp注释 显式注释:这种注释客户端是允许看见的;<!--html注释--> 隐式注释:这种注释客户端是看不到的 注释:格式一://注释,单行注释 格式二:/*多行注释*/ 格式三:<%--注释--%>jsp注释 <!-- 这个注释客户端可以可见 --> <%--这个注释客户端看不到 --%> <% out.println("学习jsp输出"); //输

Java基础学习笔记二十一 多线程

多线程介绍 学习多线程之前,我们先要了解几个关于多线程有关的概念.进程:进程指正在运行的程序.确切的来说,当一个程序进入内存运行,即变成一个进程,进程是处于运行过程中的程序,并且具有一定独立功能. 线程:线程是进程中的一个执行单元,负责当前进程中程序的执行,一个进程中至少有一个线程.一个进程中是可以有多个线程的,这个应用程序也可以称之为多线程程序. 简而言之:一个程序运行后至少有一个进程,一个进程中可以包含多个线程 什么是多线程呢?即就是一个程序中有多个线程在同时执行.通过下图来区别单线程程序与

Java基础学习笔记二十二 网络编程

络通信协议 通过计算机网络可以使多台计算机实现连接,位于同一个网络中的计算机在进行连接和通信时需要遵守一定的规则,这就好比在道路中行驶的汽车一定要遵守交通规则一样.在计算机网络中,这些连接和通信的规则被称为网络通信协议,它对数据的传输格式.传输速率.传输步骤等做了统一规定,通信双方必须同时遵守才能完成数据交换. 网络通信协议有很多种,目前应用最广泛的是TCP/IP协议(Transmission Control Protocal/Internet Protoal传输控制协议/英特网互联协议),它是

Java基础学习笔记二十五 MySQL

MySQL 在dos中操作mysql 连接mysql命令: mysql -uroot -p密码 ,连接OK,会出现mysql> 对数据库的操作 创建一个库 create database 库名 create database 库名 character set 编码 mysql> create database mybase; Query OK, 1 row affected (0.00 sec) mysql> show databases; +--------------------+

Java基础学习笔记二十六 JDBC

什么是JDBC JDBC(Java DataBase Connectivity)就是Java数据库连接,说白了就是用Java语言来操作数据库.原来我们操作数据库是在控制台使用SQL语句来操作数据库,JDBC是用Java语言向数据库发送SQL语句. JDBC原理 早期SUN公司的天才们想编写一套可以连接天下所有数据库的API,但是当他们刚刚开始时就发现这是不可完成的任务,因为各个厂商的数据库服务器差异太大了.后来SUN开始与数据库厂商们讨论,最终得出的结论是,由SUN提供一套访问数据库的规范(就是