Spark SQL操作详细讲解

一. Spark SQL和SchemaRDD

关于Spark SQL的前生就不再多说了,我们只关注它的操作。但是,首先要搞明白一个问题,那就是究竟什么是SchemaRDD呢?从Spark的Scala API可以知道org.apache.spark.sql.SchemaRDD和class SchemaRDD extends RDD[Row] with SchemaRDDLike,我们可以看到类SchemaRDD继承自抽象类RDD。官方文档的定义是"An RDD of Row objects that has an associated schema. In addition to standard RDD functions, SchemaRDDs can be used in relational queries",直接翻译过来就是"SchemaRDD由行对象组成,行对象拥有一个模式来描述行中每一列的数据类型"。自己认为SchemaRDD就是Spark SQL提供的一种特殊的RDD,主要的目的就是为了SQL查询,因此,在操作的时候就需要把RDD等转换成为SchemaRDD。更加通俗一点,我们可以把SchemaRDD类比为传统关系型数据库中的一张表。

从上图中我们可以看出,Spark SQL可以处理Hive,JSON,Parquet(列式存储格式)等数据格式,也就是说SchemaRDD可以从这些数据格式中进行创建。我们可以通过JDBC/ODBC,Spark Application,Spark Shell等操作Spark SQL,将Spark SQL中的数据读取出来之后就可以通过数据挖掘,数据可视化(Tableau)等进行操作。

二. Spark SQL操作txt文件

首先要说明的是在Spark 1.3中及以后,SchemaRDD改为叫做DataFrame。学习过Python中Pandas类库的人应该对DataFrame非常的了解,直观一点来说,其实就是一张表格。不过,我们一般还把DataFrame叫做SchemaRDD,只是由于Spark API的改变导致Spark SQL的操作也会发生相应的变化。我们实验使用的是Spark 1.3.0版本。

1. 创建SQLContext

根据SparkContext (sc)创建SQLContext,如下所示:

1 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
2 import sqlContext.implicits._

解析:

  • 第1行:sc指的是org.apache.spark.SparkContext,当我们运行spark shell时,内置对象sc已经创建,与Java Web中的内置对象比较类似。
  • 第2行:把RDD隐式转换成为DataFrame(即SchemaRDD)。

2. 定义case class

我们定义case class如下所示:

1 case class Person(name: String, age: Int)

解析:通过反射来读取case class的参数名字,然后作为列的名字。case class可以嵌套或者包含复杂的数据类型,比如Sequences,Arrays等。

3. 创建DataFrame

创建DataFrame如下所示:

1 val rddPerson = sc.textFile("/home/essex/people.txt").map(_.split(",")).map(p=>Person(p(0), p(1).trim.toInt)).toDF()

解析:

  • 通过RDD的Transform过程,我们可以把case class隐式转化成为DataFrame(即addPerson)。
  • 文件people.txt中的内容为mechel, 29;andy, 30;jusdin, 19。(这样写是为了排版整齐,其实是每个<name, age>一行)

4. 注册成表

1 rddPerson.registerTempTable("rddTable")

解析:我们将rddPerson在sqlContext中注册成表rddTable。因为注册成表后就可以对表进行操作,比如select,insert,join等。

5. 查询操作

1 sqlContext.sql("select name from rddTable where age >= 13 and age <= 19").map(t =>"name: " + t(0)).collect().foreach(println)

解析:找出年龄在13-19岁之间的姓名。

总结:通过以上步骤,Spark SQL基本操作是首先创建sqlContext并且定义case class,然后通过RDD的Transform过程,把case class隐式转化成为DataFrame,最后将DataFrame在sqlContext中注册成表,我们就可以对表进行操作了。

三. Spark SQL操作parquet文件

四. Spark SQL操作json文件

五. Spark SQL操作JDBC

六. hiveContext详细讲解

七. Spark SQL其它高级操作

参考文献:

[1] Spark SQL深度理解篇:模块实现、代码结构及执行流程总览:http://www.csdn.net/article/2014-07-15/2820658/1
[2] 《Learning Spark》
[3] Spark SQL Programming Guide:http://spark.apache.org/docs/1.0.0/sql-programming-guide.html
[4] Spark SQL小结:http://blog.selfup.cn/657.html
[5] SchemaRDD显示转换和隐式转换:http://www.iteblog.com/archives/1224
[6] Spark SQL之基础应用:http://www.it165.net/database/html/201409/8093.html
[7] Spark SQL中的DataFrame:http://ju.outofmemory.cn/entry/128891
[8] Spark SQL中的数据源:http://blog.javachen.com/2015/04/03/spark-sql-datasource.html
[9] Spark SQL应用样例:http://blog.itpub.net/10037372/viewspace-1449008/

时间: 2024-10-06 00:41:18

Spark SQL操作详细讲解的相关文章

spark sql 操作

DSL风格语法 1.查看DataFrame中的内容 scala> df1.show +---+--------+---+ | id| name|age| +---+--------+---+ | 1|zhansgan| 16| | 2| lisi| 18| | 3| wangwu| 21| | 4|xiaofang| 22| +---+--------+---+ 2.查看DataFrame部分列的数据 scala> df1.select(df1.col("name")).s

linux 中解压与压缩 常用操作详细讲解

平时有时候 会在服务器进行一些文件的操作,比如安装一些服务与软件等等,都有解压操作,一般在 导出一些简单的服务器文件,也是先压缩后再导出,因此,在这里根据平时用到解压与压缩命令的频率来记录下: 1.最常用的当属 tar 命令了,(常针对于 tar.gz 文件) 压缩 : tar -zcvf [被压缩后的文件名] [目录或者文件] eg: tar zcvf redis.tar.gz  redis-2.8.12 解压 :tar -zxvf  [压缩包的文件名] eg: tar zxvf redis-

Spark SQL之External DataSource外部数据源(二)源码分析

上周Spark1.2刚发布,周末在家没事,把这个特性给了解一下,顺便分析下源码,看一看这个特性是如何设计及实现的. /** Spark SQL源码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)示例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了External

第十一篇:Spark SQL 源码分析之 External DataSource外部数据源

上周Spark1.2刚发布,周末在家没事,把这个特性给了解一下,顺便分析下源码,看一看这个特性是如何设计及实现的. /** Spark SQL源码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)示例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了External

Spark SQL之External DataSource外部数据源(二)源代码分析

上周Spark1.2刚公布,周末在家没事,把这个特性给了解一下,顺便分析下源代码,看一看这个特性是怎样设计及实现的. /** Spark SQL源代码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)演示样例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了Exte

Spark SQL下的Parquet使用最佳实践和代码实战

一:Spark SQL下的Parquet使用最佳实践 1,过去整个业界对大数据的分析的技术栈的Pipeline一般分为一下两种方式: A)Data Source -> HDFS -> MR/Hive/Spark(相当于ETL) -> HDFS Parquet -> SparkSQL/impala -> Result Service(可以放在DB中,也有可能被通过JDBC/ODBC来作为数据服务使用): B)Data Source -> Real time update

Spark SQL with Hive

前一篇文章是Spark SQL的入门篇Spark SQL初探,介绍了一些基础知识和API,但是离我们的日常使用还似乎差了一步之遥. 终结Shark的利用有2个: 1.和Spark程序的集成有诸多限制 2.Hive的优化器不是为Spark而设计的,计算模型的不同,使得Hive的优化器来优化Spark程序遇到了瓶颈. 这里看一下Spark SQL 的基础架构: Spark1.1发布后会支持Spark SQL CLI , Spark SQL的CLI会要求被连接到一个Hive Thrift Server

Spark 官方文档(5)——Spark SQL,DataFrames和Datasets 指南

Spark版本:1.6.2 概览 Spark SQL用于处理结构化数据,与Spark RDD API不同,它提供更多关于数据结构信息和计算任务运行信息的接口,Spark SQL内部使用这些额外的信息完成特殊优化.可以通过SQL.DataFrames API.Datasets API与Spark SQL进行交互,无论使用何种方式,SparkSQL使用统一的执行引擎记性处理.用户可以根据自己喜好,在不同API中选择合适的进行处理.本章中所有用例均可以在spark-shell.pyspark shel

Spark SQL 源代码分析之Physical Plan 到 RDD的详细实现

/** Spark SQL源代码分析系列文章*/ 接上一篇文章Spark SQL Catalyst源代码分析之Physical Plan.本文将介绍Physical Plan的toRDD的详细实现细节: 我们都知道一段sql,真正的运行是当你调用它的collect()方法才会运行Spark Job,最后计算得到RDD. lazy val toRdd: RDD[Row] = executedPlan.execute() Spark Plan基本包括4种操作类型,即BasicOperator基本类型