[Spark]-结构化数据查询之数据源篇

7. 数据源

  Spark-SQL 支持通过Dataframe接口对各种数据源进行操作

    各种数据源的加载&保存

    数据转换(relational transformations)

    注册临时视图(temporary view),来允许SQL的形式直接对临时视图进行操作

  7.1  数据源加载

    Spark-SQL的默认数据源为parquet(spark.sql.sources.default设置),一些数据源加载的例子如下:    

 /**
        * 加载parquet数据源
        */
        spark.read.load("D:\\data\\users.parquet").show()
        /**
        * 加载非parquet数据源,需要手动设置选项
        *   Spark-SQLd对内置数据源,可以使用短名称.对自定义的数据源需要使用全名称(比如:org.apache.spark.sql.parquet)
        *   Spark-SQL内置数据源:json, parquet, jdbc, orc, libsvm, csv, text
        */
        spark.read.format("csv").load("D:\\data\\people.csv").show()
        /**
        * 以SQL的形式读取文件
        */
        spark.sql("select * from json.`D:\\data\\sku.json`").show()

  7.2 数据源保存

    7.2.1 保存模式

      数据源的保存提供了以下保存的模式

Scala/Java Any Language 描述
SaveMode.ErrorIfExists(默认) error/errorifexists 如果保存的数据已存在,将抛出错误
SaveMode.Append append 如果保存的数据已存在,数据将以追加的形式写入
SaveMode.Overwrite overwrite 如果保存的数据已存在,数据将以覆盖的形式写入
SaveMode.Ignore ignore 如果保存的数据已存在,数据将不会写入(也不会抛出错误,即原数据不变,新数据忽略)

      数据源的保存是 非原子性 的,这一点务必要注意.比如以Overwrite形式写入时,数据将先删除后写入(覆盖),这里有数据丢失的可能性的

       7.2.2 存储源

      Spark-SQL 可以将数据保存在两种源上: 文件(file-based) 和 持久化表(persistent tables) 

      i).文件(file-based)

        数据保存的格式可以变化,比如将一个csv保存为一个txt的例子

        spark.read.format("csv").load("InputPath").write.format("text").save("OutputPath")

       ii).持久化表(persistent tables)

        将数据保存到Hive的表中.

        元数据

          数据保存到Hive的表中,将同时自动保存schema到Hive的MetaStore里

        内/外表

          数据保存时,根据是否指定path来区分内外表.

            当手动设置为其它路径时成为外部表(表删除只删除元数据不删除表数据)

            不设置path则成为内部表(表删除将同时删除元数据和表数据)

        表分区

          如果表被设置为外部表,默认不会收集分区信息,需要手动同步(msck / add partition)

  7.3.内置数据源

    7.3.1 Parquet

        Parquet是一种列式存储格式,无法直接阅读.但有非常好的压缩消耗和压缩比.

        Parquet是Spark默认和推荐使用的数据格式.在很多方面,Spark都对Parquet有最大支持

        在Spark中,出于兼容性的考虑,所有的Columns都将自动转换为可空类型

        7.3.1.1 一个Parquet读写的例子

        spark.read.format("csv").load("InputPath").write.format("text").save("OutputPath")

           7.3.1.2 Parquet的分区发现

        在Hive之类中,分区是以目录的形式存在.分区键本身是目录的一部分.但此时,分区信息是不会自动的被Hive发现.

        而在Parquet中,Spark可以自动的发现分区和推断数据类型.推断的依据就是目录

        比如:一个存放所有订单信息的目录,以类似这样的格式 order/{time(xxxx-MM-dd)}/{area(xx)}/xxxx.parquet

           将路径(/order)传入,读取将自动增加两列time和area作为分区列,并自动在实际使用中应用分区信息

        注意:

          i).对Parquet的分区发现字段现仅支持数字型和字符串型.

            spark.sql.sources.partitionColumnTypeInference.enabled(默认为true)设置,可以关闭分区发现的数据类型推断

           ii).默认情况下,分区发现只能找到给定路径的分区信息

            比如 path/to/table/gender=male =>默认发现的分区列不会包含 gender

                 在数据源选项中设置basePath (path/to/table/)=> gender将成为一个分区列

        7.3.1.3 Parquet的元数据合并(schema merging)

         实际工作中,Parquet数据文件很可能是由一个开始比较简单的schema逐渐变得复杂(一开始的业务比较简单,但随着业务的推进会加入越来越多的字段)

         因为Parquet的schema是自存储的,所以需要一种途径来保持对历史数据schema的兼容性,这就是模式演进(schema evolution),而实现的手段就是模式合并

         模式合并是一个昂贵操作,而在大多数情况下都是不会使用到它,所以模式合并默认是关闭的

         有以下两种途径可以打开模式合并

          i).临时=>在Option中设置mergeSchema=true

            spark.read.option("mergeSchema", "true")

            ii).永久=> 配置文件配置 spark.sql.parquet.mergeSchema 设置为 true

      7.3.1.4 Parquet 与Hive MetaStore

        在Spark读取和写入Hive的Parquet表时,Spark将使用自己的Parquet Support,而不是Hive SerDe ( spark.sql.hive.convertMetastoreParquet 默认开启)

         Spark-Parquet-Schema 与 Hive-MetaStore 是有一些区别的,这些区别体现在:

          Hive-MetaStore不区分大小写,而Spark-Parquet-Schema是区分大小写的.

          Hive-MetaStore认为所有列都是可空的,而Spark-Parquet-Schema认为列是否可空必须指定

        所以在Spark-Parquet-Schema 与 Hive-MetaStore 会进行一些自协调(reconciled schema),规则如下:

          只出现在Spark-Parquet-Schema中的任何字段会被自动删除

          只出现在Hive-MetaStore中的任何字段以可空字段(nullable field)形式增加

      7.3.1.5 Parquet的元数据刷新(Metadata Refreshing)

         Parquet的元数据将会被自动缓存以获得更好的性能.

         但需要注意的是:Hive metastore Parquet table conversion,这些Hive表的MetaData也会被缓存.

         如果这些表在Hive或其它工具层面被改变,Spark无法感知这些改变,必须手工刷新.刷新方式如下:

          spark.catalog.refreshTable("my_table")

      7.3.1.6 Parquet的常用配置

          

参数名 默认值 描述
spark.sql.parquet.binaryAsString false
一些其它的基于Parquet系统(比如Hive,Impala,旧版Spark-SQL),在写出Parquet-schema时不区分binary data (二进制数据)和 strings (字符串)

启用这个属性,将保持将二进制数据解释为字符串的兼容性

spark.sql.parquet.int96AsTimestamp true 一些其它的基于Parquet系统(比如Hive,Impala),会将Timestamp写为int96.该属性告知Spark-SQL将int96解析为Timestamp的兼容性
spark.sql.parquet.cacheMetadata true 指示是否打开parquet的元数据缓存(这可以加快查询静态数据的速度)
spark.sql.parquet.compression.codec snappy 指示写出parquet文件的压缩类型(默认snappy).可选项为:uncompressed, snappy, gzip, lzo
spark.sql.parquet.filterPushdown true 设置为 true 时启用 (过滤谓词下推,尝试调整算子执行顺序,将写在后面的过滤谓词提前处理以减少查询数据量,提高性能)
spark.sql.hive.convertMetastoreParquet true 面对Hive时,是否启用使用Spark-SQL内置的parquet,设为否则放弃使用内置parquet转而使用Hive serDe
spark.sql.parquet.mergeSchema false 是否全局启用parquet的元数据合并,设为否则从summary file或random file中随机挑选
spark.sql.optimizer.metadataOnly true 当设为true时,将使用metadata信息来构建分区列而不是走表扫描.(只是在查询的所有列都是分区列时才有意义,并且此时依然有各种聚合能力)

    7.3.1 ORC

      ORC格式是Spark-2.3之后才支持的数据源格式.它的可配置信息如下:

参数名 默认值 描述
spark.sql.orc.impl hive ORC的实现名称.可选值hive(使用hive-1.2.1的ORC库),native(或者hive.native,将使用Apache ORC 1.4.1)
spark.sql.orc.enableVectorizedReader true 是否在本机使用向量化ORC解码.如果为false,会在本机构造一个向量化ORC阅读器,对于上面设置为hive,这个属性将被忽略

    7.3.2 JSON

      Spark-SQL可以自动推断Json的schema.(Json的schema依然是自存储的)

      Spark-SQL加载将以DataFrame[Row]的形式,并且可以非常简单的转为一个Dataset[T](.as[T])

      一个简单的例子如下:

        //读取Json数据源,如果某些行缺age属性的,补0

        spark.read.json("D:\\data\\people.json").na.fill(0,Seq("age"))

     7.3.3 Hive

      7.3.3.1 Hive支持

        Spark-SQL还支持读取和写入存储在Hive中的数据.Spark-SQL使用Hive,但需要做一些补充配置

          i).Hive的库必须在classpath中被找到.因为Hive的库有大量的依赖,而这些依赖不一定是被Spark完全打包的,所以在运行时,必须要能找到这些库包

           注意:这种依赖与找到是针对每个executor而言的,因为executor才是真正的执行者.

            ii).需要在Spark.Conf中拷入 hive-site.xmlcore-site.xml(用于安全配置)和 hdfs-site.xml (用于 HDFS 配置)文件

           iii).需要在SparkSession中启用对Hive的支持:spark.enableHiveSupport()

             在实际运行中,不一定需要部署一个完整的Hive系统,比如当hive-site.xml不存在时,会在当前目录创建一个metastore_db用以存储元数据(不推荐如此)

      7.3.3.2 Hive的存储格式

        创建一个Hive表,需要定义读取/写入文件系统的格式,以及数据的行断定义,列断定义等.Spark-Sql读取Hive表时,默认以文本格式读取.

        一些Hive存储格式相关格式如下:

属性名 描述
fileFormat fileFormat是一种存储格式规范的包,包括 "serde","input format" 和 "output format"。 目前支持6个文件格式:‘sequencefile‘,‘rcfile‘,‘orc‘,‘parquet‘,‘textfile‘和‘avro‘
inputFormat, outputFormat 这两个选项将相应的 "InputFormat" 和 "OutputFormat" 类的名称指定为字符串文字,例如: `org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`。 这两个选项必须成对出现,如果已经指定了 "fileFormat" 选项,则无法指定它们
serde 此选项指定 serde 类的名称。 当指定 `fileFormat` 选项时,如果给定的 `fileFormat` 已经包含 serde 的信息,那么不要指定这个选项。 目前的 "sequencefile", "textfile" 和 "rcfile" 不包含 serde 信息,你可以使用这3个文件格式的这个选项
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 这些选项只能与 "textfile" 文件格式一起使用。它们定义如何将分隔的文件读入行

   7.3.3 JDBC

      Spark-SQL 同样支持以JDBC的形式从其它的关系型数据库读取数据.(此方式优于JdbcRDD,因为DF更容易与处理与其它数据源的交互,比如直接映射为临时视图等等)

      开始使用时,需要在Spark类路径包含目标数据库的JDBC驱动程序  例如,要从 Spark Shell 连接到 postgres

        bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

      可以在数据源选项中指定 JDBC 连接属性。用户 和 密码通常作为登录数据源的连接属性提供。 除了连接属性外,Spark 还支持以下不区分大小写的选项

属性 描述
url 要连接的JDBC URL。 源特定的连接属性可以在URL中指定。 例如jdbc:jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable 应该读取的 JDBC 表。请注意,可以使用在SQL查询的 FROM 子句中有效的任何内容。 例如,您可以使用括号中的子查询代替完整表
driver 用于连接到此 URL 的 JDBC driver 程序的类名
partitionColumn, lowerBound, upperBound 这三个属性必须被一起设置.partitionColumn 必须是数字列.lowerBound 和 upperBound 仅用于决定分区的大小而不是用于过滤表中的行.因此,表中的所有行将被分区并返回
numPartitions 在表读写中可以用于并行度的最大分区数。这也确定并发JDBC连接的最大数量。 如果要写入的分区数超过此限制,则在写入之前通过调用 coalesce(numPartitions) 将其减少到此限制
fetchsize JDBC 抓取的大小,用于确定每次数据往返传递的行数。 这有利于提升 JDBC driver 的性能,它们的默认值较小(例如: Oracle 是 10 行)。 该选项仅适用于读取操作
batchsize JDBC 批处理的大小,用于确定每次数据往返传递的行数。 这有利于提升 JDBC driver 的性能。 该选项仅适用于写操作。默认值为 1000
isolationLevel
事务隔离级别,适用于当前连接。 它可以是 NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READ, 或 SERIALIZABLE 之一,对应于 JDBC 连接对象定义的标准事务隔离级别,默认为 READ_UNCOMMITTED

此选项仅适用于写操作。请参考 java.sql.Connection 中的文档

sessionInitStatement
在每个数据库会话打开到远程DB并开始读取数据之后,该选项将执行一个自定义SQL语句(或PL/SQL块)。使用它来实现会话初始化代码。

示例:选项(“sessionInitStatement”、“”开始执行即时的“alter session set”“_serial_direct_read”=true”;结束;”“”)

truncate 这是一个与 JDBC 相关的选项.启用 SaveMode.Overwrite 时,此选项会导致 Spark 截断现有表,而不是删除并重新创建。它默认为 false。 此选项仅适用于写操作
createTableOptions 这是一个与JDBC相关的选项。 如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如:CREATE TABLE t (name string) ENGINE=InnoDB. )。此选项仅适用于写操作
createTableColumnTypes 这是一个与JDBC相关的选项.如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如:CREATE TABLE t (name string) ENGINE=InnoDB.).此选项仅适用于写操作
customSchema
用于从连接器读取JDBC数据的自定义模式。例如,“id DECIMAL(38,0), name STRING”。还可以指定部分字段,其他字段使用默认类型映射。

例如,“id DECIMAL(38,0)”。列名应该与JDBC表对应的列名相同。用户可以指定Spark SQL的相应数据类型,而不是使用默认值。此选项仅适用于读取

原文地址:https://www.cnblogs.com/NightPxy/p/9266125.html

时间: 2024-10-13 07:53:09

[Spark]-结构化数据查询之数据源篇的相关文章

[Spark]-结构化流之监控&故障恢复篇

6 流的监控以及故障恢复 6.1.流的运行时数据    结构化流启动后返回的 StreamingQuery 对象. val query = df.writeStream.format("console").start() // get the query object query.id // get the unique identifier of the running query that persists across restarts from checkpoint data

各式结构化数据 动态 接入-存储-查询 的处理办法 (第二部分)

各式结构化数据的动态接入存储查询,这一需求相信有很多人都遇到过,随着实现技术路线选择的不同,遇到的问题出入大了,其解决办法也是大相径庭.数据存储在哪儿,是关系型数据库,还是NoSQL数据库,是MySQL还是Oracle,怎么建立索引,建立什么类型的索引,都是大学问.下面,我要把我对这一解决办法的思考总结一下,有成熟的也有不成熟的,希望大家一起共同探讨. 关键词:结构化数据, 动态, 接入, 存储, 查询 首先,我们得定义一下在本文中什么是结构化数据,这里的结构化数据主要是指扁平化的.可以由基础数

非结构化数据的存储与查询

当今信息化时代充斥着大量的数据.海量数据存储是一个必然的趋势.然而数据如何的存储和查询,尤其是当今非结构化数据的快速增长,对其数据的存储,处理,查询.使得如今的 关系数据库存储带来了巨大的挑战.分布存储技术是云计算的基础,主要研究如何存储.组织和管理数据中心上的大规模海量数据.由于面临的数据规模和用户规模更加庞大,在可扩展性.容错性以及成本控制方面面临着更加严峻的挑战[1]. 对于大量的半结构化数据(semi-structure data)和非结构化数据,对其存储和并发计算以及扩展能力而设计出了

spark结构化数据处理:Spark SQL、DataFrame和Dataset

本文讲解Spark的结构化数据处理,主要包括:Spark SQL.DataFrame.Dataset以及Spark SQL服务等相关内容.本文主要讲解Spark 1.6.x的结构化数据处理相关东东,但因Spark发展迅速(本文的写作时值Spark 1.6.2发布之际,并且Spark 2.0的预览版本也已发布许久),因此请随时关注Spark SQL官方文档以了解最新信息. 文中使用Scala对Spark SQL进行讲解,并且代码大多都能在spark-shell中运行,关于这点请知晓. 概述 相比于

Salesforce开源TransmogrifAI:用于结构化数据的端到端AutoML库

AutoML 即通过自动化的机器学习实现人工智能模型的快速构建,它可以简化机器学习流程,方便更多人利用人工智能技术.近日,软件行业巨头 Salesforce 开源了其 AutoML 库 TransmogrifAI.Salesforce Einstein 数据科学高级总监 Shubha Nabar 在 Medium 上撰文介绍了该 AutoML 库,包括工作流程和设计原则等. GitHub 链接:https://github.com/salesforce/TransmogrifAI Transmo

搜索引擎系列十:Solr(solrj 、索引API 、 结构化数据导入)

一.SolrJ介绍 1. SolrJ是什么? Solr提供的用于JAVA应用中访问solr服务API的客户端jar.在我们的应用中引入solrj: <dependency> <groupId>org.apache.solr</groupId> <artifactId>solr-solrj</artifactId> <version>7.3.0</version> </dependency> 2. SolrJ的核

MaxCompute读取分析OSS非结构化数据的实践经验总结

摘要: 本文背景 很多行业的信息系统中,例如金融行业的信息系统,相当多的数据交互工作是通过传统的文本文件进行交互的.此外,很多系统的业务日志和系统日志由于各种原因并没有进入ELK之类的日志分析系统,也是以文本文件的形式存在的. 1. 本文背景 很多行业的信息系统中,例如金融行业的信息系统,相当多的数据交互工作是通过传统的文本文件进行交互的.此外,很多系统的业务日志和系统日志由于各种原因并没有进入ELK之类的日志分析系统,也是以文本文件的形式存在的.随着数据量的指数级增长,对超大文本文件的分析越来

非结构化数据

rlist扩展包 设计目标:更方便地在R中操作list对象 特性: 提供一系列高阶函数,可以方便地对list对象中的元素进行映射(mapping).筛选(filtering).分组(grouping).排序(sorting).合并(joining).更新(updating).搜索(searching)以及其他常用操作. 对管道操作(pipeline)友好,方便非结构化数据处理的流程化. 整合多种非结构化数据源的读写方法,方便接入数据源和输出数据. 合理利用R的元编程特性,简化使用. 基于表达式的

阿里云OTS(开放结构化数据服务)可视化管理工具的设计和功能介绍

设计初衷 最近一段时间,工作中一直用阿里云的服务器ECS以及SLB.OTS(开放结构化数据服务)等相关的服务,特别是OTS刚刚商业化,官方还没出一种可视化的管理工具,而且我曾跟阿里云OTS的架构师深入交谈过,虽然他们有推出可视化管理工具的想法,但是无奈由于种种原因,目前暂时未提上日程,而自己又在开发中负责OTS源码的修改以适合公司相应的业务逻辑并且其他人也需要调用我封装的OTS相关的服务,为了方便.高效的快速应用ots,所以一个可视化.方便快捷的OTS管理工具的需求迫在眉睫..so--我决定自己