package dev.spark.sql

import java.util.Properties

import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

object DataFrame {
  val num = 0
  val map = scala.collection.immutable.Map(
    "url" -> "jdbc:mysql://192.168.0.1:3306/spark",
    "dbtable" -> "tmp_table3",
    "user" -> "spark",
    "password" -> "spark")

  // Case class defined at object level so toDF() can find its TypeTag;
  // defining it inside main() fails to compile with "No TypeTag available".
  case class RowKeyClass(rowkey: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("dataFrame")
    val sc = new SparkContext(conf)
    val ssc = new SQLContext(sc)

    // Read JSON input (".json" is a placeholder path)
    val df = ssc.read.json(".json")
    ssc.read.format("json").load(".json")

    // dataFrame.show() prints the data set directly; filter() views it by condition
    df.show()
    df.filter(df.col("col") <= num).show()

    // Register the DataFrame as a temporary table so it can be queried with SQL
    df.registerTempTable("tmp_table0")
    // The result is a data set whose rows are wrapped as Row objects
    val dataSet0 = ssc.sql("SELECT col FROM tmp_table0 WHERE col <= " + num)
    // A DataFrame exposes many properties and methods
    dataSet0.collect().foreach(println)
    dataSet0.columns.foreach(println)
    dataSet0.rdd.foreach(println)
    dataSet0.explain()
    dataSet0.alias("")
    dataSet0.cache()
    dataSet0.na

    // SQLContext can also read other formats
    // parquet (".parquet" is a placeholder path)
    val pssc = new SQLContext(sc)
    pssc.read.format("parquet").load(".parquet")
    // jdbc
    val dataSet3 = ssc.read.format("jdbc").options(map).load()
    dataSet3.write.jdbc("jdbc:mysql://192.168.0.1:3306/spark", "tmp_table3", new Properties())

    // With hive-site.xml placed in resources, HiveContext queries the Hive warehouse.
    // Note: temporary tables take precedence; fully qualify names as database.table to
    // avoid ambiguity. The default warehouse database is "default".
    val hssc = new HiveContext(sc)
    val dataSet1 = hssc.sql("SELECT col FROM database.table")
    dataSet1.registerTempTable("tmp_table1")
    // Tables registered on the same SparkContext can be joined
    hssc.sql("SELECT * FROM tmp_table0 t0 INNER JOIN tmp_table1 t1 ON t0.col = t1.col")

    // Map the data onto a table schema: an RDD[Row] plus an explicit StructType
    val rowRdd = dataSet0.rdd.map(line => Row(line.size))
    val rowkeyStructField = new StructField("rowkey", IntegerType, true)
    val tableStructType = new StructType(Array(rowkeyStructField))
    val dataSet2 = hssc.createDataFrame(rowRdd, tableStructType)
    dataSet2.registerTempTable("tmp_table2")
    dataSet2.write.mode(SaveMode.Append).saveAsTable("hive_spark.tmp_table2")

    // Converting an RDD to a DataFrame requires the implicit conversions
    import ssc.implicits._
    dataSet0.rdd.map(x => RowKeyClass(x.size)).toDF()
  }
}
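Note: in Spark 2.x and later, SQLContext and HiveContext are unified under SparkSession, and registerTempTable is superseded by createOrReplaceTempView. Below is a minimal sketch of the same read-and-query flow under that API; the object name and the ".json" input path are placeholders, not part of the original example.

import org.apache.spark.sql.SparkSession

object DataFrameSession {
  def main(args: Array[String]): Unit = {
    // SparkSession is the single entry point replacing SQLContext/HiveContext (Spark 2.x+)
    val spark = SparkSession.builder()
      .master("local")
      .appName("dataFrameSession")
      .enableHiveSupport() // only needed when querying the Hive warehouse
      .getOrCreate()

    // ".json" is a placeholder path, as in the example above
    val df = spark.read.json(".json")

    // createOrReplaceTempView replaces the deprecated registerTempTable
    df.createOrReplaceTempView("tmp_table0")
    spark.sql("SELECT * FROM tmp_table0").show()

    spark.stop()
  }
}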
Original article: https://www.cnblogs.com/mrerror/p/10853264.html