spark2.x由浅入深深到底系列六之RDD java api用JdbcRDD读取关系型数据库

学习任何的spark技术之前,请先正确理解spark,可以参考:正确理解spark

以下是用spark RDD java api实现从关系型数据库中读取数据,这里使用的是derby本地数据库,当然可以是mysql或者oracle等关系型数据库:

package com.twq.javaapi.java7;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;

import java.io.Serializable;
import java.sql.*;

public class JavaJdbcRDDSuite implements Serializable {

    public static void prepareData() throws ClassNotFoundException, SQLException {
        //使用本地数据库derby,当然可以使用mysql等关系型数据库
        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
        Connection connection =
                DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true");

        try {
            //创建一张表FOO,ID是一个自增的主键,DATA是一个INTEGER列
            Statement create = connection.createStatement();
            create.execute(
                    "CREATE TABLE FOO(" +
                            "ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
                            "DATA INTEGER)");
            create.close();

            //插入数据
            PreparedStatement insert = connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)");
            for (int i = 1; i <= 5; i++) {
                insert.setInt(1, i * 2);
                insert.executeUpdate();
            }
            insert.close();
        } catch (SQLException e) {
            // If table doesn‘t exist...
            if (e.getSQLState().compareTo("X0Y32") != 0) {
                throw e;
            }
        } finally {
            connection.close();
        }
    }

    public static void shutdownDB() throws SQLException {
        try {
            DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;shutdown=true");
        } catch (SQLException e) {
            // Throw if not normal single database shutdown
            // https://db.apache.org/derby/docs/10.2/ref/rrefexcept71493.html
            if (e.getSQLState().compareTo("08006") != 0) {
                throw e;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        JavaSparkContext sc = new JavaSparkContext("local", "JavaAPISuite");

        //准备数据
        prepareData();

        //构建JdbcRDD
        JavaRDD<Integer> rdd = JdbcRDD.create(
                sc,
                new JdbcRDD.ConnectionFactory() {
                    @Override
                    public Connection getConnection() throws SQLException {
                        return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
                    }
                },
                "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
                1, 5, 1,
                new Function<ResultSet, Integer>() {
                    @Override
                    public Integer call(ResultSet r) throws Exception {
                        return r.getInt(1);
                    }
                }
        );
        //结果: [2, 4, 6, 8, 10]
        System.out.println(rdd.collect());

        shutdownDB();

        sc.stop();
    }
}

详细了解RDD的api的话,可以参考: spark core RDD api原理详解

时间: 2024-12-20 01:05:30

spark2.x由浅入深深到底系列六之RDD java api用JdbcRDD读取关系型数据库的相关文章

spark2.x由浅入深深到底系列六之RDD java api详解三

学习任何spark知识点之前请先正确理解spark,可以参考:正确理解spark 本文详细介绍了spark key-value类型的rdd java api 一.key-value类型的RDD的创建方式 1.sparkContext.parallelizePairs JavaPairRDD<String, Integer> javaPairRDD =         sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3

spark2.x由浅入深深到底系列六之RDD java api详解四

学习spark任何的知识点之前,先对spark要有一个正确的理解,可以参考:正确理解spark 本文对join相关的api做了一个解释 SparkConf conf = new SparkConf().setAppName("appName").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); JavaPairRDD<Integer, Integer> javaPa

spark2.x由浅入深深到底系列六之RDD java api详解一

以下对RDD的三种创建方式.单类型RDD基本的transformation api.采样Api以及pipe操作进行了java api方面的阐述 一.RDD的三种创建方式 从稳定的文件存储系统中创建RDD,比如local fileSystem或者hdfs等,如下: //从hdfs文件中创建 JavaRDD<String> textFileRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt"); //从

spark2.x由浅入深深到底系列六之RDD java api详解二

package com.twq.javaapi.java7; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.funct

spark2.x由浅入深深到底系列六之RDD java api调用scala api的原理

RDD java api其实底层是调用了scala的api来实现的,所以我们有必要对java api是怎么样去调用scala api,我们先自己简单的实现一个scala版本和java版本的RDD和SparkContext 一.简单实现scala版本的RDD和SparkContext class RDD[T](value: Seq[T]) {   //RDD的map操作   def map[U](f: T => U): RDD[U] = {     new RDD(value.map(f))   

spark2.x由浅入深深到底系列六之RDD 支持java8 lambda表达式

学习spark任何技术之前,请正确理解spark,可以参考:正确理解spark 我们在 http://7639240.blog.51cto.com/7629240/1966131 中已经知道了,一个scala函数其实就是java中的一个接口,对于java8 lambda而言,也是一样,一个lambda表达式就是java中的一个接口.接下来我们先看看spark中最简单的wordcount这个例子,分别用java8的非lambda以及lambda来实现: 一.非lambda实现的java spark

spark2.x由浅入深深到底系列七之RDD python api详解一

学习spark任何技术之前,请先正确理解spark,可以参考:正确理解spark 以下对RDD的三种创建方式.单类型RDD基本的transformation api.采样Api以及pipe操作进行了python api方面的阐述 一.RDD的三种创建方式 从稳定的文件存储系统中创建RDD,比如local fileSystem或者hdfs等,如下: """ 创建RDD的方法: 1: 从一个稳定的存储系统中,比如hdfs文件, 或者本地文件系统 """

spark2.x由浅入深深到底系列五之python开发spark环境配置

学习spark任何的技术前,请先正确理解spark,可以参考: 正确理解spark 以下是在mac操作系统上配置用python开发spark的环境 一.安装python spark2.2.0需要python的版本是Python2.6+ 或者 Python3.4+ 可以参考: http://jingyan.baidu.com/article/7908e85c78c743af491ad261.html 二.下载spark编译包并配置环境变量 1.在官网中: http://spark.apache.o

【Head First Java 读书笔记】(六)认识Java API

第五章 使用Java函数库 ArrayList add(Object elem) remove(int index) remove(Object elem) contains(Object elem) isEmpty() indexOf(Object elem) size() get(int index) ArrayList与一般数组的区别 ArrayList ArrayList myList = new ArrayList(); String a = new String("whoohoo&q