java
import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class ParquetMergeSchema {
    private static SparkSession session = SparkSession.builder()
            .appName("parquetmergeschema")
            .master("local")
            .getOrCreate();
    private static JavaSparkContext jsc = new JavaSparkContext(session.sparkContext());

    public static void main(String[] args) {
        JavaRDD<Tuple2<String, Object>> rdd1 = jsc.parallelize(
                Arrays.asList(new Tuple2<String, Object>("jack", 21), new Tuple2<String, Object>("lucy", 20)));

        JavaRDD<Row> row1 = rdd1.map(new Function<Tuple2<String, Object>, Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Row call(Tuple2<String, Object> v1) throws Exception {
                // In Java, tuple elements must be read via the _1()/_2() accessor methods
                return RowFactory.create(v1._1(), v1._2());
            }
        });

        JavaRDD<Tuple2<String, Object>> rdd2 = jsc.parallelize(
                Arrays.asList(new Tuple2<String, Object>("jack", "A"), new Tuple2<String, Object>("yeye", "B")));

        JavaRDD<Row> row2 = rdd2.map(new Function<Tuple2<String, Object>, Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Row call(Tuple2<String, Object> v1) throws Exception {
                return RowFactory.create(v1._1(), v1._2());
            }
        });

        // Two schemas that share the "name" column but differ otherwise
        StructType schema1 = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("name", DataTypes.StringType, false),
                DataTypes.createStructField("age", DataTypes.IntegerType, false)));

        StructType schema2 = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("name", DataTypes.StringType, false),
                DataTypes.createStructField("grade", DataTypes.StringType, false)));

        // Convert the RDDs to Datasets
        Dataset<Row> ds1 = session.createDataFrame(row1, schema1);
        Dataset<Row> ds2 = session.createDataFrame(row2, schema2);

        // Save both Datasets as Parquet into the same directory
        ds1.write().mode(SaveMode.Append).save("./src/main/java/cn/tele/spark_sql/parquet/mergetest");
        ds2.write().mode(SaveMode.Append).save("./src/main/java/cn/tele/spark_sql/parquet/mergetest");

        // Read the whole Parquet directory with mergeSchema set to true to merge the schemas
        Dataset<Row> dataset = session.read().option("mergeSchema", true)
                .load("./src/main/java/cn/tele/spark_sql/parquet/mergetest");

        dataset.printSchema();
        dataset.show();

        session.stop();
        jsc.close();
    }
}
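For reference, this is roughly what the merged result should look like (a sketch, not output captured from the original post): printSchema() reports the union of the two schemas, and because Spark's file-based sources treat every column as potentially missing, all columns come back nullable even though both writing schemas declared nullable = false. Column order may vary:

text
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- grade: string (nullable = true)

Rows that came from the first file show null for grade, and rows from the second file show null for age.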
scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{DataTypes, StructField}

object ParquetMergeSchema {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("parquetmergeschema").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val rdd1 = sc.parallelize(Array(("jack", 18), ("tele", 20)), 2).map(tuple => Row(tuple._1, tuple._2))
    val rdd2 = sc.parallelize(Array(("tele", "A"), ("wyc", "A"), ("yeye", "C")), 2).map(tuple => Row(tuple._1, tuple._2))

    // Schemas for the two datasets, sharing only the "name" column
    val schema1 = DataTypes.createStructType(Array(
      StructField("name", DataTypes.StringType, false),
      StructField("age", DataTypes.IntegerType, false)))

    val schema2 = DataTypes.createStructType(Array(
      StructField("name", DataTypes.StringType, false),
      StructField("grade", DataTypes.StringType, false)))

    // Convert the RDDs to DataFrames
    val df1 = sqlContext.createDataFrame(rdd1, schema1)
    val df2 = sqlContext.createDataFrame(rdd2, schema2)

    // Write both DataFrames into the same directory
    df1.write.mode(SaveMode.Append).save("./src/main/scala/cn/tele/spark_sql/parquet/mergetest")
    df2.write.mode(SaveMode.Append).save("./src/main/scala/cn/tele/spark_sql/parquet/mergetest")

    // Read the directory back with mergeSchema enabled so the two schemas are merged
    val df = sqlContext.read.option("mergeSchema", true).parquet("./src/main/scala/cn/tele/spark_sql/parquet/mergetest")
    df.printSchema()
    df.show()

    sc.stop()
  }
}
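As an alternative to passing the option on every read, schema merging can also be switched on globally through the spark.sql.parquet.mergeSchema configuration (it defaults to false, since merging requires reading the footers of all files and is comparatively expensive). A minimal sketch against the same output directory as above:

scala
// Turn on Parquet schema merging for every read in this SQLContext,
// equivalent to option("mergeSchema", true) on each individual read
sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")

val merged = sqlContext.read.parquet("./src/main/scala/cn/tele/spark_sql/parquet/mergetest")
merged.printSchema() // should now show name, age and grade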
Original article: https://www.cnblogs.com/tele-share/p/10390972.html