Spark Streaming通过JDBC操作数据库

本文记录了学习使用Spark Streaming通过JDBC操作数据库的过程,源数据从Kafka中读取。

Kafka从0.10版本提供了一种新的消费者API,和0.8不同,因此Spark Streaming也提供了两种API与之对应,其中spark-streaming-kafka-0-8支持Kafka 0.8.2.1以后的Broker;spark-streaming-kafka-0-10支持0.10.0以上Broker,处于实验阶段。两者的对比如下表所示。

|spark-streaming-kafka-0-8 | spark-streaming-kafka-0-10

--------------- |------------------------ |--------------------------

Broker Version | 0.8.2.1 or higher | 0.10.0 or higher

Api Stability | Stable | Experimental

Language Support | Scala, Java, Python | Scala, Java

Receiver DStream | Yes | No

Direct DStream | Yes | Yes

SSL / TLS Support | No | Yes

Offset Commit Api | No | Yes

Dynamic Topic Subscription | No | Yes

Spark Streaming集成Kafka的说明可以参考如下资料:

spark-streaming-kafka-0-10 http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

spark-streaming-kafka-0-8 http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html

1.2 运行组件

本文所写的例子基于JDK1.8和Scala 2.11,运行依赖组件的情况如下表所示。

组件 部署方式 IP地址 操作系统
spark-2.0.1 伪分布式 192.168.1.91 CentOS 7.1
Kafka-0.10.0.1 伪分布式 192.168.1.90 CentOS 7.1
postgresql-9.4.5 单机 192.168.1.213 CentOS 7.1

数据库中创建了一张名为kafka_message的表,共有三个字段,都是varchar类型。

CREATE TABLE kafka_message (
    timeseq varchar(16),
    thread varchar(32),
    message varchar(255)
);

2. 代码

2.1 pom.xml

依赖的lib如下。

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.0.1</version>
    </dependency>
    <dependency>
        <groupId>com.jolbox</groupId>
        <artifactId>bonecp</artifactId>
        <version>0.8.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>9.1-901-1.jdbc4</version>
    </dependency>
</dependencies>

2.2 数据库连接池

import java.sql.Connection

import com.jolbox.bonecp.{BoneCP, BoneCPConfig}
import org.slf4j.LoggerFactory

 /**
  *  数据库连接池,使用了BoneCP
  */
object ConnectionPool {

  val logger = LoggerFactory.getLogger(this.getClass)

  //连接池配置
  private val connectionPool: Option[BoneCP] = {
    try{
      Class.forName("org.postgresql.Driver")
      val config = new BoneCPConfig()
      config.setJdbcUrl("jdbc:postgresql://192.168.1.213/yourdb")
      config.setUsername("postgres")
      config.setPassword("******")
      config.setLazyInit(true)

      config.setMinConnectionsPerPartition(3)
      config.setMaxConnectionsPerPartition(5)
      config.setPartitionCount(5)
      config.setCloseConnectionWatch(true)
      config.setLogStatementsEnabled(false)
      Some(new BoneCP(config))
    }catch {
      case exception: Exception =>
        logger.warn("Create Connection Error: \n" + exception.printStackTrace())
        None
    }
  }

  // 获取数据库连接
  def getConnection: Option[Connection] = {
    connectionPool match {
      case Some(pool) => Some(pool.getConnection)
      case None => None
    }
  }

  // 释放数据库连接
  def closeConnection(connection:Connection): Unit = {
    if(!connection.isClosed) {
      connection.close()
    }
  }

2.3 Kafka -> Spark-Streaming -> JDBC

Spark Streaming从Kafka中读取数据,并把数据写入数据库。SPark Streaming编程的基本顺序是:

  1. 创建Spark Streaming上下文
  2. 从数据源接口创建DStream
  3. 在DStream上做转换(Transformations)
  4. 指定计算结果存储的位置
  5. 启动计算

代码如下,详见注释。

import java.sql.Connection

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory

/**
  * 从Kafka中读取数据,并把数据写入数据库。
  */
object KafkaToDB {

  val logger = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    // 参数校验
    if (args.length < 2) {
      System.err.println(
        s"""
           |Usage: KafkaToDB <brokers> <topics>
           |  <brokers> broker列表,至少1个,以英文逗号分割
           |  <topics> topic列表,至少一个,以英文逗号分割
           |""".stripMargin)
      System.exit(1)
    }

    // 处理参数
    val Array(brokers, topics) = args
    val topicSet: Set[String] = topics.split(",").toSet
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // 1.创建上下文,以每1秒间隔的数据作为一个批次
    val sparkConf = new SparkConf().setAppName("KafkaToDB")
    val streamingContext = new StreamingContext(sparkConf, Seconds(1))

    // 2.创建输入流,获取数据。流操作基于DStream,InputDStream继承于DStream
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topicSet, kafkaParams)
    )

    // 3. DStream上的转换操作
    // 取消息中的value数据,以英文逗号分割,并转成Tuple3
    val values = stream.map(_.value.split(","))
      .filter(x => x.length == 3)
      .map(x => new Tuple3[String, String, String](x(0), x(1), x(2)))

    // 输入前10条到控制台,方便调试
    values.print()

    // 4.同foreachRDD保存到数据库
    val sql = "insert into kafka_message(timeseq, thread, message) values (?,?,?)"
    values.foreachRDD(rdd => {
      val count = rdd.count()
      println("-----------------count:" + count)
      if (count > 0) {
        rdd.foreachPartition(partitionOfRecords => {
          val conn = ConnectionPool.getConnection.orNull
          if (conn != null) {
            partitionOfRecords.foreach(data => insert(conn, sql, data))
            ConnectionPool.closeConnection(conn)
          }
        })
      }
    })

    // 5. 启动计算
    streamingContext.start()
    streamingContext.awaitTermination() // 等待中断结束计算
  }

  /**
    * 保存数据到数据库
    *
    * @param conn 数据库连接
    * @param sql  prepared statement sql
    * @param data 要保存的数据,Tuple3结构
    */
  def insert(conn: Connection, sql: String, data: (String, String, String)): Unit = {
    try {
      val ps = conn.prepareStatement(sql)
      ps.setString(1, data._1)
      ps.setString(2, data._2)
      ps.setString(3, data._3)
      ps.executeUpdate()
      ps.close()
    } catch {
      case e: Exception =>
        logger.error("Error in execution of insert. " + e.getMessage)
    }
  }

}

3. 任务运行

3.1 数据库驱动配置

由于本次程序运行环境是spark standalone 的伪分布式,指定SPARK_CLASSPATH时,相当于同时指定了driver和executor的classpath。

编辑spark-env.sh。

vi $SPARK_HOME/conf/spark-env.sh

输入以下内容,注意把postgresql驱动包放在对应的位置。

export SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/jobs/postgresql-9.1-901-1.jdbc4.jar

实际上,Spark已经不推荐使用SPARK_CLASSPATH参数了,启动的时候,我们会发现如下的日志,提示我们用spark.executor.extraClassPathspark.driver.extraClassPath来代替。如果是spark local模式,只需指定spark.driver.extraClassPath即可。

日志如下。

16/10/21 15:15:33 WARN SparkConf:
SPARK_CLASSPATH was detected (set to ‘:/opt/jobs/postgresql-9.1-901-1.jdbc4.jar‘).
This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath

16/10/21 15:15:33 WARN SparkConf: Setting ‘spark.executor.extraClassPath‘ to ‘:/opt/jobs/
postgresql-9.1-901-1.jdbc4.jar‘ as a work-around.
16/10/21 15:15:33 WARN SparkConf: Setting ‘spark.driver.extraClassPath‘ to ‘:/opt/jobs/
postgresql-9.1-901-1.jdbc4.jar‘ as a work-around.

3.2 启动任务

$SPARK_HOME/bin/spark-submit \
--master spark://192.168.1.91:7077
--class com.xxx.streaming.KafkaToDB  spark-streaming-demo.jar 192.168.1.90:9092 my-topic

在192.168.1.90上,通过kafka命令发送消息,命令如下:

bin/kafka-console-producer.sh --broker-list 192.168.1.90:9092 --topic mytopic

消息如下:

1475589918658    thread-15    msg-0
1475589920177    thread-15    msg-1

之后,我们可以在控制台看到消息被输出,也可以在数据库中查询到这些数据。

4.Spark程序找不到JDBC驱动的问题

控制台曾经报出找不到JDBC驱动的异常,日志如下。

java.sql.SQLException: No suitable driver found for jdbc:……

该问题可以用前文所述的数据库驱动配置办法解决。在这里推荐一篇博客,对这个问题给出的很好的解释。

http://www.codexiu.cn/spark/blog/12672/

博客来源:https://www.jianshu.com/p/a73c0c95d2fe

原文地址:https://www.cnblogs.com/camilla/p/8350988.html

时间: 2024-10-24 20:00:12

Spark Streaming通过JDBC操作数据库的相关文章

JDBC操作数据库的学习(2)

在上一篇博客<JDBC操作数据库的学习(1)>中通过对例1,我们已经学习了一个Java应用如何在程序中通过JDBC操作数据库的步骤流程,当然我们也说过这样的例子是无法在实际开发中使用的,本篇就在简单开发中如何对上一篇的例子进行“升级”,满足简单开发中对数据库的增删改查(CRUD). 如果按照上一篇中的例子,那么我们在做增删改查的话将会出现每个方法都要获取连接,释放资源,代码会出现很大的重复性,因此我们应该将每个增删改查每个方法中可以复用的代码抽取出来,同时为了能切换数据库方便,也该将一些配置信

jdbc操作数据库语句

非常有用的jdbc操作数据库语句,记录下来,以方便以后的查询. public class PersonDao { // 增加操作 public void insert(Person person) throws Exception; // 修改操作 public void update(Person person) throws Exception; // 删除操作 public void delete(String id) throws Exception ; // 按ID查询操作 publi

JDBC操作数据库的学习(1)

单单对数据库的操作,比如说MySQL,我们可以在命令行窗口中执行,但是一般是应用程序要操作数据库,因此我们应该在程序中的代码上体现对数据库的操作,那么使用程序应用如何操作数据库呢?那就要使用到数据库的连接驱动,应用程序通过这些驱动来操作数据库: 但是这里就又有一个问题了,不同的数据库有各自的驱动程序,而一个应用程序要操作不同的数据库,那么就要懂得要使用的数据库的驱动如何操作,这样就增加了学习成本.好在我们使用Java开发应用,而Java中只需要使用JDBC就能操作所有的数据库,因为JDBC提供的

几种通过JDBC操作数据库的方法,以及返回数据的处理

1.SQL TO String :只返回一个查询结果 例如查询某条记录的总数 rs = stmt.executeQuery(replacedCommand);             if (rs != null && rs.next()) // rs only contains one row and one column             {                    String tempStr = rs.getString(1);                 

JDBC操作数据库的基本步骤

JDBC操作数据库的基本步骤: 1)加载(注册)数据库驱动(到JVM). 2)建立(获取)数据库连接. 3)创建(获取)数据库操作对象. 4)定义操作的SQL语句. 5)执行数据库操作. 6)获取并操作结果集. 7)关闭对象,回收数据库资源(关闭结果集-->关闭数据库操作对象-->关闭连接). package com.yangshengjie.jdbc; import java.sql.Connection; import java.sql.DriverManager; import java

JDBC操作数据库的基本操作

JDBC操作数据库的基本步骤: 1)加载(注册)数据库驱动(到JVM). 2)建立(获取)数据库连接. 3)创建(获取)数据库操作对象. 4)定义操作的SQL语句. 5)执行数据库操作. 6)获取并操作结果集. 7)关闭对象,回收数据库资源(关闭结果集-->关闭数据库操作对象-->关闭连接). 1 package com.yangshengjie.jdbc; 2 import java.sql.Connection; 3 import java.sql.DriverManager; 4 imp

JDBC操作数据库的三种方式比较

JDBC(java Database Connectivity)java数据库连接,是一种用于执行上sql语句的javaAPI,可以为多种关系型数据库提供统一访问接口.我们项目中经常用到的MySQL.oracle.DB2等关系型数据库均是通过JDBC来访问的,现在主流的ORM框架Hibernate.Mybatis等均是在JDBC的基础上做的进一步封装.优化.一般小型的项目,可以直接用JDBC来访问数据库,简单方便.我在进过几个项目后,总结了三总JDBC的基本用法,对这几种用法做一个总结. 第一种

JDBC操作数据库的详细步骤

JDBC操作数据库的步骤: 1.注册驱动 告知JVM使用的是哪一个数据库的驱动 2.获得连接 使用JDBC中的类,完成对MySQL数据库的连接 3.获得语句执行平台 通过连接对象获取对SQL语句的执行者对象 4.执行sql语句 使用执行者对象,向数据库执行SQL语句 获取到数据库的执行后的结果 5.处理结果 6.释放资源  一堆close() 1.注册驱动,发射技术,将驱动加入到内容 使用java.sql.DriverManager类静态方法 registerDriver(Driver driv

Spark Streaming中的操作函数分析

根据Spark官方文档中的描述,在Spark Streaming应用中,一个DStream对象可以调用多种操作,主要分为以下几类 Transformations Window Operations Join Operations Output Operations 一.Transformations 1.map(func) map操作需要传入一个函数当做参数,具体调用形式为 val b = a.map(func) 主要作用是,对DStream对象a,将func函数作用到a中的每一个元素上并生成新