Spark SQL with Hive

前一篇文章是Spark SQL的入门篇Spark SQL初探,介绍了一些基础知识和API,但是离我们的日常使用还似乎差了一步之遥。

终结Shark的利用有2个:

1、和Spark程序的集成有诸多限制

2、Hive的优化器不是为Spark而设计的,计算模型的不同,使得Hive的优化器来优化Spark程序遇到了瓶颈。

这里看一下Spark SQL 的基础架构:

Spark1.1发布后会支持Spark SQL CLI , Spark SQL的CLI会要求被连接到一个Hive Thrift Server上,来实现类似hive shell的功能。(ps:目前git里面的branch-1.0-jdbc。目前还没有正式release,我测了一下午,发现还是有bug的,耐心等待release吧!)

本着研究的心态,想和Hive环境集成一下,在spark shell里执行hive的语句。

一、编译Spark支持Hive

让Spark支持Hive有2种sbt编译方式:

1、sbt前加变量名

SPARK_HADOOP_VERSION=0.20.2-cdh3u5 SPARK_HIVE=true sbt/sbt assembly

2、修改project/SparkBuild.scala文件

val DEFAULT_HADOOP_VERSION = "0.20.2-cdh3u5"
val DEFAULT_HIVE = true 

然后执行sbt/sbt assembly

二、Spark SQL 操作Hive

前置:hive可用,并且在Spark-env.sh下,需要将Hive的conf和Hadoop的conf配到CLASSPATH里。

启动spark-shell

[[email protected] spark]# bin/spark-shell --master spark://10.1.8.210:7077 --driver-class-path /app/hadoop/hive-0.11.0-bin/lib/mysql-connector-java-5.1.13-bin.jar:/app/hadoop/hive-0.11.0-bin/lib/hadoop-lzo-0.4.15.jar

导入HiveContext

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext: org.apache.spark.sql.hive.HiveContext = [email protected]

scala> import hiveContext._
import hiveContext._

hiveContext里提供了一个执行sql的函数 hql(string text)

去hive里show databases. 这里Spark会parse hql 然后生成Query Plan。但是这里不会执行查询,只有调用collect的时候才会执行。

scala> val show_databases = hql("show databases")
14/07/09 19:59:09 INFO storage.BlockManager: Removing broadcast 0
14/07/09 19:59:09 INFO storage.BlockManager: Removing block broadcast_0
14/07/09 19:59:09 INFO parse.ParseDriver: Parsing command: show databases
14/07/09 19:59:09 INFO parse.ParseDriver: Parse Completed
14/07/09 19:59:09 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/07/09 19:59:09 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/07/09 19:59:09 INFO analysis.Analyzer: Max iterations (2) reached for batch Check Analysis
14/07/09 19:59:09 INFO storage.MemoryStore: Block broadcast_0 of size 393044 dropped from memory (free 308713881)
14/07/09 19:59:09 INFO broadcast.HttpBroadcast: Deleted broadcast file: /tmp/spark-c29da0f8-c5e3-4fbf-adff-9aa77f9743b2/broadcast_0
14/07/09 19:59:09 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange
14/07/09 19:59:09 INFO sql.SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions
14/07/09 19:59:09 INFO spark.ContextCleaner: Cleaned broadcast 0
14/07/09 19:59:09 INFO ql.Driver: <PERFLOG method=Driver.run>
14/07/09 19:59:09 INFO ql.Driver: <PERFLOG method=TimeToSubmit>
14/07/09 19:59:09 INFO ql.Driver: <PERFLOG method=compile>
14/07/09 19:59:09 INFO exec.ListSinkOperator: 0 finished. closing...
14/07/09 19:59:09 INFO exec.ListSinkOperator: 0 forwarded 0 rows
14/07/09 19:59:09 INFO ql.Driver: <PERFLOG method=parse>
14/07/09 19:59:09 INFO parse.ParseDriver: Parsing command: show databases
14/07/09 19:59:09 INFO parse.ParseDriver: Parse Completed
14/07/09 19:59:09 INFO ql.Driver: </PERFLOG method=parse start=1404907149927 end=1404907149928 duration=1>
14/07/09 19:59:09 INFO ql.Driver: <PERFLOG method=semanticAnalyze>
14/07/09 19:59:09 INFO ql.Driver: Semantic Analysis Completed
14/07/09 19:59:09 INFO ql.Driver: </PERFLOG method=semanticAnalyze start=1404907149928 end=1404907149977 duration=49>
14/07/09 19:59:09 INFO exec.ListSinkOperator: Initializing Self 0 OP
14/07/09 19:59:09 INFO exec.ListSinkOperator: Operator 0 OP initialized
14/07/09 19:59:09 INFO exec.ListSinkOperator: Initialization Done 0 OP
14/07/09 19:59:09 INFO ql.Driver: Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:database_name, type:string, comment:from deserializer)], properties:null)
14/07/09 19:59:09 INFO ql.Driver: </PERFLOG method=compile start=1404907149925 end=1404907149980 duration=55>
14/07/09 19:59:09 INFO ql.Driver: <PERFLOG method=Driver.execute>
14/07/09 19:59:09 INFO ql.Driver: Starting command: show databases
14/07/09 19:59:09 INFO ql.Driver: </PERFLOG method=TimeToSubmit start=1404907149925 end=1404907149980 duration=55>
14/07/09 19:59:09 INFO ql.Driver: <PERFLOG method=runTasks>
14/07/09 19:59:09 INFO ql.Driver: <PERFLOG method=task.DDL.Stage-0>
14/07/09 19:59:09 INFO metastore.HiveMetaStore: 0: get_all_databases
14/07/09 19:59:09 INFO HiveMetaStore.audit: ugi=root    ip=unknown-ip-addr      cmd=get_all_databases
14/07/09 19:59:09 INFO exec.DDLTask: results : 1
14/07/09 19:59:10 INFO ql.Driver: </PERFLOG method=task.DDL.Stage-0 start=1404907149980 end=1404907150032 duration=52>
14/07/09 19:59:10 INFO ql.Driver: </PERFLOG method=runTasks start=1404907149980 end=1404907150032 duration=52>
14/07/09 19:59:10 INFO ql.Driver: </PERFLOG method=Driver.execute start=1404907149980 end=1404907150032 duration=52>
14/07/09 19:59:10 INFO ql.Driver: OK
14/07/09 19:59:10 INFO ql.Driver: <PERFLOG method=releaseLocks>
14/07/09 19:59:10 INFO ql.Driver: </PERFLOG method=releaseLocks start=1404907150033 end=1404907150033 duration=0>
14/07/09 19:59:10 INFO ql.Driver: </PERFLOG method=Driver.run start=1404907149925 end=1404907150033 duration=108>
14/07/09 19:59:10 INFO mapred.FileInputFormat: Total input paths to process : 1
14/07/09 19:59:10 INFO ql.Driver: <PERFLOG method=releaseLocks>
14/07/09 19:59:10 INFO ql.Driver: </PERFLOG method=releaseLocks start=1404907150037 end=1404907150037 duration=0>
show_databases: org.apache.spark.sql.SchemaRDD =
SchemaRDD[16] at RDD at SchemaRDD.scala:100
== Query Plan ==
<Native command: executed by Hive>

执行查询计划:

scala> show_databases.collect()
14/07/09 20:00:44 INFO spark.SparkContext: Starting job: collect at SparkPlan.scala:52
14/07/09 20:00:44 INFO scheduler.DAGScheduler: Got job 2 (collect at SparkPlan.scala:52) with 1 output partitions (allowLocal=false)
14/07/09 20:00:44 INFO scheduler.DAGScheduler: Final stage: Stage 2(collect at SparkPlan.scala:52)
14/07/09 20:00:44 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/07/09 20:00:44 INFO scheduler.DAGScheduler: Missing parents: List()
14/07/09 20:00:44 INFO scheduler.DAGScheduler: Submitting Stage 2 (MappedRDD[20] at map at SparkPlan.scala:52), which has no missing parents
14/07/09 20:00:44 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 2 (MappedRDD[20] at map at SparkPlan.scala:52)
14/07/09 20:00:44 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
14/07/09 20:00:44 INFO scheduler.TaskSetManager: Starting task 2.0:0 as TID 9 on executor 0: web01.dw (PROCESS_LOCAL)
14/07/09 20:00:44 INFO scheduler.TaskSetManager: Serialized task 2.0:0 as 1511 bytes in 0 ms
14/07/09 20:00:45 INFO scheduler.DAGScheduler: Completed ResultTask(2, 0)
14/07/09 20:00:45 INFO scheduler.TaskSetManager: Finished TID 9 in 12 ms on web01.dw (progress: 1/1)
14/07/09 20:00:45 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
14/07/09 20:00:45 INFO scheduler.DAGScheduler: Stage 2 (collect at SparkPlan.scala:52) finished in 0.014 s
14/07/09 20:00:45 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:52, took 0.020520428 s
res5: Array[org.apache.spark.sql.Row] = Array([default])

返回default数据库。

同样的执行:show tables

scala> hql("show tables").collect()
14/07/09 20:01:28 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
14/07/09 20:01:28 INFO scheduler.DAGScheduler: Stage 3 (collect at SparkPlan.scala:52) finished in 0.013 s
14/07/09 20:01:28 INFO spark.SparkContext: Job finished: collect at SparkPlan.scala:52, took 0.019173851 s
res7: Array[org.apache.spark.sql.Row] = Array([item], [src])

理论上是支持HIVE所有的操作,包括UDF。

PS:遇到的问题:

Caused by: org.datanucleus.exceptions.NucleusException: Attempt to invoke the "BoneCP" plugin to create a ConnectionPool gave an error : The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH. Please check your CLASSPATH specification, and the name of the driver.

解决办法:就是我上面启动的时候带上sql-connector的路径。。

三、总结:

Spark SQL 兼容了Hive的大部分语法和UDF,但是在处理查询计划的时候,使用了Catalyst框架进行优化,优化成适合Spark编程模型的执行计划,使得效率上高出hive很多。由于Spark1.1暂时还未发布,目前还存在bug,等到稳定版发布了再继续测试了。

全文完:)

原创文章,转载请注明出自:http://blog.csdn.net/oopsoom/article/details/37603261

Spark SQL with Hive

时间: 2024-08-13 12:58:34

Spark SQL with Hive的相关文章

spark sql on hive初探

前一段时间由于shark项目停止更新,sql on spark拆分为两个方向,一个是spark sql on hive,另一个是hive on spark.hive on spark达到可用状态估计还要等很久的时间,所以打算试用下spark sql on hive,用来逐步替代目前mr on hive的工作. 当前试用的版本是spark1.0.0,如果要支持hive,必须重新进行编译,编译的命令有所变化 export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M

第57课:Spark SQL on Hive配置及实战

1,首先需要安装hive,参考http://lqding.blog.51cto.com/9123978/1750967 2,在spark的配置目录下添加配置文件,让Spark可以访问hive的metastore. [email protected]:/usr/local/spark/spark-1.6.0-bin-hadoop2.6/conf# vi hive-site.xml <configuration> <property>   <name>hive.metast

Spark SQL on HIVE

1. SPARK CONF中添加hive-site.xml hive.metastore.uris thrift://master:9083 2. 启动hive元数据 hive --metastore >meta.log 2>&1 & 3. scala>val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) //hiveContext scala>hiveContext.sql("us

spark sql 查询hive表并写入到PG中

import java.sql.DriverManager import java.util.Properties import com.zhaopin.tools.{DateUtils, TextUtils} import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.SparkSession /** * Created by xiaoyan on 2018/5/21. */ object IhrDownloadPg

spark sql 访问hive数据时找不mysql的解决方法

我尝试着在classpath中加n入mysql的驱动仍不行 解决方法:在启动的时候加入参数--driver-class中加入mysql 驱动 [[email protected] spark-1.0.1-bin-hadoop2]$ bin/spark-shell --driver-class-path lib/mysql-connector-java-5.1.30-bin.jar 总结:1.spark的版本必须编译的时候加上了hive 1.0.0预编译版没有加入hive  1.0.1是含有hiv

Spark SQL Hive Support Demo

前提: 1.spark1.0的包编译时指定支持hive:./make-distribution.sh --hadoop 2.3.0-cdh5.0.0 --with-yarn --with-hive --tgz 2.安装完spark1.0: 3.安装与hadoop对应的CDH版本的hive: Spark SQL 支持Hive案例: 1.将hive-site.xml配置文件拷贝到$SPARK_HOME/conf下 hive-site.xml文件内容形如: <?xml version="1.0&

Spark SQL CLI 实现分析

背景 本文主要介绍了Spark SQL里目前的CLI实现,代码之后肯定会有不少变动,所以我关注的是比较核心的逻辑.主要是对比了Hive CLI的实现方式,比较Spark SQL在哪块地方做了修改,哪些地方与Hive CLI是保持一致的.可以先看下总结一节里的内容. Spark SQL的hive-thriftserver项目里是其CLI实现代码,下面先说明Hive CLI的主要实现类和关系,再说明Spark SQL CLI的做法. Hive CLI 核心启动类是org.apache.hive.se

spark sql简单示例

运行环境 集群环境:CDH5.3.0 具体JAR版本如下: spark版本:1.2.0-cdh5.3.0 hive版本:0.13.1-cdh5.3.0 hadoop版本:2.5.0-cdh5.3.0 spark sql的JAVA版简单示例 spark sql直接查询JSON格式的数据 spark sql的自定义函数 spark sql查询hive上面的表 import java.util.ArrayList; import java.util.List; import org.apache.sp

Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

Spark SQL, DataFrames and Datasets Guide Overview SQL Datasets and DataFrames 开始入门 起始点: SparkSession 创建 DataFrames 无类型的Dataset操作 (aka DataFrame 操作) Running SQL Queries Programmatically 全局临时视图 创建Datasets RDD的互操作性 使用反射推断Schema 以编程的方式指定Schema Aggregatio