在学习任何 Spark 技术之前,请先正确理解 Spark,可以参考:正确理解spark
以下示例使用 Spark RDD 的 Java API(JdbcRDD)从关系型数据库中读取数据。这里使用的是 Derby 本地嵌入式数据库,当然也可以换成 MySQL 或 Oracle 等其他关系型数据库:
package com.twq.javaapi.java7;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;

import java.io.Serializable;
import java.sql.*;

/**
 * Demonstrates reading rows from a relational database into a Spark RDD via
 * {@link JdbcRDD}, using the Java API.
 *
 * <p>The example uses a local embedded Derby database; any other JDBC-accessible
 * database (MySQL, Oracle, ...) can be substituted by changing the driver class
 * and the connection URLs.
 */
public class JavaJdbcRDDSuite implements Serializable {

    /** Derby SQLState raised when the object being created (here: table FOO) already exists. */
    private static final String SQLSTATE_TABLE_ALREADY_EXISTS = "X0Y32";

    /** Derby SQLState reported for a normal shutdown of a single database. */
    private static final String SQLSTATE_SINGLE_DB_SHUTDOWN = "08006";

    /**
     * Creates table {@code FOO} in the local Derby database and inserts five rows
     * whose {@code DATA} values are 2, 4, 6, 8 and 10.
     *
     * <p>If the table already exists, the "already exists" error is ignored and no
     * rows are inserted, so the data left by a previous run is reused.
     *
     * @throws ClassNotFoundException if the embedded Derby driver is not on the classpath
     * @throws SQLException           on any database error other than "table already exists"
     */
    public static void prepareData() throws ClassNotFoundException, SQLException {
        // Load the embedded Derby driver; another JDBC driver could be used instead.
        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");

        try (Connection connection =
                 DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true")) {
            try {
                // Table FOO: ID is an auto-incremented identity column, DATA is an INTEGER column.
                try (Statement create = connection.createStatement()) {
                    create.execute(
                        "CREATE TABLE FOO("
                            + "ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),"
                            + "DATA INTEGER)");
                }

                // Insert the sample rows.
                try (PreparedStatement insert =
                         connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")) {
                    for (int i = 1; i <= 5; i++) {
                        insert.setInt(1, i * 2);
                        insert.executeUpdate();
                    }
                }
            } catch (SQLException e) {
                // Only "table already exists" is tolerated; everything else is a real failure.
                // Constant-first equals() avoids an NPE when the driver reports no SQLState.
                if (!SQLSTATE_TABLE_ALREADY_EXISTS.equals(e.getSQLState())) {
                    throw e;
                }
            }
        }
    }

    /**
     * Shuts down the local Derby database.
     *
     * <p>Derby signals a shutdown by throwing an {@link SQLException}; the one with
     * SQLState {@code 08006} (normal single-database shutdown) is swallowed.
     * See https://db.apache.org/derby/docs/10.2/ref/rrefexcept71493.html
     *
     * @throws SQLException if the exception raised is not the normal shutdown signal
     */
    public static void shutdownDB() throws SQLException {
        try {
            DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;shutdown=true");
        } catch (SQLException e) {
            // Rethrow anything that is not the normal single-database shutdown signal.
            if (!SQLSTATE_SINGLE_DB_SHUTDOWN.equals(e.getSQLState())) {
                throw e;
            }
        }
    }

    /**
     * Prepares the sample data, builds a {@link JdbcRDD} over table {@code FOO},
     * prints the collected {@code DATA} values, then releases the database and
     * the Spark context.
     *
     * @param args unused
     * @throws Exception if data preparation, the Spark job, or the database shutdown fails
     */
    public static void main(String[] args) throws Exception {
        JavaSparkContext sc = new JavaSparkContext("local", "JavaAPISuite");
        try {
            // Prepare the data.
            prepareData();
            try {
                // Build the JdbcRDD: the two '?' placeholders are bound to the ID bounds
                // (1 and 5 here), and the range is read using a single partition.
                JavaRDD<Integer> rdd = JdbcRDD.create(
                    sc,
                    new JdbcRDD.ConnectionFactory() {
                        @Override
                        public Connection getConnection() throws SQLException {
                            return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
                        }
                    },
                    "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
                    1, 5, 1,
                    new Function<ResultSet, Integer>() {
                        @Override
                        public Integer call(ResultSet r) throws Exception {
                            return r.getInt(1);
                        }
                    }
                );

                // Result: [2, 4, 6, 8, 10]
                System.out.println(rdd.collect());
            } finally {
                // Shut the database down even if the Spark job failed.
                shutdownDB();
            }
        } finally {
            // Always release the Spark context, whatever happened above.
            sc.stop();
        }
    }
}
如需详细了解 RDD 的 API,可以参考:spark core RDD api原理详解
时间: 2024-12-20 01:05:30