Spark parquet merge metadata问题

在spark sql 1.2.x当中存在一个问题:

当我们尝试在一个查询中访问多个parquet文件时,如果这些parquet文件中的字段名和类型是完全一致的、只是字段的顺序不一样,例如一个文件中是name string, id int,另一个文件是id int, name string时,查询会报错,抛出metadata merge的异常。

在1.3当中,这个问题其实已经解决。那么在1.2.x中解决的办法是:

在spark源码的sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala文件中,找到override def getSplits(configuration: Configuration, footers: JList[Footer]): JList[ParquetInputSplit]这个方法,在如下这段代码之前:

if (globalMetaData == null) {
     val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
     return splits
    }

将val globalMetaData改成 var globalMetaData

在上面这段代码之后加上如下几行:

val startTime = System.currentTimeMillis();
val metadata = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
val mergedMetadata = globalMetaData.getKeyValueMetaData.updated(RowReadSupport.SPARK_METADATA_KEY, setAsJavaSet(Set(metadata)))
globalMetaData = new GlobalMetaData(globalMetaData.getSchema, mergedMetadata, globalMetaData.getCreatedBy)
val endTime = System.currentTimeMillis();
logInfo("\n*** updated globalMetadata in " + (endTime - startTime) + " ms. ***\n");

其中第2-4行是必须的,这三行是从spark1.3里面摘出来的。其他三行只是想打个日志,看看这段代码放的执行时间。

然后就是编译源码了:

mvn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package

具体参考http://spark.apache.org/docs/1.2.1/building-spark.html

我在一台服务器上测试了编译之后的spark,问题解决了,执行很顺利,性能没有任何影响。读取600个parquet文件,加上的几行代码只用了1ms左右。

时间: 2024-11-25 20:19:37

Spark parquet merge metadata问题的相关文章

把Spark SQL的metadata存储到mysql

1:安装配置mysql yum install mysql mysql-server service mysqld start mysqladmin -u root  password newpassword mysql -u root -p 登录mysql mysql>GRANT ALL PRIVILEGES ON *.* TO 'root'@'%'WITH GRANT OPTION myslq>FLUSH PRIVILEGES 2:配置hive-site.xml 下载一个hive的安装包,

Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

Spark SQL, DataFrames and Datasets Guide Overview SQL Datasets and DataFrames 开始入门 起始点: SparkSession 创建 DataFrames 无类型的Dataset操作 (aka DataFrame 操作) Running SQL Queries Programmatically 全局临时视图 创建Datasets RDD的互操作性 使用反射推断Schema 以编程的方式指定Schema Aggregatio

Spark1.1.0 Spark SQL Programming Guide

Spark SQL Programming Guide Overview Getting Started Data Sources RDDs Inferring the Schema Using Reflection Programmatically Specifying the Schema Parquet Files Loading Data Programmatically Configuration JSON Datasets Hive Tables Performance Tuning

spark的累加器-SQL-Streaming

RDD持久化 --------------- memory disk off-heap serial replication Memory_ONLY(true , false ,false , true ,1) 广播变量 --------------- driver端切成小块,存放到blockmanager,executor广播变量 的小块,首先从自己的blockmgr中提取,如果提取不到,在从其他 节点(driver + executor)提取,一旦提取到存放在自己的blockmgr. RDD

Spark SQL and DataFrame Guide(1.4.1)——之Data Sources

数据源(Data Sources) Spark SQL通过DataFrame接口支持多种数据源操作.一个DataFrame可以作为正常的RDD操作,也可以被注册为临时表. 1. 通用的Load/Save函数 默认的数据源适用所有操作(可以用spark.sql.sources.default设置默认值) 之后,我们就可以使用hadoop fs -ls /user/hadoopuser/在此目录下找到namesAndFavColors.parquet文件. 手动指定数据源选项 我们可以手动指定数据源

二十种特征变换方法及Spark MLlib调用实例(Scala/Java/python)(二)

VectorIndexer 算法介绍: VectorIndexer解决数据集中的类别特征Vector.它可以自动识别哪些特征是类别型的,并且将原始值转换为类别指标.它的处理流程如下: 1.获得一个向量类型的输入以及maxCategories参数. 2.基于原始数值识别哪些特征需要被类别化,其中最多maxCategories需要被类别化. 3.对于每一个类别特征计算0-based类别指标. 4.对类别特征进行索引然后将原始值转换为指标. 索引后的类别特征可以帮助决策树等算法处理类别型特征,并得到较

基于spark2.0整合spark-sql + mysql + parquet + HDFS

一.概述 spark 2.0做出的改变大家可以参考官网以及其他资料,这里不再赘述由于spark1.x的sqlContext在spark2.0中被整合到sparkSession,故而利用spark-shell客户端操作会有些许不同,具体如下文所述 二.spark额外配置 1. 正常配置不再赘述,这里如果需要读取MySQL数据,则需要在当前用户下的环境变量里额外加上JDBC的驱动jar包 例如我的是:mysql-connector-java-5.1.18-bin.jar 存放路径是$SPARK_HO

Atitit.atiDataStoreService   v2 新特性

Atitit.atiDataStoreService   v2 新特性 1.1. V1  基础实现1 1.2. V2  增加了对  $uuid  $cur_uid参数的支持1 1.3. 增加了fld fun的支持1 2. fld fun1 2.1. Invoke   简化版全局函数txt2html1 2.2. ---------atiDataStoreService   .js1 2.3. dslUtil2 2.4. jAva3 3. code4 1.1. V1  基础实现 1.2. V2  增

Spark1.6.2 java实现读取txt文件插入MySql数据库代码

package com.gosun.spark1; import java.util.ArrayList;import java.util.List;import java.util.Properties; import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spa