1、DataFrame简介:
在Spark中,DataFrame是一种以RDD为基础的分布式数据据集,类似于传统数据库听二维表格,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。
类似这样的
root |-- age: long (nullable = true) |-- id: long (nullable = true) |-- name: string (nullable = true)
2、准备测试结构化数据集
people.json
{"id":1, "name":"Ganymede", "age":32} {"id":2, "name":"Lilei", "age":19} {"id":3, "name":"Lily", "age":25} {"id":4, "name":"Hanmeimei", "age":25} {"id":5, "name":"Lucy", "age":37} {"id":6, "name":"Tom", "age":27}
3、通过编程方式理解DataFrame
1) 通过DataFrame的API来操作数据
import org.apache.spark.sql.SQLContext import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.log4j.Level import org.apache.log4j.Logger object DataFrameTest { def main(args: Array[String]): Unit = { //日志显示级别 Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR) //初始化 val conf = new SparkConf().setAppName("DataFrameTest") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df = sqlContext.read.json("people.json") //查看df中的数据 df.show() //查看Schema df.printSchema() //查看某个字段 df.select("name").show() //查看多个字段,plus为加上某值 df.select(df.col("name"), df.col("age").plus(1)).show() //过滤某个字段的值 df.filter(df.col("age").gt(25)).show() //count group 某个字段的值 df.groupBy("age").count().show() //foreach 处理各字段返回值 df.select(df.col("id"), df.col("name"), df.col("age")).foreach { x => { //通过下标获取数据 println("col1: " + x.get(0) + ", col2: " + "name: " + x.get(2) + ", col3: " + x.get(2)) } } //foreachPartition 处理各字段返回值,生产中常用的方式 df.select(df.col("id"), df.col("name"), df.col("age")).foreachPartition { iterator => iterator.foreach(x => { //通过字段名获取数据 println("id: " + x.getAs("id") + ", age: " + "name: " + x.getAs("name") + ", age: " + x.getAs("age")) }) } } }
2)通过注册表,操作sql的方式来操作数据
-
import org.apache.spark.sql.SQLContext import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.log4j.Level import org.apache.log4j.Logger /** * @author Administrator */ object DataFrameTest2 { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.ERROR); Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR); val conf = new SparkConf().setAppName("DataFrameTest2") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df = sqlContext.read.json("people.json") df.registerTempTable("people") df.show(); df.printSchema(); //查看某个字段 sqlContext.sql("select name from people ").show() //查看多个字段 sqlContext.sql("select name,age+1 from people ").show() //过滤某个字段的值 sqlContext.sql("select age from people where age>=25").show() //count group 某个字段的值 sqlContext.sql("select age,count(*) cnt from people group by age").show() //foreach 处理各字段返回值 sqlContext.sql("select id,name,age from people ").foreach { x => { //通过下标获取数据 println("col1: " + x.get(0) + ", col2: " + "name: " + x.get(2) + ", col3: " + x.get(2)) } } //foreachPartition 处理各字段返回值,生产中常用的方式 sqlContext.sql("select id,name,age from people ").foreachPartition { iterator => iterator.foreach(x => { //通过字段名获取数据 println("id: " + x.getAs("id") + ", age: " + "name: " + x.getAs("name") + ", age: " + x.getAs("age")) }) } } }
两种方式运行结果是一样的,第一种适合程序员,第二种适合熟悉sql的人员。
4、对于非结构化的数据
people.txt
-
1,Ganymede,32 2, Lilei, 19 3, Lily, 25 4, Hanmeimei, 25 5, Lucy, 37 6, wcc, 4
1) 通过字段反射来映射注册临时表
import org.apache.spark.sql.SQLContext import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.log4j.Level import org.apache.log4j.Logger import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.Row /** * @author Administrator */ object DataFrameTest3 { def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.ERROR); Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR); val conf = new SparkConf().setAppName("DataFrameTest3") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val people = sc.textFile("people.txt") val peopleRowRDD = people.map { x => x.split(",") }.map { data => { val id = data(0).trim().toInt val name = data(1).trim() val age = data(2).trim().toInt Row(id, name, age) } } val structType = StructType(Array( StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("age", IntegerType, true))); val df = sqlContext.createDataFrame(peopleRowRDD, structType); df.registerTempTable("people") df.show() df.printSchema() } }
2) 通过case class反射来映射注册临时表
import org.apache.spark.sql.SQLContext import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.log4j.Level import org.apache.log4j.Logger import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.Row /** * @author Administrator */ object DataFrameTest4 { case class People(id: Int, name: String, age: Int) def main(args: Array[String]): Unit = { Logger.getLogger("org.apache.spark").setLevel(Level.ERROR); Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR); val conf = new SparkConf().setAppName("DataFrameTest4") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val people = sc.textFile("people.txt") val peopleRDD = people.map { x => x.split(",") }.map { data => { People(data(0).trim().toInt, data(1).trim(), data(2).trim().toInt) } } //这里需要隐式转换一把 import sqlContext.implicits._ val df = peopleRDD.toDF() df.registerTempTable("people") df.show() df.printSchema() } }
5、总结:
Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame。同时Spark SQL还可以作为分布式的SQL查询引擎。Spark SQL最重要的功能之一,就是从Hive中查询数据。
DataFrame,可以理解为是,以列的形式组织的,分布式的数据集合。它其实和关系型数据库中的表非常类似,但是底层做了很多的优化。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD。
时间: 2024-10-23 20:00:27