39、Parquet数据源之自动分区推断&合并元数据

一、自动分区推断

1、概述

表分区是一种常见的优化方式,比如Hive中就提供了表分区的特性。在一个分区表中,不同分区的数据通常存储在不同的目录中,
分区列的值通常就包含在了分区目录的目录名中。Spark SQL中的Parquet数据源,支持自动根据目录名推断出分区信息。
例如,如果将人口数据存储在分区表中,并且使用性别和国家作为分区列。那么目录结构可能如下所示:

tableName
  |- gender=male
    |- country=US
      ...
      ...
      ...
    |- country=CN
      ...
  |- gender=female
    |- country=US
      ...
    |- country=CH
      ... 

如果将/tableName传入SQLContext.read.parquet()或者SQLContext.read.load()方法,那么Spark SQL就会自动根据目录结构,推断出分区信息,是gender和country。
即使数据文件中只包含了两列值,name和age,但是Spark SQL返回的DataFrame,调用printSchema()方法时,会打印出四个列的值:name,age,country,gender。这就是自动分区推断的功能。

此外,分区列的数据类型,也是自动被推断出来的。目前,Spark SQL仅支持自动推断出数字类型和字符串类型。有时,用户也许不希望Spark SQL自动推断分区列的数据类型。
此时只要设置一个配置即可, spark.sql.sources.partitionColumnTypeInference.enabled,默认为true,即自动推断分区列的类型,设置为false,即不会自动推断类型。
禁止自动推断分区列的类型时,所有分区列的类型,就统一默认都是String。

案例:自动推断用户数据的性别和国家

2、java案例实现

##创建hdfs目录,上传文件
##创建了一个users目录,之下又创建了性别=男,国家=US两个目录
[[email protected] sql]# hdfs dfs -mkdir /spark-study/users
[[email protected] sql]# hdfs dfs -mkdir /spark-study/users/gender=male
[[email protected] sql]# hdfs dfs -mkdir /spark-study/users/gender=male/country=US
[[email protected] sql]# hdfs dfs -put users.parquet /spark-study/users/gender=male/country=US

--------------
package cn.spark.study.sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class ParquetPartitionDiscovery {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("ParquetPartitionDiscovery");
        JavaSparkContext sc = new JavaSparkContext();
        SQLContext sqlConf = new SQLContext(sc);

        DataFrame usersDF = sqlConf.read().parquet("hdfs://spark1:9000/spark-study/users/gender=male/country=US/users.parquet");

        usersDF.printSchema();
        usersDF.show();

    }

}

##打包、上传

##运行脚本
[[email protected] sql]# cat ParquetPartitionDiscovery.sh
/usr/local/spark-1.5.1-bin-hadoop2.4/bin/spark-submit --class cn.spark.study.sql.ParquetPartitionDiscovery --num-executors 3 --driver-memory 100m --executor-memory 100m --executor-cores 3 --files /usr/local/hive/conf/hive-site.xml --driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.17.jar /usr/local/spark-study/java/sql/saprk-study-java-0.0.1-SNAPSHOT-jar-with-dependencies.jar 

##结果
##可见,已经自动推断出了性别=男,国家=US两个分区,并加到了字段中
+------+--------------+----------------+------+-------+
|  name|favorite_color|favorite_numbers|gender|country|
+------+--------------+----------------+------+-------+
|Alyssa|          null|  [3, 9, 15, 20]|  male|     US|
|   Ben|           red|              []|  male|     US|
+------+--------------+----------------+------+-------+

二、合并元数据

1、概述

如同ProtocolBuffer,Avro,Thrift一样,Parquet也是支持元数据合并的。用户可以在一开始就定义一个简单的元数据,然后随着业务需要,逐渐往元数据中添加更多的列。
在这种情况下,用户可能会创建多个Parquet文件,有着多个不同的但是却互相兼容的元数据。Parquet数据源支持自动推断出这种情况,并且进行多个Parquet文件的元数据的合并。

因为元数据合并是一种相对耗时的操作,而且在大多数情况下不是一种必要的特性,从Spark 1.5.0版本开始,默认是关闭Parquet文件的自动合并元数据的特性的。
可以通过以下两种方式开启Parquet数据源的自动合并元数据的特性:
1、读取Parquet文件时,将数据源的选项,mergeSchema,设置为true
2、使用SQLContext.setConf()方法,将spark.sql.parquet.mergeSchema参数设置为true

案例:合并学生的基本信息,和成绩信息的元数据

2、scala案例实现

package cn.spark.study.sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SaveMode

object ParquetMergeSchema {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("ParquetMergeSchema")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._

    // 创建一个DataFrame,作为学生的基本信息,并写入一个parquet文件中
    // toSeq转换为 Seq; Seq是列表,适合存有序重复数据,进行快速插入/删除元素等场景
    // sc.parallelize: 创建并行集合,2:指定了将数据集切分为2份
    val studentWithNameAge = Array(("leo", 30), ("jack", 26)).toSeq
    val studentWithNameAgeDF = sc.parallelize(studentWithNameAge, 2).toDF("name", "age")
    studentWithNameAgeDF.save("hdfs://spark1:9000/spark-study/students", "parquet", SaveMode.Append)

    // 创建第二个DataFrame,作为学生的成绩信息,并写入一个parquet文件中
    val studentWithNameGrade = Array(("tom", "A"), ("marry", "B")).toSeq
    val studentWithNameGradeDF = sc.parallelize(studentWithNameGrade, 2).toDF("name", "grade")
    studentWithNameGradeDF.save("hdfs://spark1:9000/spark-study/students", "parquet", SaveMode.Append)

    // 首先,第一个DataFrame和第二个DataFrame的元数据肯定是不一样的
    // 一个是包含了name和age两个列,一个是包含了name和grade两个列
    // 所以, 这里期望的是,读取出来的表数据,自动合并两个文件的元数据,出现三个列,name、age、grade

    // 用mergeSchema的方式,读取students表中的数据,进行元数据的合并
    val students = sqlContext.read.option("mergeSchema", "true")
      .parquet("hdfs://spark1:9000/spark-study/students")

    students.printSchema()
    students.show()

  }
}

##打包--上传--运行

##运行脚本
[[email protected] sql]# cat ParquetMergeSchema.sh
/usr/local/spark-1.5.1-bin-hadoop2.4/bin/spark-submit --class cn.spark.study.sql.ParquetMergeSchema --num-executors 3 --driver-memory 100m --executor-memory 100m --executor-cores 3 --files /usr/local/hive/conf/hive-site.xml --driver-class-path /usr/local/hive/lib/mysql-connector-java-5.1.17.jar /usr/local/spark-study/scala/sql/spark-study-scala.jar 

##结果,两个DataFrame元数据已合并
+-----+----+-----+
| name| age|grade|
+-----+----+-----+
|  leo|  30| null|
| jack|  26| null|
|marry|null|    B|
|  tom|null|    A|
+-----+----+-----+

3、java案例实现

package cn.spark.study.sql;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class ParquetMergeSchema {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("ParquetMergeSchemaJava").setMaster("local");
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sparkContext);

        // 创建一个DataFrame,作为学生的基本信息,并写入一个parquet文件中
        List<String> studentWithNameAndAge = new ArrayList<String>();
        studentWithNameAndAge.add("tom,18");
        studentWithNameAndAge.add("jarry,17");
        JavaRDD<String> studentWithNameAndAgeRDD = sparkContext.parallelize(studentWithNameAndAge, 2);
        JavaRDD<Row> studentWithNameAndAgeRowRDD = studentWithNameAndAgeRDD
            .map(new Function<String, Row>() {
            @Override
            public Row call(String v1) throws Exception {
                return RowFactory.create(v1.split(",")[0], Integer.parseInt(v1.split(",")[1]));
            }
        });

        List<StructField> fieldList = new ArrayList<StructField>();
        fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(fieldList);

        DataFrame studentWithNameAndAgeDF = sqlContext.createDataFrame(studentWithNameAndAgeRowRDD, structType);
        studentWithNameAndAgeDF.write().format("parquet").mode(SaveMode.Append)
            .save("hdfs://spark1:9000/spark-study/students");

        // 创建第二个DataFrame,作为学生的成绩信息,并写入一个parquet文件中
        List<String> studentWithNameAndGrade = new ArrayList<String>();
        studentWithNameAndGrade.add("leo,B");
        studentWithNameAndGrade.add("jack,A");
        JavaRDD<String> studentWithNameAndGradeRDD = sparkContext.parallelize(studentWithNameAndGrade, 2);
        JavaRDD<Row> studentWithNameAndGradeRowRDD = studentWithNameAndGradeRDD
            .map(new Function<String, Row>() {
            @Override
            public Row call(String v1) throws Exception {
                return RowFactory.create(v1.split(",")[0], v1.split(",")[1]);
            }
        });
        fieldList = new ArrayList<StructField>();
        fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fieldList.add(DataTypes.createStructField("grade", DataTypes.StringType, true));
        structType = DataTypes.createStructType(fieldList);

        DataFrame studentWithNameAndGradeDF = sqlContext.createDataFrame(studentWithNameAndGradeRowRDD, structType);
        studentWithNameAndGradeDF.write().format("parquet").mode(SaveMode.Append)
            .save("hdfs://spark1:9000/spark-study/students");

        // 首先,第一个DataFrame和第二个DataFrame的元数据肯定是不一样的吧
        // 一个是包含了name和age两个列,一个是包含了name和grade两个列
        // 所以, 这里期望的是,读取出来的表数据,自动合并两个文件的元数据,出现三个列,name、age、grade
        // 用mergeSchema的方式,读取students表中的数据,进行元数据的合并
        DataFrame df = sqlContext.read().option("mergeSchema", "true")
            .parquet("hdfs://spark1:9000/spark-study/students");
        df.schema();
        df.show();
    }
}

原文地址:https://www.cnblogs.com/weiyiming007/p/11277227.html

时间: 2024-11-09 04:41:27

39、Parquet数据源之自动分区推断&合并元数据的相关文章

spark之数据源之自动分区推断

在hadoop上创建目录/spark-study/users/gender=male/country=US/users.parquet(并且把文件put上去) code: package cn.spark.study.core.mycode_dataFrame; import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.sql.DataFr

为已有表快速创建自动分区和Long类型like 的方法-Oracle 11G

对上一篇文章进行实际的运用.在工作中遇到有一张大表(五千万条数据),在开始的时候忘记了创建自动分区,导致现在使用非常不方便,查询的速度非常的满,所以就准备重新的分区表,最原始方法是先创建新的分区表,然后将数据依次插入到新的表中,但是我们的表的数据比较的大,如果这样做可能导致效率相对较低,经过寻扎发现了上一篇文章,这篇文章有三个方法,第一个就是最原始的方法,我没有进行实验,第二种(交换分区)和第三种的(在线重定义)我都进行了测试,第三种方法,我初以为会比较快速,但是经过测试需要超过2个小时的时间,

sparksql parquet 合并元数据

java 1 public class ParquetMergeSchema { 2 private static SparkConf conf = new SparkConf().setAppName("parquetmergeschema").setMaster("local"); 3 private static JavaSparkContext jsc = new JavaSparkContext(conf); 4 private static SparkS

C++11新特性:自动类型推断和类型获取

声明:本文是在Alex Allain的文章http://www.cprogramming.com/c++11/c++11-auto-decltype-return-value-after-function.html的基础上写成的. 加入了很多个人的理解,不是翻译. 转载请注明出处 http://blog.csdn.net/srzhz/article/details/7934483 自动类型推断 当编译器能够在一个变量的声明时候就推断出它的类型,那么你就能够用auto关键字来作为他们的类型: [c

磁盘自动分区

#!/bin/bash##########################################Function:    auto fdisk#Usage:       bash auto_fdisk.sh#Author:      Customer service department#Company:     Alibaba Cloud Computing#Version:     2.0######################################### count

MySql自动分区

自动分区需要开启MySql中的事件调度器,可以通过如下命令查看是否开启了调度器 show variables like '%scheduler%'; 如果没开启的话通过如下指令开启 SET GLOBAL event_scheduler = 1; 1.创建一个分区表 CREATE TABLE sales ( id INT AUTO_INCREMENT, amount DOUBLE NOT NULL, createTime DATETIME NOT NULL, PRIMARY KEY(id, cre

磁盘分区及合并

磁盘的分区与合并 磁盘没分区,所有信息都在C盘怎么办?电脑空闲磁盘怎么处理? 不用进入重装系统,不用下载分区助手,只通过电脑的"管理"工具,轻松搞定. 在桌面上找到"计算机"图标,右击,选择"管理". 然后,在左侧列表找到"磁盘管理",点击打开. 找到需要被分区的磁盘,右击,选择"压缩卷",打开. 如图,"输入压缩空间量"一栏中输入将要新建磁盘的空间大小,然后单击"压缩&quo

Timestamp 与 Date 变量绑定与Oracle的自动分区

好久没有更新博客了,其实是工作中遇到的很多问题在Google上都能找到答案,也就没有记录下来的必要了.今天主要想聊一下在实际的系统中遇到的Oracle数据库的问题,希望对大家有一点点帮助就好. 我首先描述一下我所遇到的场景:我们的数据库用的是Oracle 11g,我想大家立马就对它的自动分区(Interval)有了基本的认识了,这是一个非常棒的功能,免除了在建表时弄一大堆建Range分区的代码,也免除了以后对数据库进行分区扩充的麻烦.当然利用JOB也是可以完成分区扩展的,但是既然Oracle提供

自动分区、格式化、挂载脚本

功能:自动检测是否有尚未分区的数据盘,格式化新的数据盘并自动挂载 解决了什么问题:一键式检测是否有尚未分区的数据盘,并能对其格式化和自动挂载,省去了复杂的命令和步骤 执行方法:以root身份执行命令 wget http://mirrors.linuxeye.com/scripts/auto_fdisk.sh chmod +x auto_fdisk.sh ./auto_fdisk.sh 结果:出现如下即自动分区.格式化.挂载成功: 脚本内容如下: #!/bin/bash # Author: yeh