Spark SQL  inferSchema实现原理探微(Python)

使用Spark SQL的基础是“注册”(Register)若干表,表的一个重要组成部分就是模式,Spark SQL提供两种选项供用户选择:

(1)applySchema

applySchema的方式需要用户编码显示指定模式,优点:数据类型明确,缺点:多表时有一定的代码工作量。

(2)inferSchema

inferSchema的方式无需用户编码显示指定模式,而是系统自动推断模式,代码比较简洁,但既然是推断,就可能出现推断错误(即与用户期望的数据类型不匹配的情况),所以我们需要对其推断过程有清晰的认识,才能在实际应用中更好的应用。

本文仅仅针对Python(spark-1.5.1)进行介绍,推断过程是依赖SQLContext(HiveContext是SQLContext的子类) inferSchema实现的:

SQLContext inferSchema已经在1.3版本中被弃用,取而代之的是createDataFrame,inferSchema仍然可以在1.5.1版本中被使用,其实际执行过程就是SQLContext createDataFrame,这里需要注意一个参数samplingRation,它的默认值为None,后续会讨论它的具体作用。

这里我们仅仅考虑从RDD推断数据类型的情况,也就是isinstance(data, RDD)为True的情况,代码执行流程转入SQLContext _createFromRDD:

从上述的代码调用逻辑可以看出,schema为None,代码执行流程转入SQLContext _inferSchema:

SQLContext _inferSchema的主要流程大致分为三步:

第一步:获取RDD的第一行记录first,而且要求first不能为空值(注意不是None);

第二步:如果first的类型为“dict”,会输出一条警告信息:推断模式时建议RDD的元素类型为Row(pyspark.sql.Row),dict已被弃用;

第三步:如果samplingRatio为None,则直接使用first(也就是RDD的第一条记录)推断模式;如果samplingRation不为None,则根据该值“筛选”数据推断模式。

我们将着重介绍第三步的实现逻辑。

1. samplingRatio is None

_infer_schema使用一行记录row(也就是RDD的第一行记录)推断模式,大致分为四个步骤:

(1)如果记录row的数据类型为dict;

由此我们可以得出items实际就是一个键值对列表,其中键值对也可以理解为(列名,列值);之所以要进行排序操作(sorted)是为了保证列名顺序的一致性(dict.items()并不负责返回的列表元素顺序)。

(2)如果记录row的数据类型为tuple或list,可以细分为三种情况:

a. row的数据类型为Row,模拟处理过程:

b. row的数据类型为namedtuple,模拟处理过程:

c. row的数据类型为其它(tuple or tuple),模拟处理过程:

(3)如果记录row的数据类型为object;

由(1)、(2)、(3)可以看出,它们最终的逻辑是一致的,就是将记录row转换为一个键值对列表;如果(1)、(2)、(3)均不匹配,则认为无法推断,抛出异常即可。

(4)创建模式(StructType)

items中的每一个键值对会对应着形成一个StructField,StructField用于描述一个列的模式,它接收三个参数:列名、列类型、可否包含None;列名就是“键”,列类型则需要根据“值”推断(_infer_type),这里默认设置可以包含None。

迭代items中的这些键值对会形成一个StructField列表,最后通过StructType创建模式。

这是根据RDD的一行记录创建模式的过程,这其中还没有涉及具体的数据类型是如何被推断的,我们还需要看一下_infer_type:

_infer_type就是根据传入的obj来推断类型的,返回值为类型实例,需要处理以下六种情况:

(1)如果obj为None,则类型为NullType;

(2)真的没有理解,不解释;

(3)尝试根据type(obj)直接从_type_mappings中获取对应的类型信息dataType,_type_mappings就是一个字典,预先保留着一些Python类型与Spark SQL数据类型的对应关系,如下:

如果dataType不为None,则直接返回相应类型的实例即可;需要特殊处理的是DecimalType,考虑到实际数据中可能存在precision和scale不一致的情况,这里统一处理为precision:38,scale:18;如果dataType为None,则表明obj为复合数据类型(数组、字典、结构体)。

(4)如果obj的数据类型为dict,我们需要分别推断它的键类型(递归调用_infer_type)、值类型(递归调用_infer_type),然后构造MapType实例并返回;

推断键、值类型时,仅仅选取某一个键值对:它的键、值均不为None,如果存在多个这样的键值对,则选取是随机的,取决于dict.items();如果找不到这样的键值对,则认为键、值的类型均为NullType。

(5)如果obj的数据类型为list或array,则选取其中某一个不为None的元素推断其类型(递归调用_infer_type);如果找不到不为None的元素,则认为元素类型为NullType;最后构造ArrayType实例并返回;

(6)如果(1)、(2)、(3)、(4)、(5)均无法完成推断,则我们认为obj可能(仅仅是可能)是一个结构体类型(StructType),使用_infer_schema推断其类型;

2. samplingRatio is not None

samplingRatio为None时,则仅仅选取RDD的第一行记录参与推断,这就对这一行记录的“质量”提出很高的要求,某些情况下它无法代表全局,此时我们可以通过显示设置samplingRatio,“筛选”足够多的数据参与推断过程。

如果samplingRatio的值小于0.99,则使用RDD sample API根据samplingRatio“筛选”部分数据(rdd)参与推断;否则整个RDD(rdd)的所有记录参与推断。

推断过程可以简单理解为两步:

(1)对于RDD中的每一行记录通过方法_infer_schema推断出一个类型(map);

(2)将这些类型进行聚合(reduce)。

我们着重看一下聚合的实现逻辑:

聚合的实现逻辑由方法_merge_type完成,需要处理六种情况:

(1)如果a是NullType的实例,则返回b的类型;

(2)如果a不是NullType的实例,b是NullType的实例,则返回a的类型;

(3)如果a和b的类型不相同,则抛出异常;

以下处理过程基于a和b的类型相同。

(4)如果a的类型为StructType(结构体),则以a中的各个元素为模板合并类型(递归调用_merge_type),并追加b-a(差集)的元素(类型);

(5)如果a的类型为ArrayType(数组),则合并(递归调用_merge_type)两者的元素类型即可;

(6)如果a的类型为MapType(字典),则需要分别合并两者的键类型(递归调用_merge_type)、值类型(递归调用_merge_type)。

个人觉得目前的类型聚合逻辑过于简单,实际使用意义不大。

时间: 2024-10-06 11:21:33

Spark SQL  inferSchema实现原理探微(Python)的相关文章

Spark SQL inferSchema实现原理探微(Python)【转】

使用Spark SQL的基础是"注册"(Register)若干表,表的一个重要组成部分就是模式,Spark SQL提供两种选项供用户选择: (1)applySchema applySchema的方式需要用户编码显示指定模式,优点:数据类型明确,缺点:多表时有一定的代码工作量. (2)inferSchema inferSchema的方式无需用户编码显示指定模式,而是系统自动推断模式,代码比较简洁,但既然是推断,就可能出现推断错误(即与用户期望的数据类型不匹配的情况),所以我们需要对其推断

Spark SQL / Catalyst 内部原理 与 RBO

原创文章,转载请务必将下面这段话置于文章开头处. 本文转发自技术世界,原文链接 http://www.jasongj.com/spark/rbo/ 本文所述内容均基于 2018年9月10日 Spark 最新 Release 2.3.1 版本.后续将持续更新 Spark SQL 架构 Spark SQL 的整体架构如下图所示 从上图可见,无论是直接使用 SQL 语句还是使用 DataFrame,都会经过如下步骤转换成 DAG 对 RDD 的操作 Parser 解析 SQL,生成 Unresolve

【原创】经验分享(15)spark sql limit实现原理

之前讨论过hive中limit的实现,详见 https://www.cnblogs.com/barneywill/p/10109217.html下面看spark sql中limit的实现,首先看执行计划: spark-sql> explain select * from test1 limit 10;== Physical Plan ==CollectLimit 10+- HiveTableScan [id#35], MetastoreRelation temp, test1Time taken

【原创 Hadoop&Spark 动手实践 10】Spark SQL 程序设计基础与动手实践(下)

[原创 Hadoop&Spark 动手实践 10]Spark SQL 程序设计基础与动手实践(下) 目标: 1. 深入理解Spark SQL 程序设计的原理 2. 通过简单的命令来验证Spark SQL的运行原理 3. 通过一个完整的案例来验证Spark SQL的运行原理,自己实际动手来进行掌握 4. 顺利完成“篮球运动员评估系统”

Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

Spark SQL, DataFrames and Datasets Guide Overview SQL Datasets and DataFrames 开始入门 起始点: SparkSession 创建 DataFrames 无类型的Dataset操作 (aka DataFrame 操作) Running SQL Queries Programmatically 全局临时视图 创建Datasets RDD的互操作性 使用反射推断Schema 以编程的方式指定Schema Aggregatio

Spark1.1.0 Spark SQL Programming Guide

Spark SQL Programming Guide Overview Getting Started Data Sources RDDs Inferring the Schema Using Reflection Programmatically Specifying the Schema Parquet Files Loading Data Programmatically Configuration JSON Datasets Hive Tables Performance Tuning

梯度迭代树(GBDT)算法原理及Spark MLlib调用实例(Scala/Java/python)

梯度迭代树(GBDT)算法原理及Spark MLlib调用实例(Scala/Java/python) http://blog.csdn.net/liulingyuan6/article/details/53426350 梯度迭代树 算法简介: 梯度提升树是一种决策树的集成算法.它通过反复迭代训练决策树来最小化损失函数.决策树类似,梯度提升树具有可处理类别特征.易扩展到多分类问题.不需特征缩放等性质.Spark.ml通过使用现有decision tree工具来实现. 梯度提升树依次迭代训练一系列的

Spark(Hive) SQL中UDF的使用(Python)

相对于使用MapReduce或者Spark Application的方式进行数据分析,使用Hive SQL或Spark SQL能为我们省去不少的代码工作量,而Hive SQL或Spark SQL本身内置的各类UDF也为我们的数据处理提供了不少便利的工具,当这些内置的UDF不能满足于我们的需要时,Hive SQL或Spark SQL还为我们提供了自定义UDF的相关接口,方便我们根据自己的需求进行扩展. 在Hive的世界里使用自定义UDF的过程是比较复杂的.我们需要根据需求使用Java语言开发相应的

Spark SQL Table Join(Python)

示例 Spark SQL注册“临时表”执行“Join”(Inner Join.Left Outer Join.Right Outer Join.Full Outer Join) 代码 from pyspark import SparkConf, SparkContext from pyspark.sql import SQLContext, Row conf = SparkConf().setAppName("spark_sql_table_join") sc = SparkConte