spark sql on hive初探

前一段时间由于shark项目停止更新,sql on spark拆分为两个方向,一个是spark sql on hive,另一个是hive on spark。hive on spark达到可用状态估计还要等很久的时间,所以打算试用下spark sql on hive,用来逐步替代目前mr on hive的工作。

当前试用的版本是spark1.0.0,如果要支持hive,必须重新进行编译,编译的命令有所变化

export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
mvn -Pyarn -Phive -Dhadoop.version=2.3.0-cdh5.0.0 -DskipTests clean package

写了段比较简单的代码

    val conf = new SparkConf().setAppName("SqlOnHive")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)
    import hiveContext._
    hql("FROM tmp.test SELECT id limit 1").foreach(println)

编译后export出jar文件,使用standalone模式,故采用java -cp的方式提交,提交之前需要将hive-site.xml文件copy到$SPARK_HOME/conf目录下

    java -XX:PermSize=256M -cp /home/hadoop/hql.jar com.yintai.spark.sql.SqlOnHive spark://h031:7077

提交后会报异常

java.lang.RuntimeException: Error in configuring object
	at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
	at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
	at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
	at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:155)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:187)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:181)
	at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:93)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
	at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.Task.run(Task.scala:51)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
	at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
	at java.lang.reflect.Method.invoke(Method.java:597)
	at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
	... 27 more
Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
	at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135)
	at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175)
	at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
	... 32 more
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1801)
	at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128)
	... 34 more

解决办法是需要设置相关的环境变量,在spark-env.sh中设置

    SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:/path/to/your/hadoop-lzo/libs/native
    SPARK_CLASSPATH=$SPARK_CLASSPATH:/path/to/your/hadoop-lzo/java/libs

修改过环境变量之后重新提交,继续报错

14/07/23 10:25:19 ERROR RetryingHMSHandler: NoSuchObjectException(message:There is no database named tmp)
        at org.apache.hadoop.hive.metastore.ObjectStore.getMDatabase(ObjectStore.java:431)
        at org.apache.hadoop.hive.metastore.ObjectStore.getDatabase(ObjectStore.java:441)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.hadoop.hive.metastore.RetryingRawStore.invoke(RetryingRawStore.java:124)
        at com.sun.proxy.$Proxy9.getDatabase(Unknown Source)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_database(HiveMetaStore.java:628)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:103)
        at com.sun.proxy.$Proxy10.get_database(Unknown Source)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getDatabase(HiveMetaStoreClient.java:810)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
        at com.sun.proxy.$Proxy11.getDatabase(Unknown Source)
        at org.apache.hadoop.hive.ql.metadata.Hive.getDatabase(Hive.java:1139)
        at org.apache.hadoop.hive.ql.metadata.Hive.databaseExists(Hive.java:1128)
        at org.apache.hadoop.hive.ql.exec.DDLTask.switchDatabase(DDLTask.java:3479)
        at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:237)
        at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:151)
        at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:65)
        at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1414)
        at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1192)
        at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1020)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888)
        at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:185)
        at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:160)
        at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:249)
        at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:246)
        at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:85)
        at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:90)
        at com.yintai.spark.sql.SqlOnHive$.main(SqlOnHive.scala:20)
        at com.yintai.spark.sql.SqlOnHive.main(SqlOnHive.scala)

14/07/23 10:25:19 ERROR DDLTask: org.apache.hadoop.hive.ql.metadata.HiveException: Database does not exist: tmp
        at org.apache.hadoop.hive.ql.exec.DDLTask.switchDatabase(DDLTask.java:3480)
        at org.apache.hadoop.hive.ql.exec.DDLTask.execute(DDLTask.java:237)
        at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:151)
        at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:65)
        at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1414)
        at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1192)
        at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1020)
        at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888)
        at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:185)
        at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:160)
        at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:249)
        at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:246)
        at org.apache.spark.sql.hive.HiveContext.hiveql(HiveContext.scala:85)
        at org.apache.spark.sql.hive.HiveContext.hql(HiveContext.scala:90)
        at com.yintai.spark.sql.SqlOnHive$.main(SqlOnHive.scala:20)
        at com.yintai.spark.sql.SqlOnHive.main(SqlOnHive.scala)

造成这个错误的原因就是spark程序无法加载到hive-site.xml,从而无法获取到远程metastore服务的地址,只能在本地的derby数据库中查找,自然找不到相关库表的元数据信息。spark sql实际上是通过实例化HiveConf类来加载hive-site.xml文件的,这个跟hive cli的方式是一致的,代码如下

    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    if (classLoader == null) {
      classLoader = HiveConf.class.getClassLoader();
    }
    hiveDefaultURL = classLoader.getResource("hive-default.xml");
    // Look for hive-site.xml on the CLASSPATH and log its location if found.
    hiveSiteURL = classLoader.getResource("hive-site.xml");

使用java -cp提交的方式不能正确的设置环境变量,在1.0.0版本中,新增了使用spark-submit脚本进行提交的方式,后来改为使用此脚本来提交

/usr/lib/spark/bin/spark-submit --class com.yintai.spark.sql.SqlOnHive     --master spark://h031:7077     --executor-memory 1g     --total-executor-cores 1     /home/hadoop/hql.jar

这个脚本在提交过程中会设置SparkConf中的spark.executor.extraClassPath和spark.driver.extraClassPath属性,从而保证可以正确加载到所需的配置文件,到此测试成功。

目前spark sql on hive兼容了大部分hive的语法和UDF,在SQL解析的时候使用了Catalyst框架,作业在运行效率上高出hive很多,不过目前的版本还存在一些BUG,稳定性上会有一些问题,需要等到新的稳定版发布再进行进一步的测试。

参考资料

http://spark.apache.org/docs/1.0.0/sql-programming-guide.html

http://hsiamin.com/posts/2014/05/03/enable-lzo-compression-on-hadoop-pig-and-spark/

时间: 2024-11-06 12:13:42

spark sql on hive初探的相关文章

Spark SQL with Hive

前一篇文章是Spark SQL的入门篇Spark SQL初探,介绍了一些基础知识和API,但是离我们的日常使用还似乎差了一步之遥. 终结Shark的利用有2个: 1.和Spark程序的集成有诸多限制 2.Hive的优化器不是为Spark而设计的,计算模型的不同,使得Hive的优化器来优化Spark程序遇到了瓶颈. 这里看一下Spark SQL 的基础架构: Spark1.1发布后会支持Spark SQL CLI , Spark SQL的CLI会要求被连接到一个Hive Thrift Server

第57课:Spark SQL on Hive配置及实战

1,首先需要安装hive,参考http://lqding.blog.51cto.com/9123978/1750967 2,在spark的配置目录下添加配置文件,让Spark可以访问hive的metastore. [email protected]:/usr/local/spark/spark-1.6.0-bin-hadoop2.6/conf# vi hive-site.xml <configuration> <property>   <name>hive.metast

Spark SQL on HIVE

1. SPARK CONF中添加hive-site.xml hive.metastore.uris thrift://master:9083 2. 启动hive元数据 hive --metastore >meta.log 2>&1 & 3. scala>val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) //hiveContext scala>hiveContext.sql("us

spark sql 查询hive表并写入到PG中

import java.sql.DriverManager import java.util.Properties import com.zhaopin.tools.{DateUtils, TextUtils} import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.SparkSession /** * Created by xiaoyan on 2018/5/21. */ object IhrDownloadPg

spark sql 访问hive数据时找不mysql的解决方法

我尝试着在classpath中加n入mysql的驱动仍不行 解决方法:在启动的时候加入参数--driver-class中加入mysql 驱动 [[email protected] spark-1.0.1-bin-hadoop2]$ bin/spark-shell --driver-class-path lib/mysql-connector-java-5.1.30-bin.jar 总结:1.spark的版本必须编译的时候加上了hive 1.0.0预编译版没有加入hive  1.0.1是含有hiv

Spark SQL Hive Support Demo

前提: 1.spark1.0的包编译时指定支持hive:./make-distribution.sh --hadoop 2.3.0-cdh5.0.0 --with-yarn --with-hive --tgz 2.安装完spark1.0: 3.安装与hadoop对应的CDH版本的hive: Spark SQL 支持Hive案例: 1.将hive-site.xml配置文件拷贝到$SPARK_HOME/conf下 hive-site.xml文件内容形如: <?xml version="1.0&

Spark SQL CLI 实现分析

背景 本文主要介绍了Spark SQL里目前的CLI实现,代码之后肯定会有不少变动,所以我关注的是比较核心的逻辑.主要是对比了Hive CLI的实现方式,比较Spark SQL在哪块地方做了修改,哪些地方与Hive CLI是保持一致的.可以先看下总结一节里的内容. Spark SQL的hive-thriftserver项目里是其CLI实现代码,下面先说明Hive CLI的主要实现类和关系,再说明Spark SQL CLI的做法. Hive CLI 核心启动类是org.apache.hive.se

spark sql简单示例

运行环境 集群环境:CDH5.3.0 具体JAR版本如下: spark版本:1.2.0-cdh5.3.0 hive版本:0.13.1-cdh5.3.0 hadoop版本:2.5.0-cdh5.3.0 spark sql的JAVA版简单示例 spark sql直接查询JSON格式的数据 spark sql的自定义函数 spark sql查询hive上面的表 import java.util.ArrayList; import java.util.List; import org.apache.sp

Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

Spark SQL, DataFrames and Datasets Guide Overview SQL Datasets and DataFrames 开始入门 起始点: SparkSession 创建 DataFrames 无类型的Dataset操作 (aka DataFrame 操作) Running SQL Queries Programmatically 全局临时视图 创建Datasets RDD的互操作性 使用反射推断Schema 以编程的方式指定Schema Aggregatio