Spark SQL External Data Sources JDBC官方实现写测试

通过Spark SQL External Data Sources JDBC实现将RDD的数据写入到MySQL数据库中。

jdbc.scala重要API介绍:

/**
 * Save this RDD to a JDBC database at `url` under the table name `table`.
 * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
 * If you pass `true` for `allowExisting`, it will drop any table with the
 * given name; if you pass `false`, it will throw if the table already
 * exists.
 */
def createJDBCTable(url: String, table: String, allowExisting: Boolean) 

/**
 * Save this RDD to a JDBC database at `url` under the table name `table`.
 * Assumes the table already exists and has a compatible schema.  If you
 * pass `true` for `overwrite`, it will `TRUNCATE` the table before
 * performing the `INSERT`s.
 *
 * The table must already exist on the database.  It must have a schema
 * that is compatible with the schema of this RDD; inserting the rows of
 * the RDD in order via the simple statement
 * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
 */
def insertIntoJDBC(url: String, table: String, overwrite: Boolean) 
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val sqlContext  = new SQLContext(sc)
import sqlContext._

#数据准备
val url = "jdbc:mysql://hadoop000:3306/test?user=root&password=root"

val arr2x2 = Array[Row](Row.apply("dave", 42), Row.apply("mary", 222))
val arr1x2 = Array[Row](Row.apply("fred", 3))
val schema2 = StructType(StructField("name", StringType) :: StructField("id", IntegerType) :: Nil)

val arr2x3 = Array[Row](Row.apply("dave", 42, 1), Row.apply("mary", 222, 2))
val schema3 = StructType(StructField("name", StringType) :: StructField("id", IntegerType) :: StructField("seq", IntegerType) :: Nil) 

import org.apache.spark.sql.jdbc._

================================CREATE======================================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)

srdd.createJDBCTable(url, "person", false)
sqlContext.jdbcRDD(url, "person").collect.foreach(println)
[dave,42]
[mary,222]

==============================CREATE with overwrite========================================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x3), schema3)
srdd.createJDBCTable(url, "person2", false)
sqlContext.jdbcRDD(url, "person2").collect.foreach(println)
[mary,222,2]
[dave,42,1]

val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
srdd2.createJDBCTable(url, "person2", true)
sqlContext.jdbcRDD(url, "person2").collect.foreach(println)
[fred,3]

================================CREATE then INSERT to append======================================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)
srdd.createJDBCTable(url, "person3", false)
sqlContext.jdbcRDD(url, "person3").collect.foreach(println)
[mary,222]
[dave,42]

srdd2.insertIntoJDBC(url, "person3", false)
sqlContext.jdbcRDD(url, "person3").collect.foreach(println)
[mary,222]
[dave,42]
[fred,3]

================================CREATE then INSERT to truncate======================================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr1x2), schema2)

srdd.createJDBCTable(url, "person4", false)
sqlContext.jdbcRDD(url, "person4").collect.foreach(println)
[dave,42]
[mary,222]

srdd2.insertIntoJDBC(url, "person4", true)
[fred,3]

================================Incompatible INSERT to append======================================
val srdd = sqlContext.applySchema(sc.parallelize(arr2x2), schema2)
val srdd2 = sqlContext.applySchema(sc.parallelize(arr2x3), schema3)
srdd.createJDBCTable(url, "person5", false)
srdd2.insertIntoJDBC(url, "person5", true)
    java.sql.SQLException: Column count doesn‘t match value count at row 1
时间: 2024-10-23 19:30:42

Spark SQL External Data Sources JDBC官方实现写测试的相关文章

Spark SQL External Data Sources JDBC简易实现

在spark1.2版本中最令我期待的功能是External Data Sources,通过该API可以直接将External Data Sources注册成一个临时表,该表可以和已经存在的表等通过sql进行查询操作.External Data Sources API代码存放于org.apache.spark.sql包中. 具体的分析可参见OopsOutOfMemory的两篇精彩博文: http://blog.csdn.net/oopsoom/article/details/42061077 ht

Spark SQL之External DataSource外部数据源(一)示例

一.Spark SQL External DataSource简介 随着Spark1.2的发布,Spark SQL开始正式支持外部数据源.Spark SQL开放了一系列接入外部数据源的接口,来让开发者可以实现. 这使得Spark SQL支持了更多的类型数据源,如json, parquet, avro, csv格式.只要我们愿意,我们可以开发出任意的外部数据源来连接到Spark SQL.之前大家说的支持HBASE,Cassandra都可以用外部数据源的方式来实现无缝集成. (Ps: 关于Exter

Apache Spark 2.2.0 中文文档 - Spark SQL, DataFrames and Datasets Guide | ApacheCN

Spark SQL, DataFrames and Datasets Guide Overview SQL Datasets and DataFrames 开始入门 起始点: SparkSession 创建 DataFrames 无类型的Dataset操作 (aka DataFrame 操作) Running SQL Queries Programmatically 全局临时视图 创建Datasets RDD的互操作性 使用反射推断Schema 以编程的方式指定Schema Aggregatio

Spark1.1.0 Spark SQL Programming Guide

Spark SQL Programming Guide Overview Getting Started Data Sources RDDs Inferring the Schema Using Reflection Programmatically Specifying the Schema Parquet Files Loading Data Programmatically Configuration JSON Datasets Hive Tables Performance Tuning

Spark SQL, DataFrames and Datasets 指南

概述 Spark SQL 是 Spark 处理结构化数据的模块; 与基础的 Spark RDD API 不同, Spark SQL 提供的接口提供给 Spark 更多的关于数据和执行计算的结; 内在的, Spark SQL 使用这些额外的信息去执行额外的优化; 这里有几种包括 SQL 和 Datasets API 在内的与 Spark SQL 交互的方法; 当计算结果使用相同的执行引擎, 独立于你使用的表达计算的 API/语言; 这种统一意味着开发者可以依据哪种 APIs 对于给定的表达式提供了

spark结构化数据处理:Spark SQL、DataFrame和Dataset

本文讲解Spark的结构化数据处理,主要包括:Spark SQL.DataFrame.Dataset以及Spark SQL服务等相关内容.本文主要讲解Spark 1.6.x的结构化数据处理相关东东,但因Spark发展迅速(本文的写作时值Spark 1.6.2发布之际,并且Spark 2.0的预览版本也已发布许久),因此请随时关注Spark SQL官方文档以了解最新信息. 文中使用Scala对Spark SQL进行讲解,并且代码大多都能在spark-shell中运行,关于这点请知晓. 概述 相比于

Spark SQL 编程

Spark SQL的依赖 Spark SQL的入口:SQLContext 官方网站参考 https://spark.apache.org/docs/1.6.2/sql-programming-guide.html#starting-point-sqlcontext 针对几种不同的语言来写. Spark SQL的入口:HiveContext SQLContext vs HiveContext Spark SQL的作用与使用方式 Spark SQL支持的API 从程序中使用SparkSQL的基本套路

12.spark sql之读写数据

简介 ??Spark SQL支持多种结构化数据源,轻松从各种数据源中读取Row对象.这些数据源包括Parquet.JSON.Hive表及关系型数据库等. ??当只使用一部分字段时,Spark SQL可以智能地只扫描这些字段,而不会像hadoopFile方法一样简单粗暴地扫描全部数据. Parquet ??Parquet是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录.Parquet自动保存原始数据的类型,当写入Parquet文件时,所有的列会自动转为可空约束. scala // Enc

Spark SQL and DataFrame Guide(1.4.1)——之Data Sources

数据源(Data Sources) Spark SQL通过DataFrame接口支持多种数据源操作.一个DataFrame可以作为正常的RDD操作,也可以被注册为临时表. 1. 通用的Load/Save函数 默认的数据源适用所有操作(可以用spark.sql.sources.default设置默认值) 之后,我们就可以使用hadoop fs -ls /user/hadoopuser/在此目录下找到namesAndFavColors.parquet文件. 手动指定数据源选项 我们可以手动指定数据源