spark之JDBC开发(连接数据库测试)


以下操作属于本地模式操作:

1、在Eclipse4.5中建立工程RDDToJDBC,并创建一个文件夹lib用于放置第三方驱动包

[[email protected] software]$ cd /project/RDDToJDBC/
[[email protected] RDDToJDBC]$ mkdir -p lib
[[email protected] RDDToJDBC]$ ls
bin lib src

2、添加必要的环境

2.1、将MySql的jar包拷贝到工程目录RDDToJDBC下的lib目录下
[[email protected] software]$ cp -a /software/hive-1.2.2/lib/mysql-connector-java-3.0.17-ga-bin.jar /project/RDDToJDBC/lib/
2.1、将Spark的开发库Spark2.1.1-All追加到RDDToJDBC工程的classpath路径中去(可以通过添加用户库的方式来解决);Spark2.1.1-All中包含哪些包,请点击此处

3、准备spark的源数据:

[[email protected] spark]$ cd /home/hadoop/test/jdbc/
[[email protected] jdbc]$ ls
myuser  testJDBC.txt
[[email protected] jdbc]$ cat myuser
lisi 123456 165 1998-9-9
lisan 123ss 187 2009-10-19
wangwu 123qqwe 177 1990-8-3

4、开发源码:

package com.mmzs.bigdata.spark.core.local;

import java.io.File;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple4;

public class TestMain {
    /**
     * 全局计数器
     */
    private static int count;

    /**
     * 数据库连接
     */
    private static Connection conn;

    /**
     * 预编译语句
     */
    private static PreparedStatement pstat;

    private static final File OUT_PATH=new File("/home/hadoop/test/jdbc/output");

    static{
        delDir(OUT_PATH);
        try {
            String sql="insert into t_user(userName,passWord,height,birthday) values(?,?,?,?)";
            String url="jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8";
            Class.forName("com.mysql.jdbc.Driver");
            conn=DriverManager.getConnection(url, "root", "123456");
            pstat=conn.prepareStatement(sql);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    /**
     * 删除任何目录或文件
     * @param f
     */
    private static void delDir(File f){
        if(!f.exists())return;
        if(f.isFile()||(f.isDirectory()&&f.listFiles().length==0)){
            f.delete();
            return;
        }
        File[] files=f.listFiles();
        for(File fp:files)delDir(fp);
        f.delete();
    }

    private static void batchSave(Tuple4<String,String,Double,Date> line,boolean isOver){
        try{
            pstat.setString(1, line._1());
            pstat.setString(2, line._2());
            pstat.setDouble(3, line._3());
            pstat.setDate(4, line._4());

            if(isOver){//如果结束了循环则直接写磁盘
                pstat.addBatch();
                pstat.executeBatch();
                pstat.clearBatch();
                pstat.clearParameters();
            }else{ //如果没有结束则将sql语句添加到批处理中去
                pstat.addBatch();
                count++;
                if(count%100==0){ //如果满一个批次就提交一次批处理操作
                    pstat.executeBatch();
                    pstat.clearBatch();
                    pstat.clearParameters();
                }
            }
        }catch(SQLException e){
            e.printStackTrace();
        }
    }

    /**
     * 将RDD集合中的数据存储到关系数据库MYSql中去
     * @param statResRDD
     */
    private static void saveToDB(JavaRDD<String> statResRDD){
        final long rddNum=statResRDD.count();
        statResRDD.foreach(new VoidFunction<String>(){
            private long count=0;
            @Override
            public void call(String line) throws Exception {
                String[] fields=line.split(" ");
                String userName=fields[1];
                String passWord=fields[2];
                Double height=Double.parseDouble(fields[3]);
                Date birthday=Date.valueOf(fields[4]);
                Tuple4<String,String,Double,Date> fieldTuple=new Tuple4<String,String,Double,Date>(userName,passWord,height,birthday);
                if(++count<rddNum){
                    batchSave(fieldTuple,false);
                }else{
                    batchSave(fieldTuple,true);
                }
            }
        });

        try{
            if(null!=pstat)pstat.close();
            if(null!=conn)conn.close();
        }catch(SQLException e){
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        SparkConf conf=new SparkConf();
        conf.setAppName("Java Spark local");
        conf.setMaster("local");

        //根据Spark配置生成Spark上下文
        JavaSparkContext jsc=new JavaSparkContext(conf);

        //读取本地的文本文件成内存中的RDD集合对象
        JavaRDD<String> lineRdd=jsc.textFile("/home/hadoop/test/jdbc/myuser");

        //...........其它转换或统计操作................

        //存储统计之后的结果到磁盘文件中去
        //lineRdd.saveAsTextFile("/home/hadoop/test/jdbc/output");
        saveToDB(lineRdd);

        //关闭Spark上下文
        jsc.close();
    }
}

5、初始化MySql数据库服务(节点在192.168.154.134上)

A、启动MySql数据库服务

[[email protected] ~]# cd /software/mysql-5.5.32/multi-data/3306/
[[email protected] 3306]# ls
data my.cnf my.cnf.bak mysqld
[[email protected] 3306]# ./mysqld start
Starting MySQL...

B、建立test库

[[email protected] 3306]# cd /software/mysql-5.5.32/bin/
[[email protected] bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "show databases;"
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| performance_schema |
+--------------------+
[[email protected] bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "create database test character set utf8;"
[[email protected] bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "show databases;"
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| performance_schema |
| test               |
+--------------------+

C、建立myuser表:

[[email protected] bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "create table if not exists test.myuser(username varchar(30),password varchar(30),height double(10,1),birthday date)engine=myisam charset=utf8;"
[[email protected] bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "use test;show tables;"
+-------------------+
| Tables_in_test |
+-------------------+
| myuser             |
+-------------------+
[[email protected] bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "use test;desc test.myuser;"
+-------------+--------------+-------+----+---------+-------+
| Field       | Type         | Null  | Key| Default | Extra |
+-------------+--------------+-------+----+---------+-------+
| username    | varchar(30)  | YES   |    | NULL    |       |
| password    | varchar(30)  | YES   |    | NULL    |       |
| height      | double(10,1) | YES   |    | NULL    | NULL  |
| birthday    | date         | YES   |    | NULL    |       |
+-------------+--------------+-------+----+---------+-------+

#目前数据库表中还没有数据
[[email protected] bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.myuser;"

6、运行并查看数据库中结果

6.1、在Eclipse4.5中直接运行Spark代码,观察Eclipse控制台输出
6.2、检查在关系数据库MySql中是否已经存在数据

原文地址:https://www.cnblogs.com/mmzs/p/8476286.html

时间: 2024-11-06 15:16:29

spark之JDBC开发(连接数据库测试)的相关文章

Spark管理与开发

Spark 2.x管理与开发 ==========第一篇:Scala编程语言========= 一.Scala语言基础 1.Scala语言简介 Scala是一种多范式的编程语言,其设计的初衷是要集成面向对象编程和函数式编程的各种特性.Scala运行于Java平台(Java虚拟机),并兼容现有的Java程序.它也能运行于CLDC配置的Java ME中.目前还有另一.NET平台的实现,不过该版本更新有些滞后.Scala的编译模型(独立编译,动态类加载)与Java和C#一样,所以Scala代码可以调用

JDBC - 开发实例 - MVC模式

JDBC - 开发实例 - MVC模式  1. 在web.xml中配置连接数据库的信息 web.xml: <context-param> <param-name>server</param-name> //主机名 <param-value>localhost</param-value> </context-param> <context-param> <param-name>db</param-name&

spark Intellij IDEA开发环境搭建

(1)创建Scala项目 File->new->Project,如下图 选择Scala 然后next 其中Project SDK指定安装的JDK,Scala SDK指定安装的Scala(这里使用的是IDEA自带的scala SDK),这里将项目名称命令为SparkWordCount,然后finish  在IDEA中开发应用程序时,常常需要通过一定的文件目录组织进行源码编写,例如源文件目录.测试源文件目录,下面演示在Intellij IDEA的src目录下创建main/scala源文件目录. 直

动态代理在WEB与JDBC开发中的应用(JDBC篇)

背景描述 如果之前看过<动态代理在WEB与JDBC开发中的应用(WEB篇)>,这篇的内容可以全当是另一种应用的进阶举例,而在实现上确实没有太多进步的地方.我们先看一下项目所面临问题以及期望解决方案.在作者所接触的这个项目中,直接使用原始JDBC技术,java.sql.PreparedStatement和java.sql.ResultSet几乎占领了数据访问层,没有半点OR Mapping的迹象,看起来是不是很悲催?命啊-在项目开发至一半的时候,突然发现要对日文字符进行支持,而在之前一直使用英文

敏捷开发下, 如何将需求分析,架构(软件)设计,开发与测试,一气呵成式的结合且高效的完成 ?

产品开发中,时常会发生类似如图中 "削马铃薯"的悲剧. 悲剧的发生,往往是由于我们只传递了 "要作什么功能"给开发人员.却缺乏了一个有效的且轻量级的实践,能在正式进入迭代开发前,确认开发人员是否真有能力,能将 "使用者的需求"转化为 "可执行的代码"? "场景树" 便是一结合Use Case, Domain Driven Design, UML 的轻量级可视化的敏捷实践. 经由场景树,可确认开发人员,是否已

ASP.NET 连接数据库测试(VS2010)

1.新建一个ASP.NET网站模板:双击web.config文件,在<configuration>和</configuration>节点中添加一个<connectionStrings>节点,代码如下: <connectionStrings> <add name="Con" connectionString="server=YAYUN\SQLEXPRESS;DataBase=Hotel;User ID=sa;Password

中小企业可以用docker来标准化开发、测试、生产环境

一.使用 Docker 搭建 Tomcat 运行环境 1 Docker与虚拟机 2 搭建过程 2.1 准备宿主系统 准备一个 CentOS 7操作系统,具体要求如下: 必须是 64 位操作系统 建议内核在 3.8 以上 通过以下命令查看您的 CentOS 内核: # uname -r 2.2 安装Docker # yum install docker 可使用以下命令,查看 Docker 是否安装成功: # docker version 若输出了 Docker 的版本号,则说明安装成功了,可通过以

web接口开发与测试

最近一直在学习和整理web开发与接口测试的相关资料.接口测试本身毫无任何难度,甚至有很多工具和类库来帮助我们进行接口测试.大多测试人员很难深入了解web接口测试的原因是对web开发不太了解,当你越了解开发就会越看得清接口是什么.当然,web开发是比较麻烦,我们很难一下子掌握. 注:不过本文并不是一个零基础的文章,需要你对 Django web开发,requests接口库,unittest单元测试框架,三者有一定的了解. Django快速开发之投票系统 之前分享过一篇Django开发投票系统的例子

android开发及测试工具

1.Buckfacebook开源的Android编译工具,效率是ant的两倍.主要优点在于:(1) 加快编译速度,通过并行利用多核cpu和跟踪不变资源减少增量编译时间实现(2) 可以在编译系统中生成编译规则而无须另外的系统生成编译规则文件(3) 编译同时可生成单元测试结果(4) 既可用于IDE编译也可用于持续集成编译(5) facebook持续优化中项目地址:https://github.com/facebook/buck 2.Android Maven PluginAndroid Maven插