Spark 连接mysql 及MongoDB

在spark 运算过程中,常常需要连接不同类型的数据库以获取或者存储数据,这里将提及Spark如何连接mysql和MongoDB.

1. 连接mysql , 在1.3版本提出了一个新概念DataFrame ,因此以下方式获取到的是DataFrame,但是可通过JavaRDD<Row> rows = jdbcDF.toJavaRDD()转化为JavaRDD。

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class Main implements Serializable {

    private static final org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger.getLogger(Main.class);

    private static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
    private static final String MYSQL_USERNAME = "expertuser";
    private static final String MYSQL_PWD = "expertuser123";
    private static final String MYSQL_CONNECTION_URL =
            "jdbc:mysql://localhost:3306/employees?user=" + MYSQL_USERNAME + "&password=" + MYSQL_PWD;

    private static final JavaSparkContext sc =
            new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));

    private static final SQLContext sqlContext = new SQLContext(sc);

    public static void main(String[] args) {
        //Data source options
        Map<String, String> options = new HashMap<>();
        options.put("driver", MYSQL_DRIVER);
        options.put("url", MYSQL_CONNECTION_URL); //getConnection 返回一个已经打开的结构化数据库连接,JdbcRDD会自动维护关闭。
        options.put("dbtable",
                    "(select emp_no, concat_ws(‘ ‘, first_name, last_name) as full_name from employees) as employees_name");
//     sql 是查询语句,此查询语句必须包含两处占位符?来作为分割数据库ResulSet的参数,例如:"select title, author from books where ? < = id and id <= ?"
        options.put("partitionColumn", "emp_no");//进行分区的表字段
        options.put("lowerBound", "10001");
//     owerBound, upperBound, numPartitions 分别为第一、第二占位符,partition的个数。例如,给出lowebound 1,upperbound 20, numpartitions 2,则查询分别为(1, 10)与(11, 20)
        options.put("upperBound", "499999");
        options.put("numPartitions", "10");

        //Load MySQL query result as DataFrame
        DataFrame jdbcDF = sqlContext.load("jdbc", options);
        JavaRDD<Row> rows = jdbcDF.toJavaRDD();
        List<Row> employeeFullNameRows = jdbcDF.collectAsList();

        for (Row employeeFullNameRow : employeeFullNameRows) {
            LOGGER.info(employeeFullNameRow);
        }
    }
}

2. 连接mongoDB

可参考 https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage

时间: 2024-10-13 23:32:25

Spark 连接mysql 及MongoDB的相关文章

记录一次spark连接mysql遇到的问题

在使用spark连接mysql的过程中报错了,错误如下 08:51:32.495 [main] ERROR - Error loading factory org.apache.calcite.jdbc.CalciteJdbc41Factory java.lang.NoClassDefFoundError: org/apache/calcite/linq4j/QueryProvider at java.lang.ClassLoader.defineClass1(Native Method) ~[

spark连接jdbc,连接mysql

1 最直接的方式 scala> val jdbcDF = spark.read.format("jdbc") .option("url", "jdbc:mysql://hadoop1:3306/rdd")-------mysql 接口和库名 .option("dbtable", "rddtable")-----两张表名 .option("user", "root"

node.js连接MySQL操作及注意事项

node.js作为服务端的js运行环境已经出现了有几年了,最近我有个朋友也在做这方面的开发,但是也是刚刚接触,遇到了很多坑.前几天他们在操作数据库的时候出现了点问题,后来我们一起看了看,其实都是node本身机制的一些问题,这里总结一下给新手做借鉴. 我朋友的数据库采用的是MySQL.(至于为什么不用mongoDB,这个是公司上层选型的结果,因为很多新手朋友似乎总是觉的node.js就是应该和mongoDB联系在一起,所以这里简单说下).我后来写了一个简单的小例子,整个小例子使用了express框

pmm 监控mysql、mongodb、系统

Pmm监控 1.概述 Pmm是(percona management and monitoring)一款用于数据库(mysql.mongodb)的监控工具,是一种典型的C/S架构.本次部署采用的是docker,pmm-server端包括数据汇集.展示等,pmm-client主要是部署在需要监控的服务器上,用于数据收集.Pmm-server比较占系统资源,建议将其安装在单独一台服务器上面或安装在一台性能比较好的服务器上. 2.部署server端 系统环境:centos7.2 Docker:1.12

使用Apache Spark 对 mysql 调优 查询速度提升10倍以上

在这篇文章中我们将讨论如何利用 Apache Spark 来提升 MySQL 的查询性能. 介绍 在我的前一篇文章Apache Spark with MySQL 中介绍了如何利用 Apache Spark 实现数据分析以及如何对大量存放于文本文件的数据进行转换和分析.瓦迪姆还做了一个基准测试用来比较 MySQL 和 Spark with Parquet 柱状格式 (使用空中交通性能数据) 二者的性能. 这个测试非常棒,但如果我们不希望将数据从 MySQL 移到其他的存储系统中,而是继续在已有的

【Python】Windows平台下Python、Pydev连接Mysql数据库

Mysql数据库是跨平台的,不是说Python一定就要连接Mongodb. Python连接Mysql数据库是很简单的. 首先,你要配置好Python的开发环境,详见<[Python]Windows版本的Python开发环境的配置,Helloworld,Python中文问题,输入输出.条件.循环.数组.类>(点击打开链接),与Mysql的开发环境,详见<[Mysql]Mysql的安装.部署与图形化>(点击打开链接). 之后,打开Python的官网(点击打开链接),如下图,直接下载一

Spark与mysql整合

一.需求:把最终结果存储在mysql中 1.UrlGroupCount1类 import java.net.URL import java.sql.DriverManager import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * 把最终结果存储在mysql中 */ object UrlGroupCount1 { def main(args: Array[String]): U

MySQL、MongoDB、Redis 数据库之间的区别与使用(本章迭代更新)

MySQL.MongoDB.Redis 数据库之间的区别与使用 MySQL.MongoDB.Redis 数据库之间的区别与使用(本章迭代更新) update:2019年2月20日 15:21:19(本章迭代更新) 一.数据库之间的区别 MySQL MySQL概述 关系型数据库.无论数据还是索引都存放在硬盘中.到要使用的时候才交换到内存中.能够处理远超过内存总量的数据. 在不同的引擎上有不同 的存储方式. 查询语句是使用传统的 SQL 语句,拥有较为成熟的体系,成熟度很高. 开源数据库的份额在不断

python入门(十七)python连接mysql数据库

mysql 数据库:关系型数据库mysql:互联网公司 sqllite:小型数据库,占用资源少,手机里面使用oracle:银行.保险.以前外企.sybase:银行+通信 互联网公司key:valuemongodb:磁盘上redis:内存数据库,持久化memchache:内存数据库 mysql -uroot -p密码装完了之后,cmd下输入mysql命令,需要将安装目录下的bin目录( mysql.exe 所在的目录)加入到path中 本地连接 mysql -uroot -p mysql -h12