sparksql parquet 合并元数据

java

 1 public class ParquetMergeSchema {
 2     private static SparkConf conf = new SparkConf().setAppName("parquetmergeschema").setMaster("local");
 3     private static JavaSparkContext jsc = new JavaSparkContext(conf);
 4     private static SparkSession session = new SparkSession(jsc.sc());
 5
 6     public static void main(String[] args) {
 7         JavaRDD<Tuple2<String, Object>> rdd1 = jsc.parallelize(
 8                 Arrays.asList(new Tuple2<String, Object>("jack", 21), new Tuple2<String, Object>("lucy", 20)));
 9
10         JavaRDD<Row> row1 = rdd1.map(new Function<Tuple2<String, Object>, Row>() {
11
12             private static final long serialVersionUID = 1L;
13
14             @Override
15             public Row call(Tuple2<String, Object> v1) throws Exception {
16                 return RowFactory.create(v1._1, v1._2);
17             }
18         });
19
20         JavaRDD<Tuple2<String, Object>> rdd2 = jsc.parallelize(
21                 Arrays.asList(new Tuple2<String, Object>("jack", "A"), new Tuple2<String, Object>("yeye", "B")));
22
23         JavaRDD<Row> row2 = rdd2.map(new Function<Tuple2<String, Object>, Row>() {
24
25             private static final long serialVersionUID = 1L;
26
27             @Override
28             public Row call(Tuple2<String, Object> v1) throws Exception {
29                 return RowFactory.create(v1._1, v1._2);
30             }
31         });
32
33         StructType schema1 = DataTypes
34                 .createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, false),
35                         DataTypes.createStructField("age", DataTypes.IntegerType, false)));
36
37         StructType schema2 = DataTypes
38                 .createStructType(Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, false),
39                         DataTypes.createStructField("grade", DataTypes.StringType, false)
40
41                 ));
42
43         // 将rdd转成dataset
44         Dataset<Row> ds1 = session.createDataFrame(row1, schema1);
45
46         Dataset<Row> ds2 = session.createDataFrame(row2, schema2);
47
48         // 保存为parquet文件
49         ds1.write().mode(SaveMode.Append).save("./src/main/java/cn/tele/spark_sql/parquet/mergetest");
50         ds2.write().mode(SaveMode.Append).save("./src/main/java/cn/tele/spark_sql/parquet/mergetest");
51
52         // 指定parquet文件的目录进行读取,设置mergeSchema为true进行合并
53         Dataset<Row> dataset = session.read().option("mergeSchema", true)
54                 .load("./src/main/java/cn/tele/spark_sql/parquet/mergetest");
55
56         dataset.printSchema();
57         dataset.show();
58
59         session.stop();
60         jsc.close();
61
62     }
63 }

scala

 1 object ParquetMergeSchema {
 2   def main(args: Array[String]): Unit = {
 3     val conf = new SparkConf().setAppName("parquetmergeschema").setMaster("local")
 4     val sc = new SparkContext(conf)
 5     val sqlContext = new SQLContext(sc)
 6
 7     val rdd1 = sc.parallelize(Array(("jack", 18), ("tele", 20)), 2).map(tuple => { Row(tuple._1, tuple._2) })
 8     val rdd2 = sc.parallelize(Array(("tele", "A"), ("wyc", "A"), ("yeye", "C")), 2).map(tuple => { Row(tuple._1, tuple._2) })
 9
10     //schema
11     val schema1 = DataTypes.createStructType(Array(
12       StructField("name", DataTypes.StringType, false),
13       StructField("age", DataTypes.IntegerType, false)))
14
15     val schema2 = DataTypes.createStructType(Array(
16       StructField("name", DataTypes.StringType, false),
17       StructField("grade", DataTypes.StringType, false)))
18
19     //转换
20     val df1 = sqlContext.createDataFrame(rdd1, schema1)
21     val df2 = sqlContext.createDataFrame(rdd2, schema2)
22
23     //写出
24     df1.write.mode(SaveMode.Append).save("./src/main/scala/cn/tele/spark_sql/parquet/mergetest")
25     df2.write.mode(SaveMode.Append).save("./src/main/scala/cn/tele/spark_sql/parquet/mergetest")
26
27     //读取进行合并
28     val df = sqlContext.read.option("mergeSchema", true).parquet("./src/main/scala/cn/tele/spark_sql/parquet/mergetest")
29     df.printSchema()
30     df.show()
31   }
32 }

原文地址:https://www.cnblogs.com/tele-share/p/10390972.html

时间: 2024-11-09 09:28:08

sparksql parquet 合并元数据的相关文章

39、Parquet数据源之自动分区推断&amp;合并元数据

一.自动分区推断 1.概述 表分区是一种常见的优化方式,比如Hive中就提供了表分区的特性.在一个分区表中,不同分区的数据通常存储在不同的目录中, 分区列的值通常就包含在了分区目录的目录名中.Spark SQL中的Parquet数据源,支持自动根据目录名推断出分区信息. 例如,如果将人口数据存储在分区表中,并且使用性别和国家作为分区列.那么目录结构可能如下所示: tableName |- gender=male |- country=US ... ... ... |- country=CN ..

load、save方法、spark sql的几种数据源

load.save方法的用法 DataFrame usersDF = sqlContext.read().load("hdfs://spark1:9000/users.parquet");                usersDF.select("name", "favorite_color").write()                .save("hdfs://spark1:9000/namesAndFavColors.pa

大数据-spark理论(3)sparkSql,sparkStreaming,spark调优

导读目录 第一节:sparksql 1:简介 2:核心 3:与hive整合 4:dataFrame 5:函数 第二节:spark Streaming 1:对比strom 2:DStream的算子 3:代码 4:driver HA 5:读取数据 第三节:spark调优 第一节:sparksql (1)简介: Shark:shark是sparksql的前身,hive是shark的前身 快的原因:不仅是内存,还有谓词下移(减少一定量的数据IO) 正常 谓词下移 (先关联表在切割) (先将表中的字段过滤

网易视频云:新一代列式存储格式Parquet

网易视频云是网易倾力打造的一款基于云计算的分布式多媒体处理集群和专业音视频技术,提供稳定流畅.低时延.高并发的视频直播.录制.存储.转码及点播等音视频的PAAS服务,在线教育.远程医疗.娱乐秀场.在线金融等各行业及企业用户只需经过简单的开发即可打造在线音视频平台.现在,网易视频云的技术专家给大家分享一则技术文: 新一代列式存储格式Parquet. Apache Parquet是Hadoop生态圈中一种新型列式存储格式,它可以兼容Hadoop生态圈中大多数计算框架(Hadoop.Spark等),被

Spark 官方文档(5)——Spark SQL,DataFrames和Datasets 指南

Spark版本:1.6.2 概览 Spark SQL用于处理结构化数据,与Spark RDD API不同,它提供更多关于数据结构信息和计算任务运行信息的接口,Spark SQL内部使用这些额外的信息完成特殊优化.可以通过SQL.DataFrames API.Datasets API与Spark SQL进行交互,无论使用何种方式,SparkSQL使用统一的执行引擎记性处理.用户可以根据自己喜好,在不同API中选择合适的进行处理.本章中所有用例均可以在spark-shell.pyspark shel

[Spark]-结构化数据查询之数据源篇

7. 数据源 Spark-SQL 支持通过Dataframe接口对各种数据源进行操作 各种数据源的加载&保存 数据转换(relational transformations) 注册临时视图(temporary view),来允许SQL的形式直接对临时视图进行操作 7.1  数据源加载 Spark-SQL的默认数据源为parquet(spark.sql.sources.default设置),一些数据源加载的例子如下: /** * 加载parquet数据源 */ spark.read.load(&qu

(升级版)Spark从入门到精通(Scala编程、案例实战、高级特性、Spark内核源码剖析、Hadoop高端)

本课程主要讲解目前大数据领域最热门.最火爆.最有前景的技术——Spark.在本课程中,会从浅入深,基于大量案例实战,深度剖析和讲解Spark,并且会包含完全从企业真实复杂业务需求中抽取出的案例实战.课程会涵盖Scala编程详解.Spark核心编程.Spark SQL和Spark Streaming.Spark内核以及源码剖析.性能调优.企业级案例实战等部分.完全从零起步,让学员可以一站式精通Spark企业级大数据开发,提升自己的职场竞争力,实现更好的升职或者跳槽,或者从j2ee等传统软件开发工程

spark复习总结03

1.DataFrame的创建方式 1.1 通过加载外部文件创建 //通过sqlContext读取json文件创建DataFrame DataFrame dataFrame=sqlContext.read().json("src/main/resources/datafromcreate.txt");//通过两种方式加载json文件//sqlContext.read().json("src/main/resources/datafromcreate.txt");sql

Spark SQL初始化和创建DataFrame的几种方式

一.前述       1.SparkSQL介绍 Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是其完全脱离了Hive的限制. SparkSQL支持查询原生的RDD. RDD是Spark平台的核心概念,是Spark能够高效的处理大数据的各种场景的基础. 能够在Scala中写SQL语句.支持简单的SQL语法检查,能够在Scala中写Hive语句访问Hive数据,并将结果取回作为RDD使用.     2.Spark on Hive和Hive on Spa