一. Spark SQL和SchemaRDD
关于Spark SQL的前生就不再多说了,我们只关注它的操作。但是,首先要搞明白一个问题,那就是究竟什么是SchemaRDD呢?从Spark的Scala API可以知道org.apache.spark.sql.SchemaRDD和class SchemaRDD extends RDD[Row] with SchemaRDDLike,我们可以看到类SchemaRDD继承自抽象类RDD。官方文档的定义是"An RDD of Row objects that has an associated schema. In addition to standard RDD functions, SchemaRDDs can be used in relational queries",直接翻译过来就是"SchemaRDD由行对象组成,行对象拥有一个模式来描述行中每一列的数据类型"。自己认为SchemaRDD就是Spark SQL提供的一种特殊的RDD,主要的目的就是为了SQL查询,因此,在操作的时候就需要把RDD等转换成为SchemaRDD。更加通俗一点,我们可以把SchemaRDD类比为传统关系型数据库中的一张表。
从上图中我们可以看出,Spark SQL可以处理Hive,JSON,Parquet(列式存储格式)等数据格式,也就是说SchemaRDD可以从这些数据格式中进行创建。我们可以通过JDBC/ODBC,Spark Application,Spark Shell等操作Spark SQL,将Spark SQL中的数据读取出来之后就可以通过数据挖掘,数据可视化(Tableau)等进行操作。
二. Spark SQL操作txt文件
首先要说明的是在Spark 1.3中及以后,SchemaRDD改为叫做DataFrame。学习过Python中Pandas类库的人应该对DataFrame非常的了解,直观一点来说,其实就是一张表格。不过,我们一般还把DataFrame叫做SchemaRDD,只是由于Spark API的改变导致Spark SQL的操作也会发生相应的变化。我们实验使用的是Spark 1.3.0版本。
1. 创建SQLContext
根据SparkContext (sc)创建SQLContext,如下所示:
1 val sqlContext = new org.apache.spark.sql.SQLContext(sc) 2 import sqlContext.implicits._
解析:
- 第1行:sc指的是org.apache.spark.SparkContext,当我们运行spark shell时,内置对象sc已经创建,与Java Web中的内置对象比较类似。
- 第2行:把RDD隐式转换成为DataFrame(即SchemaRDD)。
2. 定义case class
我们定义case class如下所示:
1 case class Person(name: String, age: Int)
解析:通过反射来读取case class的参数名字,然后作为列的名字。case class可以嵌套或者包含复杂的数据类型,比如Sequences,Arrays等。
3. 创建DataFrame
创建DataFrame如下所示:
1 val rddPerson = sc.textFile("/home/essex/people.txt").map(_.split(",")).map(p=>Person(p(0), p(1).trim.toInt)).toDF()
解析:
- 通过RDD的Transform过程,我们可以把case class隐式转化成为DataFrame(即addPerson)。
- 文件people.txt中的内容为mechel, 29;andy, 30;jusdin, 19。(这样写是为了排版整齐,其实是每个<name, age>一行)
4. 注册成表
1 rddPerson.registerTempTable("rddTable")
解析:我们将rddPerson在sqlContext中注册成表rddTable。因为注册成表后就可以对表进行操作,比如select,insert,join等。
5. 查询操作
1 sqlContext.sql("select name from rddTable where age >= 13 and age <= 19").map(t =>"name: " + t(0)).collect().foreach(println)
解析:找出年龄在13-19岁之间的姓名。
总结:通过以上步骤,Spark SQL基本操作是首先创建sqlContext并且定义case class,然后通过RDD的Transform过程,把case class隐式转化成为DataFrame,最后将DataFrame在sqlContext中注册成表,我们就可以对表进行操作了。
三. Spark SQL操作parquet文件
四. Spark SQL操作json文件
五. Spark SQL操作JDBC
六. hiveContext详细讲解
七. Spark SQL其它高级操作
参考文献:
[1] Spark SQL深度理解篇:模块实现、代码结构及执行流程总览:http://www.csdn.net/article/2014-07-15/2820658/1
[2] 《Learning Spark》
[3] Spark SQL Programming Guide:http://spark.apache.org/docs/1.0.0/sql-programming-guide.html
[4] Spark SQL小结:http://blog.selfup.cn/657.html
[5] SchemaRDD显示转换和隐式转换:http://www.iteblog.com/archives/1224
[6] Spark SQL之基础应用:http://www.it165.net/database/html/201409/8093.html
[7] Spark SQL中的DataFrame:http://ju.outofmemory.cn/entry/128891
[8] Spark SQL中的数据源:http://blog.javachen.com/2015/04/03/spark-sql-datasource.html
[9] Spark SQL应用样例:http://blog.itpub.net/10037372/viewspace-1449008/