Spark SQL External Data Sources JDBC简易实现

在spark1.2版本中最令我期待的功能是External Data Sources,通过该API可以直接将External Data Sources注册成一个临时表,该表可以和已经存在的表等通过sql进行查询操作。External Data Sources API代码存放于org.apache.spark.sql包中。

具体的分析可参见OopsOutOfMemory的两篇精彩博文:

http://blog.csdn.net/oopsoom/article/details/42061077

http://blog.csdn.net/oopsoom/article/details/42064075

自己尝试实现了一个简易的读取关系型数据库的外部数据源,代码参见:https://github.com/luogankun/spark-jdbc

支持MySQL/Oracle/DB2,以及几种简单的数据类型,暂时还不支持PrunedScan、PrunedFilteredScan,仅支持TableScan,后续在接着完善。

使用步骤:

1、编译spark-jdbc代码

sbt package

2、添加jar包到spark-env.sh

export SPARK_CLASSPATH=/home/spark/software/source/spark_package/spark-jdbc/target/scala-2.10/spark-jdbc_2.10-0.1.jar:$SPARK_CLASSPATH
export SPARK_CLASSPATH=/home/spark/lib/ojdbc14-10.2.0.3.jar:$SPARK_CLASSPATH
export SPARK_CLASSPATH=/home/spark/lib/db2jcc-9.7.jar:$SPARK_CLASSPATH
export SPARK_CLASSPATH=/home/spark/lib/mysql-connector-java-3.0.10.jar:$SPARK_CLASSPATH

3、启动spark-sql

CREATE TEMPORARY TABLE jdbc_table
USING com.luogankun.spark.jdbc
OPTIONS (
sparksql_table_schema  ‘(TBL_ID int, TBL_NAME string, TBL_TYPE string)‘,
jdbc_table_name    ‘TBLS‘,
jdbc_table_schema ‘(TBL_ID , TBL_NAME , TBL_TYPE)‘,
url    ‘jdbc:mysql://hadoop000:3306/hive‘,
user    ‘root‘,
password    ‘root‘,
num_partitions ‘6‘,
where "TBL_ID > 766 AND TBL_NAME=‘order_created_4_partition‘"
);

参数说明:

sparksql_table_schema:spark sql表字段名称与类型

jdbc_table_name:关系型数据库表名

jdbc_table_schema: 关系型数据库表字段名称

url :关系型数据库url

user :关系型数据库用户名

password: 关系型数据库密码

num_partitions:partitions数目,默认是5,可省略

where:过滤条件,可省略

select TBL_ID,TBL_NAME,TBL_TYPE from jdbc_table;

在测试过程中遇到的问题:

如上的代码在连接MySQL数据库操作时没有问题,但是在操作Oracle或者DB2数据库时,报错如下:

09:56:48,302 [Executor task launch worker-0] ERROR Logging$class : Error in TaskCompletionListener
java.lang.AbstractMethodError: oracle.jdbc.driver.OracleResultSetImpl.isClosed()Z
    at org.apache.spark.rdd.JdbcRDD$$anon$1.close(JdbcRDD.scala:99)
    at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
    at org.apache.spark.rdd.JdbcRDD$$anon$1$$anonfun$1.apply(JdbcRDD.scala:71)
    at org.apache.spark.rdd.JdbcRDD$$anon$1$$anonfun$1.apply(JdbcRDD.scala:71)
    at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:85)
    at org.apache.spark.TaskContext$$anonfun$markTaskCompleted$1.apply(TaskContext.scala:110)
    at org.apache.spark.TaskContext$$anonfun$markTaskCompleted$1.apply(TaskContext.scala:108)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.TaskContext.markTaskCompleted(TaskContext.scala:108)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:64)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
09:56:48,302 [Executor task launch worker-1] ERROR Logging$class : Error in TaskCompletionListener

跟了下JdbcRDD源代码发现,问题在于:

我在本案例中使用的oracle的驱动是ojdbc14-10.2.0.3.jar,查阅了些资料说是Oracle的实现类没有该方法;

该issues详见: https://issues.apache.org/jira/browse/SPARK-5239

解决办法:

1、升级驱动包;

2、暂时屏蔽掉这两个isClosed的判断方法(https://github.com/apache/spark/pull/4033)

后续将会继续完善实现PrunedScan、PrunedFilteredScan,现在的实现确实很“丑陋”,凑合着先能使用吧。

时间: 2024-08-29 16:34:54

Spark SQL External Data Sources JDBC简易实现的相关文章

Spark SQL External Data Sources JDBC官方实现写测试

通过Spark SQL External Data Sources JDBC实现将RDD的数据写入到MySQL数据库中. jdbc.scala重要API介绍: /** * Save this RDD to a JDBC database at `url` under the table name `table`. * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements. * If you pass `tru

Spark SQL之External DataSource外部数据源(一)示例

一.Spark SQL External DataSource简介 随着Spark1.2的发布,Spark SQL开始正式支持外部数据源.Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现. 这使得Spark SQL支持了更多的类型数据源,如json, parquet, avro, csv格式.只要我们愿意,我们可以开发出任意的外部数据源来连接到Spark SQL.之前大家说的支持HBASE,Cassandra都可以用外部数据源的方式来实现无缝集成. (Ps: 关于Exter

Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

Spark SQL, DataFrames and Datasets Guide Overview SQL Datasets and DataFrames 开始入门 起始点: SparkSession 创建 DataFrames 无类型的Dataset操作 (aka DataFrame 操作) Running SQL Queries Programmatically 全局临时视图 创建Datasets RDD的互操作性 使用反射推断Schema 以编程的方式指定Schema Aggregatio

Spark1.1.0 Spark SQL Programming Guide

Spark SQL Programming Guide Overview Getting Started Data Sources RDDs Inferring the Schema Using Reflection Programmatically Specifying the Schema Parquet Files Loading Data Programmatically Configuration JSON Datasets Hive Tables Performance Tuning

Spark SQL, DataFrames and Datasets 指南

概述 Spark SQL 是 Spark 处理结构化数据的模块; 与基础的 Spark RDD API 不同, Spark SQL 提供的接口提供给 Spark 更多的关于数据和执行计算的结; 内在的, Spark SQL 使用这些额外的信息去执行额外的优化; 这里有几种包括 SQL 和 Datasets API 在内的与 Spark SQL 交互的方法; 当计算结果使用相同的执行引擎, 独立于你使用的表达计算的 API/语言; 这种统一意味着开发者可以依据哪种 APIs 对于给定的表达式提供了

spark结构化数据处理:Spark SQL、DataFrame和Dataset

本文讲解Spark的结构化数据处理,主要包括:Spark SQL.DataFrame.Dataset以及Spark SQL服务等相关内容.本文主要讲解Spark 1.6.x的结构化数据处理相关东东,但因Spark发展迅速(本文的写作时值Spark 1.6.2发布之际,并且Spark 2.0的预览版本也已发布许久),因此请随时关注Spark SQL官方文档以了解最新信息. 文中使用Scala对Spark SQL进行讲解,并且代码大多都能在spark-shell中运行,关于这点请知晓. 概述 相比于

12.spark sql之读写数据

简介 ??Spark SQL支持多种结构化数据源,轻松从各种数据源中读取Row对象.这些数据源包括Parquet.JSON.Hive表及关系型数据库等. ??当只使用一部分字段时,Spark SQL可以智能地只扫描这些字段,而不会像hadoopFile方法一样简单粗暴地扫描全部数据. Parquet ??Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录.Parquet自动保存原始数据的类型,当写入Parquet文件时,所有的列会自动转为可空约束. scala // Enc

Spark SQL and DataFrame Guide(1.4.1)——之Data Sources

数据源(Data Sources) Spark SQL通过DataFrame接口支持多种数据源操作.一个DataFrame可以作为正常的RDD操作,也可以被注册为临时表. 1. 通用的Load/Save函数 默认的数据源适用所有操作(可以用spark.sql.sources.default设置默认值) 之后,我们就可以使用hadoop fs -ls /user/hadoopuser/在此目录下找到namesAndFavColors.parquet文件. 手动指定数据源选项 我们可以手动指定数据源

Spark SQL之External DataSource外部数据源(二)源代码分析

上周Spark1.2刚公布,周末在家没事,把这个特性给了解一下,顺便分析下源代码,看一看这个特性是怎样设计及实现的. /** Spark SQL源代码分析系列文章*/ (Ps: External DataSource使用篇地址:Spark SQL之External DataSource外部数据源(一)演示样例 http://blog.csdn.net/oopsoom/article/details/42061077) 一.Sources包核心 Spark SQL在Spark1.2中提供了Exte