sparkSQL1.1入门之六:sparkSQL之基础应用

sparkSQL1.1对数据的查询分成了2个分支:sqlContext 和 hiveContext。

在sqlContext中,sparkSQL可以使用SQL-92语法对定义的表进行查询,表的源数据可以来自:

  • RDD
  • parquet文件
  • json文件

在hiveContext中,sparkSQL可以使用HQL语法,对hive数据进行查询,sparkSQL1.1支持hive0.12的HQL语法;如果遇上不支持的语法,用户可以通过更改配置切换到sql语法。笔者猜测,从spark1.1开始,将打开sqlContext和hiveContext之间的壁垒,混用sqlContext和hiveContext中定义的表。本文因时间关系,尚未对该特性做测试,等spark1.1正式版发布,再专门讨论一下这个问题。另外,在hiveContext中,hql()将被弃用,sql()将代替hql()来提交查询语句。

为了方便演示,我们在spark-shell里面进行下列演示,并加以说明。首先,启动spark集群,然后在客户端wy上启动spark-shell:

bin/spark-shell --master spark://hadoop1:7077 --executor-memory 3g

1:sqlContext基础应用

sqlContext先将外部读入的数据转换成SchemaRDD,然后注册成表,才能进行表的操作。要使用sqlContext,首先要引入sqlContext库及其隐式函数:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

1.1:RDD

要将RDD转换成sqlContext中的table,首先要定义case class,在RDD的transform过程中使用case class可以隐式转化成SchemaRDD,然后再使用registerAsTable注册成表。注册成表后就可以在sqlContext对表进行操作,如select 、insert、join等。注意,case class可以是嵌套的,也可以使用类似Sequences 或 Arrays之类复杂的数据类型。

下面的例子是定义一个符合数据文件/sparksql/people.txt类型的case clase(Person),然后将数据文件读入后隐式转换成SchemaRDD:people,并将people在sqlContext中注册成表rddTable,最后对表进行查询,找出年纪在13-19岁之间的人名。

/sparksql/people.txt的内容有3行:

运行下列代码:

//RDD演示
case class Person(name:String,age:Int)
val rddpeople=sc.textFile("/sparksql/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt))
rddpeople.registerAsTable("rddTable")

sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)

运行结果:

1.2:parquet文件

同样得,sqlContext可以读取parquet文件,由于parquet文件中保留了schema的信息,所以不需要使用case class来隐式转换。sqlContext读入parquet文件后直接转换成SchemaRDD,也可以将SchemaRDD保存成parquet文件格式。

我们先将上面建立的SchemaRDD:people保存成parquet文件:

rddpeople.saveAsParquetFile("/sparksql/people.parquet")

运行后/sparksql/目录下就多出了一个名称为people.parquet的目录:

然后,将people.parquet读入,注册成表parquetTable,查询年纪大于25岁的人名:

//parquet演示
val parquetpeople = sqlContext.parquetFile("/sparksql/people.parquet")
parquetpeople.registerAsTable("parquetTable")

sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 25").map(t => "Name: " + t(0)).collect().foreach(println)

运行结果:

1.3:json文件

sparkSQL1.1开始提供对json文件格式的支持,这意味着开发者可以使用更多的数据源,如鼎鼎大名的NOSQL数据库MongDB等。sqlContext可以从jsonFile或jsonRDD获取schema信息,来构建SchemaRDD,注册成表后就可以使用。

  • jsonFile - 加载JSON文件目录中的数据,文件的每一行是一个JSON对象。
  • jsonRdd - 从现有的RDD加载数据,其中RDD的每个元素包含一个JSON对象的字符串。

下面的例子读入一个json文件/sparksql/people.json,注册成jsonTable,并查询年纪大于25岁的人名。

/sparksql/people.json的内容:

运行下面代码:

//json演示
val jsonpeople = sqlContext.jsonFile("/sparksql/people.json")
jsonpeople.registerAsTable("jsonTable")

sqlContext.sql("SELECT name FROM jsonTable WHERE age >= 25").map(t => "Name: " + t(0)).collect().foreach(println)

运行结果:

</pre><img src="http://img.blog.csdn.net/20140910090615849?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvYm9va19tbWlja3k=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast" alt="" /></div><div style="line-height: 28px; font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋体; font-size: 16px;">2:hiveContext基础应用</div><div style="line-height: 28px; font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋体; font-size: 16px;">      使用hiveContext之前首先要确认以下两点:</div><div style="line-height: 28px; font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋体; font-size: 16px;"><ul style="margin: 5px 0px 5px 40px; padding: 0px;"><li>使用的Spark是支持hive</li><li>hive的配置文件hive-site.xml已经存在conf目录中</li></ul>      前者可以查看lib目录下是否存在以datanucleus开头的3个JAR来确定,后者注意是否在hive-site.xml里配置了uris来访问hive metastore。</div><div style="line-height: 28px; font-family: 'Hiragino Sans GB W3', 'Hiragino Sans GB', Arial, Helvetica, simsun, 宋体; font-size: 16px;"><div></div><div>要使用hiveContext,需要先构建hiveContext:</div><div><pre name="code" class="html">val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

然后就可以对hive数据进行操作了,下面我们将使用hive中的销售数据(第五小结中的hive数据),首先切换数据库到saledata并查看有几个表:

hiveContext.sql("use saledata")
hiveContext.sql("show tables").collect().foreach(println)

可以看到有在第五小节定义的3个表:

现在查询一下所有订单中每年的销售单数、销售总额:

//所有订单中每年的销售单数、销售总额
//三个表连接后以count(distinct a.ordernumber)计销售单数,sum(b.amount)计销售总额
hiveContext.sql("select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear order by c.theyear").collect().foreach(println)

运行结果:

再做一个稍微复杂点的查询,求出所有订单每年最大金额订单的销售额:

/************************
所有订单每年最大金额订单的销售额:
第一步,先求出每份订单的销售额以其发生时间
select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber
第二步,以第一步的查询作为子表,和表tblDate连接,求出每年最大金额订单的销售额
select c.theyear,max(d.sumofamount) from tbldate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d  on c.dateid=d.dateid group by c.theyear sort by c.theyear
*************************/

hiveContext.sql("select c.theyear,max(d.sumofamount) from tbldate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d  on c.dateid=d.dateid group by c.theyear sort by c.theyear").collect().foreach(println)

运行结果:

最后做一个更复杂的查询,求出所有订单中每年最畅销货品:

/************************
所有订单中每年最畅销货品:
第一步:求出每年每个货品的销售金额
select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid

第二步:求出每年单品销售的最大金额
select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear

第三步:求出每年与销售额最大相符的货品就是最畅销货品
select distinct  e.theyear,e.itemid,f.maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) e join (select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear) f on (e.theyear=f.theyear and e.sumofamount=f.maxofamount) order by e.theyear
*************************/

hiveContext.sql("select distinct  e.theyear,e.itemid,f.maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) e join (select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear) f on (e.theyear=f.theyear and e.sumofamount=f.maxofamount) order by e.theyear").collect().foreach(println)

运行结果:

3:混合使用

在sqlContext和hiveContext中各自来源于不同数据源的表可以混用,但是sqlContext和hiveContext之间目前尚不能混合使用。

sqlContext中混合使用:

//sqlContext中混合使用
//sqlContext中来自rdd的表rddTable和来自parquet文件的表parquetTable混合使用
sqlContext.sql("select a.name,a.age,b.age from rddTable a join parquetTable b on a.name=b.name").collect().foreach(println)

运行结果:

hiveContext中混合使用:

//hiveContext中混合使用
//创建一个hiveTable,并将数据加载,注意people.txt第二列有空格,所以age取string类型
hiveContext.sql("CREATE TABLE hiveTable(name string,age string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' ")
hiveContext.sql("LOAD DATA LOCAL INPATH '/home/mmicky/mboo/MyClass/doc/sparkSQL/data/people.txt' INTO TABLE hiveTable")

//创建一个源自parquet文件的表parquetTable2,然后和hiveTable混合使用
hiveContext.parquetFile("/sparksql/people.parquet").registerAsTable("parquetTable2")
hiveContext.sql("select a.name,a.age,b.age from hiveTable a join parquetTable2 b on a.name=b.name").collect().foreach(println)

运行结果:

但是sqlContext中定义的表不能和hiveContext中定义的表混合使用。

4:缓存之使用

sparkSQL的cache可以使用两种方法来实现:

  • cacheTable()方法
  • CACHE TABLE命令

千万不要先使用cache SchemaRDD,然后registerAsTable ;使用RDD的cache()将使用原生态的cache,而不是针对SQL优化后的内存列存储。看看cacheTable的源代码:

在默认的情况下,内存列存储的压缩功能是关闭的,要使用压缩功能需要配置变量COMPRESS_CACHED。

在sqlContext里可以如下使用cache:

//sqlContext的cache使用
sqlContext.cacheTable("rddTable")
sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)

sqlContext.sql("CACHE TABLE parquetTable")
sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)

观察webUI,可以看到cache的信息。(注意cache是lazy的,要有action才会实现;uncache是eager的,可以立即实现)

使用如下命令可以取消cache:

sqlContext.uncacheTable("rddTable")
sqlContext.sql("UNCACHE TABLE parquetTable")

同样的,在hiveContext也可以使用上面的方法cache或uncache。

5:DSL之使用

sparkSQL除了支持HiveQL和SQL-92语法外,还支持DSL(Domain Specific Language)。在DSL中,使用scala符号‘+标示符表示基础表中的列,spark的execution engine会将这些标示符隐式转换成表达式。另外可以在API中找到很多DSL相关的方法,如where()、select()、limit()等等,详细资料可以查看catalyst模块中的dsl子模块,下面为其中定义几种常用方法:

关于DSL的使用,随便举个例子,结合DSL方法,很容易上手:

//DSL演示
val teenagers_dsl = rddpeople.where('age >= 10).where('age <= 19).select('name)
teenagers_dsl.map(t => "Name: " + t(0)).collect().foreach(println)

6:Tips

上面介绍了sparkSQL的基础应用,sparkSQL还在高速发展中,存在者不少缺陷,如:

  • scala2.10.4本身对case class有22列的限制,在使用RDD数据源的时候就会造成不方便;
  • sqlContext中3个表不能同时join,需要两两join后再join一次;
  • sqlContext中不能直接使用values插入数据;
  • 。。。

总的来说,hiveContext还是令人满意,sqlContext就有些差强人意了。另外,顺便提一句,在编写sqlContext应用程序的时候,case class要定义在object之外。

最近将在炼数成金开课Spark大数据快速计算平台(第三期),本资料为新课素材。

时间: 2024-10-27 05:34:22

sparkSQL1.1入门之六:sparkSQL之基础应用的相关文章

sparkSQL1.1入门之一:为什么sparkSQL

2014年9月11日,Spark1.1.0忽然之间发布.笔者立即下载.编译.部署了Spark1.1.0.关于Spark1.1的编译和部署,请参看笔者博客Spark1.1.0 源码编译和部署包生成 . Spark1.1.0中变化较大是sparkSQL和MLlib,sparkSQL1.1.0主要的变动有: 增加了JDBC/ODBC Server(ThriftServer),用户可以在应用程序中连接到SparkSQL并使用其中的表和缓存表. 增加了对JSON文件的支持 增加了对parquet文件的本地

sparkSQL1.1入门之十:总结

回顾一下,在前面几章中,就sparkSQL1.1.0基本概念.运行架构.基本操作和实用工具做了基本介绍. 基本概念: SchemaRDD Rule Tree LogicPlan Parser Analyzer Optimizer SparkPlan 运行架构: sqlContext运行架构 hiveContext运行架构 基本操作 原生RDD的操作 parquet文件的操作 json文件的操作 hive数据的操作 和其他spark组件混合使用 实用工具 hive/console的操作 CLI的配

转 Python爬虫入门二之爬虫基础了解

静觅 » Python爬虫入门二之爬虫基础了解 2.浏览网页的过程 在用户浏览网页的过程中,我们可能会看到许多好看的图片,比如 http://image.baidu.com/ ,我们会看到几张的图片以及百度搜索框,这个过程其实就是用户输入网址之后,经过DNS服务器,找到服务器主机,向服务器发出一个请求,服务器经过解析之后,发送给用户的浏览器 HTML.JS.CSS 等文件,浏览器解析出来,用户便可以看到形形色色的图片了. 因此,用户看到的网页实质是由 HTML 代码构成的,爬虫爬来的便是这些内容

WCF入门教程:WCF基础知识问与答(转)

学习WCF已有近两年的时间,其间又翻译了Juval的大作<Programming WCF Services>,我仍然觉得WCF还有更多的内容值得探索与挖掘.学得越多,反而越发觉得自己所知太少,直到现在,我也认为自己不过是初窥WCF的门径而已. 学以致用”,如果仅仅是希望能够在项目中合理地应用WCF,那么对于程序员而言,可以有两种选择,一种是“知其然而不知其所以然”,只要掌握了WCF的基础知识,那么对于一般的应用就足够了.要做到这一点就很容易了,微软秉承了一贯的方式,将WCF这门技术优雅地呈现给

[转载]WPF入门教程系列一——基础

一. 前言   最近在学习WPF,学习WPF首先上的是微软的MSDN,然后再搜索了一下网络有关WPF的学习资料.为了温故而知新把学习过程记录下来,以备后查.这篇主要讲WPF的开发基础,介绍了如何使用Visual Studio 2013创建一个WPF应用程序. 首先说一下学习WPF的基础知识: 1) 要会一门.NET所支持的编程语言.例如C#. 2) 会一点“标准通用标记语言”:WPF窗体程序使用的XAML语言,也属于“标准通用标记语言”的一个分支.如果以前接触过XML.HTML.XHTML.AS

Kotlin快速入门(一)基础

Kotlin快速入门(一)基础 Kotlin学习笔记,主要记录与Java不同的地方. 1 基本类型 1.1 数字 1)数字没有隐式扩宽转换 val b: Byte = 1 // OK, 字面值是静态检测的 val i: Int = b // 错误 但我们可以显示转换 val i: Int = b.toInt() 上下文推断转换类型是可以的 val l = 1L + 3 // Long + Int => Long 2)Kotlin中字符不是数字,但字符可以显示转换为数字 fun check(c:

【慕课网】php工程师学习计划之我的学习笔记——01 入门必学web基础 htmlcss基础课程 篇

为了进一步学习PHP,本周我选定了慕课网的PHP工程师学习计划, 从今天2015-07-06 10:24:47开始从头学习:计划本周尽快学习完成本课程,谨此作为笔记. 有个好的学习计划和思路非常非常重要,非常感谢慕课网提供本套学习计划,希望更多地学习平台能提供像这样全面一条龙学习思路清晰地教程. 计划图:链接 我的学习状况:2015-07-06 10:29:46 开始随记: php工程师学习计划笔记——01 入门必学web基础 htmlcss基础课程 篇 入门篇: text-align:cent

编程入门:C语言基础知识全网超全不用到处找了!(文末附清单)

你背或者不背,干货就在那里,不悲不喜 你学或者不学,编程就在那里,不来不去 听到这话的你是否略感扎心? 编程入门:C语言基础知识全网超全不用到处找了!(文末附清单)01基础知识 计算机系统的主要技术指标与系统配置. 计算机系统.硬件.软件及其相互关系. 微机硬件系统的基本组成. 包括:中央处理器(运算器与控制器),内存储器(RAM与ROM),外存储器(硬盘.软盘与光盘),输入设备(键盘与鼠标)输出设备(显示器与打印机).如果大家如果在自学遇到困难,想找一个C++的学习环境,可以加入我们的C++学

Nodejs入门到精通(基础就业)

Nodejs入门到精通(基础就业) 下载链接:https://www.yinxiangit.com/180.html 原文地址:https://www.cnblogs.com/bingerger/p/11629017.html