Spark DataFrame ETL教程

前言

ETL是 Extract-Transform-Load的缩写,也就是抽取-转换-加载,在数据工作中是非常重要的部分。实际上,ETL就是一个对数据进行批处理的过程,一个ETL程序就是一个批处理脚本,执行时能将一堆数据转化成我们需要的形式。

每个接触过数据批处理的工程师,都走过ETL的流程,只是没有意识到而已。按照ETL过程的框架来重新认识数据批处理,有利于我们更清晰地编写批处理脚本。

在单机范围内的数据量下,使用python的pandas包就可以非常方便地完成数据批处理工作。但当数据量达到1G以上时,pandas处理起来就有些力不从心了,到数据量达到1T以上,只能以分块的方式存储在分布式系统上时,pandas就无能为力了。在当前的技术背景下,典型的场景就是数据存储在Hive on HDFS上。要做ETL,就需要新的工具。Hadoop生态下,原生的工具是MapReduce计算模型,通常用Java编写,比较复杂,每次计算的中间结果也需要进行磁盘存取,非常费时。Spark是一个MPP架构的计算引擎,相比MapReduce,Spark 有DataFrame(又名 Schema RDD), 以表的形式来储存数据,无论是理解还是操作,都更为简单,还支持Python,在许多需要使用函数作参数的场合,非常好用。

本教程将介绍如何使用pyspark.sql模块,操作Spark DataFrame,从Hive中读取数据,经过一系列转换,最后存入Hive中。Spark的DataFrame和pandas的DataFrame的概念很相似,只是操作略有不同,如果读者有pandas的使用经验,很容易就能快速上手。

教程只是为了方便读者快速入门,想要更好地开发Spark程序,仍然需要详细了解Spark的API接口,对python环境下,Hive的ETL来说,研究pyspark.sql模块下的内容就足够了,可以参考官方文档

环境:Spark的API随版本不同会有较大变化,目前比较流行的版本是1.6和2.2,本文使用Spark 1.6.0,语言为Python 2.7。默认数据都储存在Hive中,Hadoop集群带有yarn。

冒烟测试

学习一门语言或者软件的第一步,永远都是冒烟测试。最经典的冒烟测试就是输出Hello World。但对ETL来说,一个打印"Hello World"的Spark程序是没什么用的。所以我们这里讲讲如何打印一张表,这张表有一行数据,列名为t,值为"Hello World"。

Spark的核心是SparkContext,它提供了Spark程序的运行环境。而SqlContext则是由SparkContext产生,提供了对数据库表的访问接口。因为这里数据库的环境是Hive,通常使用SqlContext的派生类HiveContext。在Spark提供的交互式环境中,会在启动时自动创建环境,生成SparkContext和HiveContext的实例。在pyspark的交互式环境中,SparkContext实例名为sc,HiveContext实例名为sqlContext。

交互式操作只在学习和调试时使用,实际工作中还是要靠命令行执行脚本。在脚本中我们就需要自己生成SparkContext和HiveContext了。基本操作代码如下:

# -*- coding: UTF-8 -*-
from pyspark import SparkContext,HiveContext
sc = SparkContext(appName="Hello World") #  appName就是这个Spark程序的名字,在DEBUG时有用
hc = HiveContext(sc)
df = hc.createDataFrame([["Hello World"]],[‘t‘]) # 创建一个DataFrame,第一个参数是数据,一个二维列表,第二个参数是表头,一个列表)
first_cell = df.collect()[0][0] # 取第一个单元格的值
df.show() # 将表打印到屏幕上
print(first_cell)

将这段代码保存成文件hello.py,在终端中进入到该文件所在目录,输入命令spark-submit --master yarn hello.py ,然后就可以看到屏幕上输出如下,冒烟测试就算完成了。

+-----------+
|          t|
+-----------+
|Hello World|
+-----------+
Hello World

指令解释:spark-submit就是spark的执行程序,master yarn是spark-submit的参数,指定yarn作为计算调度的中心。最后hello.py就是我们的ETL程序。

Extract 抽取

ETL的第一步就是从数据源抽取数据,在Spark中就是从Hive里读取数据。

Hive虽然实质上是个MapReduce接口的封装,但从上层抽象模型来看,有最基本的Schema、Table和Column,还有一套类SQL语法,可以说就是一个典型的关系数据库模型,因此在ETL过程中,我们完全可以把Hive当成一个关系数据库来看待。

抽取的常用方法由两种,一种是直接读取Hive表,一种是通过Hive QL读取。

都需要以HiveContext的实例作为入口,结果返回一个Spark DataFrame,为了检查结果,可以使用show方法查看DataFrame的数据。

假设我们有一个名为test 的库,里面有一张表为t1,数据结构如下:

a b c
1 2 3
4 5 6
7 8 9

直接读取Hive表

HiveContext对读取操作提供统一的接口- DataFrameReader,HiveContext的实例的read属性就可以获取到这个接口。

当然,这个接口也能用来读取Hive的数据,read.table就可获取到表的数据,代码如下

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="extract")
hc = HiveContext(sc) # 生成HiveContext实例
t =hc.read.table("test.t1")
t.show() 

Hive QL读取

实质是让HiveContext将HiveQL传给Hive,让Hive执行后,将查询结果封装成Spark DataFrame返回。在处理过程比较简单,或者需要大量设置别名时,比较有用(因为Spark批量设置别名不太方便),但不推荐写太过复杂的Hive QL,因为Hive 执行Hive QL的实质是把Hive QL转成MapReduce执行,在计算效率上是不如Spark的。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="extract")
hc = HiveContext(sc)
hc.sql("use test")
t = hc.sql("select * from t1")
t.show() 

Load 加载

为什么不先讲Trasform呢?因为Trasform的操作很多,先讲Load有助于快速上手完成一个初级的ETL程序。

类似于读取,HiveContext也提供了统一的写接口,名为DataFrameWriter.调用write属性即可获取。

写入的具体方式也很多,不过为了快速上手,只讲最关键的一些东西。

mode 写入方式

如果表已经存在,该如何操作。

  • append 追加: 在尾部追加数据
  • overwrite 覆写: 覆盖原有数据
  • error 错误: 抛出异常
  • ignore忽略 : 自动跳过

因为Hive on HDFS的关系,更新表最快的方式是全表覆写。对于需要更新原有的ETL,一般都是全表重写,只需要追加的,就可以用追加。

format 文件格式

在Hive on HDFS中,数据实质上是以文件的形式保存的。不同的文件格式,在压缩容量、支持数据类型和查询速度上都有所不同。textfile,avro,sequence,parquet,json等。目前我常用的格式是text和parquet,如果不设置文件格式,默认会使用Hive表的文件格式,如果Hive表不存在,则使用Hive表的默认格式textfile

加载新表

了解了上面的操作之后,我们就可以开始写加载部分的代码了,只需要使用一个saveAsTable方法就行了,非常简单。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="load")
hc = HiveContext(sc)
hc.sql("use test")
t1 = hc.sql("select a as a1,b as b1,c as c1 from t1")
t1.write.saveAsTable("test.t2",format="parquet",mode="overwrite") # 将t1的三个列改名后存成t2表
t2.read.table("test.t2")
t2.show()

转换

转换是ETL过程中最复杂的部分,去掉抽取和加载,剩下的全都是转换,包含的内容是非常多的,常见的有筛选、聚合、多列合并或计算,列赋值,根据不同的需要有不同的处理方法。由于Spark的转换操作较为啰嗦,所以推荐把部分简单的操作通过Hive QL的方式,在抽取步骤中交由Hive完成,这样有助于精简代码,提高可读性,降低维度难度。

下面就讲一讲Spark DataFrame 转换部分的基本概念和操作。

向量化编程

对于日常用Java来做数据批处理的工程师来说,可能更习惯用for循环来逐条处理数据。但这样做在操作上是很不方便的,也不太利于阅读理解。在科学计算的语境下,数据总是以DataFrame的形式储存,也就是一张表。数据处理操作通常是对这张表的某些行或者某些列来进行处理。比如,“令t1表的a列中数字大于2的值的,全部都等于2”,或者“给t1表新加一常数列d,值为99”,这样的操作在向量化编程的语境下,就是一个调用API接口的操作,比for循环容易被理解。

可以类比pandas。在pandas中,也主要是通过向量化编程的方式来处理数据,虽然提供了迭代器的接口,可以一行行地读取数据,但一般以表作为修改对象的操作,主要是以API接口来完成,不推荐使用迭代器来做行级修改。一来操作不方便,二来运算速度未必能比优化过的API接口快。

Spark是分布式执行的,数据分散在各个机器上,背后有一套调度系统来控制数据计算负载。如果用for循环来处理,就是把负载都加在了执行脚本的机器上,一般来说执行脚本的机器都是不储存数据的master,实际上这一过程就会导致需要把数据从slave传到master上,无谓地增加了网络负担。所以,在Spark脚本里,严禁使用原生的python for循环来处理SparkData Frame,即使要用,也应该使用Spark提供的API接口。

基本操作对象

在Spark DataFrame语境下,操作对象主要有三个:DataFrame,Row,Column。

  • DataFrame: DataFrame就是一张表,有表头和若干行数据。这张表是一个有序、可迭代的集合。
  • Row:DataFrame 集合中的元素就是Row。每个Row储存一行数据,有相同的属性,这些属性和表头同名。DataFrame没有API接口可以直接获取到某个Row,但可以通过Colect方法获取到Row对象的list,再从中获取指定的Row。
  • Column:Column与数据的实际结构无关,是一个操作上的概念。在实际的转换操作中,绝大多数都是对若干列进行数学运算、拼接、映射等等。取DataFrame中的一列,得到的就是一个Column对象。

事实上,最常用的主要是DataFrame和Column,Row很少用到。其中,DataFrame是核心,一个ETl过程,实质就是从抽取一个DataFrame开始,经过一系列的DataFrame变换,得到一个与目标一致的DataFrame,然后写入到目标数据库中去。Column在其中扮演着中间点的角色,比如取DataFrame的多个列,拼接合成一个新列,然后把这个新列加到原本的DataFrame中去。

基本操作分类

上面提到了,DataFrame是核心操作对象。其实在Spark中,真正意义上的核心操作对象是RDD,一个有序的,分布式储存在内存中的操作对象。DataFrame就是一个特殊的RDD——Schema RDD。所有的DataFrame操作,都可以归类为两种基本操作:转化(Transformation)和行动(action)。转换操作是不会触发Spark的实际计算的,即使转换过程中出现了错误,在执行到这一行代码时,也不会报错。直到执行了行动操作之后,才会真正让Spark执行计算,这时候才会抛出在转化过程中出现的错误。这在DEBU时,尤其是交互式编程环境下,可能会导致问题代码定位错误,需要特别注意。

  • Transform:典型的转换操作有读(read),筛选(filter)、拼接(union)等等,只要这个过程只改变DataFrame的形态,而不需要实际取出DataFrame的数据进行计算,都属于转换。理论上来说,ETL过程中的Transfrom过程,主干流程只会有转换操作,不会有Action操作。
  • Action:典型的动作操作有计数(count),打印表(show),写(write)等,这些操作都需要真正地取出数据,就会触发Spark的计算。

筛选

filter(cond):筛选出满足条件cond的行。cond可以填字符串,格式和SQL中的where子句一样,也可以填Bool类型的Column对象,比如 df[‘a‘]>1。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],[‘a‘,‘b‘,‘c‘])
t1 = df.filter("a > 1 and c < 9")
t1.show() # 输出 4,5,6 这一行
t2 = df.filter( (df[‘b‘]<5) & (df[‘c‘]<8)) # 可以使用&或|对两个bool列进行逻辑运算,但必须要用圆括号括起,限定运算顺序。
t2.show() # 输出 1,2,3 这一行

赋值,加列

withColumn(col_name,col):col_name是列名,col是列值,必须是一个Column对象。

赋值和加列操作是相同的,col_name存在,就是赋值,否则就是加列。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],[‘a‘,‘b‘,‘c‘])
t1 = df.withColumn("c",df[‘c‘]+1)
t1.show() # c的值全都增加了1
t2 = df.withColumn("d",df[‘a‘]+1)
t2.show() # 增加了新一列d

选取列

给列取名

生成Column对象

在赋值的例子里,Column对象是由原DataFrame的Column经过简单的数学运算或逻辑运算得到的,但如果我们想生成一些更特殊的Column呢?比如常数列或者自己定义复杂的规则。

Spark提供了pyspark.sql.functions,含有丰富的接口,其中就有我们需要的东西。篇幅有限,只能介绍一些常用的,更多的还是需要去看官方文档。

常数列

lit(value):value数必须是必须为pyspark.sql.types支持的类型,比如int,double,string,datetime等

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import lit
from datetime import datetime
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],[‘a‘,‘b‘,‘c‘])
t = df.withColumn("constant",lit(datetime(2018,1,1,2,3,4,999)))
t.show(truncate=False)

取整

round、floor:和Python的标准函数用法一致,只是数字换成列名

条件分支

when(cond,value):符合cond就取value,value可以是常数也可以是一个列对象,连续可以接when构成多分支

otherwise(value):接在when后使用,所有不满足when的行都会取value,若不接这一项,则取Null。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import when
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],[‘a‘,‘b‘,‘c‘])
t = df.withColumn("when",when(df[‘a‘]==1,"a=1").when(df[‘b‘]==5,df[‘b‘]%5).otherwise("other"))
t.show() # 生成when列,值分别为 a=1,0,other

日期和时间

current_date():当前日期,返回一个date列

current_timestamp():当前时刻,返回一个timestamp列

date_add(start, days):日期正向偏移,start为开始时间,必须是Column或字符串对象,指向一个date或timestamp列,days为偏移天数。

date_sub(start, days):类似date_add,但是负向偏移。

date_format(date, format): 日期格式化,date为要格式化的时间,必须是Column或字符串对象,指向一个date或timestamp列,days为偏移天数,format为格式化的字符串,具体参考Hive QL的date_format函数。

datediff(end, start):计算天数差

自定义规则

udf(f, returnType=StringType): 自定义处理函数,f为自定义的处理函数,returnType为f的返回类型,必须为pyspark.sql.types支持的类型,如果不填,会默认自动转化为String类型。udf会返回一个函数,可以当做列函数使用。

这在处理逻辑非常复杂时很有用。比如对身份证号进行校验计算,然后取出有效的身份证号的第1,4,10位,这个复杂流程很难用Spark提供的API拼接起来,只能自己写。

作为教程,就不写太复杂的函数了。

自定义函数f的传入参数为列的值。

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,2,3],[4,5,6],[7,8,9]],[‘a‘,‘b‘,‘c‘])
def f(a,b,c):
    r=0
    if a==1:
        r=1
    elif b==5:
        r=2
    return r

col_match = udf(f,IntegerType())
t = df.withColumn("col_match",col_match("a","b","c"))
t.show() # 生成col_match列,值分别为 a=1,2,0

排序

Spark支持多字段,升降序排序。

可以使用orderBy和sort,因为操作比较简单也符合直觉,这里略去例子,详情可以看文档。

聚合

Spark 支持直接聚合,也支持分组聚合。聚合的表达方式非常多,这里仅选取常用的。

直接聚合

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import sum
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,1,3],[4,1,2],[7,2,9]],[‘a‘,‘b‘,‘c‘])
t = df.agg(sum("a"))
print(t.collect()[0][0]) # 打印 12

分组聚合

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import sum,max
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
df = hc.createDataFrame([[1,1,3],[4,1,2],[7,2,9]],[‘a‘,‘b‘,‘c‘])
t = df.groupBy("b").agg(sum("a"),max("c"))
t.show() 

输出:

+---+------+------+
|  b|sum(a)|max(c)|
+---+------+------+
|  1|     5|     3|
|  2|     7|     9|
+---+------+------+

窗口函数

有一类分析需求,是需要分组计算,然后把结果集join到原先的DataFrame中,比如通过成绩表,按班计算的学生的成绩排名,加一列到原本的成绩表中。这种分析需求称为窗口分析,比如说每个班,就是一个窗口,在这个窗口中,计算出班级成绩排名,再并到原表中。

这种分析,首先要创建一个窗口,然后再使用窗口函数来进行计算。Spark提供了丰富的窗口函数,可以满足各类分析需求。

创建窗口

使用pyspark.sql.Window对象可以创建一个窗口,最简单的窗口可以什么都没有,但一般不推荐这样做。可以使用partitionBy进行分组,使用orderBy进行排序,比如

from pyspark.sql import Window
window = Window.partitionBy("a").orderBy("b")

排序

rank

# -*- coding: UTF-8 -*-
from pyspark import SparkContext, HiveContext
from pyspark.sql.functions import row_number,rank,desc
from pyspark.sql import Window
sc = SparkContext(appName="transform")
hc = HiveContext(sc)
score = [

> 引用

    [‘a‘,‘a_1‘,90],
    [‘a‘,‘a_2‘,80],

> 引用

    [‘a‘,‘a_3‘,85],
    [‘b‘,‘b_1‘,70],
    [‘b‘,‘b_2‘,80],
    [‘b‘,‘b_3‘,75],
    [‘c‘,‘c_1‘,90]
]
df = hc.createDataFrame(score,[‘class‘,‘student‘,‘score‘])
class_window = Window.partitionBy("class").rowsBetween(-1, 1)(0,1) .orderBy(desc("score"))
class_rank = rank().over(class_window)
class_row_number = row_number().over(class_window)
t = df.withColumn("rank",class_rank)
t.show()
t2 = df.withColumn("row_number",class_row_number)
t2.show() 

缓存

分支节点

原文地址:https://www.cnblogs.com/longfei-aot/p/8325843.html

时间: 2024-10-24 17:47:47

Spark DataFrame ETL教程的相关文章

spark dataframe操作集锦(提取前几行,合并,入库等)

Spark dataframe派生于RDD类,但是提供了非常强大的数据操作功能.当然主要对类SQL的支持. 在实际工作中会遇到这样的情况,主要是会进行两个数据集的筛选.合并,重新入库. 首先加载数据集,然后在提取数据集的前几行过程中,才找到limit的函数. 而合并就用到union函数,重新入库,就是registerTemple注册成表,再进行写入到Hive中. 不得不赞叹dataframe的强大. 具体示例:为了得到样本均衡的训练集,需要对两个数据集中各取相同的训练样本数目来组成,因此用到了这

spark dataframe unionall

今天本来想写一个spark dataframe unionall的demo,由于粗心报下面错误: Exception in thread "main" org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the left table has 3 columns and the right has 4; at o

Spark 数据ETL

Spark 数据ETL 说明 1.本文翻译自<Machine Learning with Spark>书中第三章第3,4节内容. 2.本文一些内容基于http://blog.csdn.net/u011204847/article/details/51224383. 3.大家如果有看不懂的地方可以参考原书(网上可以搜到). 数据处理以及转化 1.当我们完成了一些对数据集的探索和分析,我们知道了一些关于用户数据以及电影数据的特征,接下来我们该做些什么呢? 2.为了让原始数据能够在机器学习算法中变得

spark DataFrame 常见操作

spark dataframe派生于RDD类,但是提供了非常强大的数据操作功能.当然主要对类SQL的支持. 在实际工作中会遇到这样的情况,主要是会进行两个数据集的筛选.合并,重新入库. 首先加载数据集,然后在提取数据集的前几行过程中,才找到limit的函数. 而合并就用到union函数,重新入库,就是registerTemple注册成表,再进行写入到HIVE中. 不得不赞叹dataframe的强大. 具体示例:为了得到样本均衡的训练集,需要对两个数据集中各取相同的训练样本数目来组成,因此用到了这

Spark DataFrame 数据框空值判断和处理

scala> val data1 = data.toDF("affairs", "gender", "age", "yearsmarried", "children", "religiousness", "education", "occupation", "rating") data1: org.apache.spark

spark&amp;dataframe

1.今天,我们来介绍spark以及dataframe的相关的知识点,但是在此之前先说一下对以前的hadoop的一些理解 当我启动hadoop的时候,上面有hdfs的存储结构,由于这个是分布式存储,所以当一个节点挂了之后,此后由于 还有别的机器上存储这些block块(这里面你肯定要问了,我们怎么知道它挂了,其实我前面关于akaka的时候rpc 通信的机制,心跳机制),所以这个是我们选择它的理由之一,还有一个原因我们可以进行无限扩容,是因为当我们 使用zookeeper进行管理这些datanode的

spark dataframe 类型转换

读一张表,对其进行二值化特征转换.可以二值化要求输入类型必须double类型,类型怎么转换呢? 直接利用spark column 就可以进行转换: DataFrame dataset = hive.sql("select age,sex,race from hive_race_sex_bucktizer "); /** * 类型转换 */ dataset = dataset.select(dataset.col("age").cast(DoubleType).as(

Spark DataFrame小试牛刀

三月中旬,Spark发布了最新的1.3.0版本,其中最重要的变化,便是DataFrame这个API的推出.DataFrame让Spark具备了处理大规模结构化数据的能力,在比原有的RDD转化方式易用的前提下,计算性能更还快了两倍.这一个小小的API,隐含着Spark希望大一统「大数据江湖」的野心和决心.DataFrame像是一条联结所有主流数据源并自动转化为可并行处理格式的水渠,通过它Spark能取悦大数据生态链上的所有玩家,无论是善用R的数据科学家,惯用SQL的商业分析师,还是在意效率和实时性

Spark DataFrame写入HBase的常用方式

Spark是目前最流行的分布式计算框架,而HBase则是在HDFS之上的列式分布式存储引擎,基于Spark做离线或者实时计算,数据结果保存在HBase中是目前很流行的做法.例如用户画像.单品画像.推荐系统等都可以用HBase作为存储媒介,供客户端使用. 因此Spark如何向HBase中写数据就成为很重要的一个环节了.本文将会介绍三种写入的方式,其中一种还在期待中,暂且官网即可... 代码在spark 2.2.0版本亲测 1. 基于HBase API批量写入 第一种是最简单的使用方式了,就是基于R