Spark踩坑记——数据库(Hbase+Mysql)转

转自:http://www.cnblogs.com/xlturing/p/spark.html

前言

在使用Spark Streaming的过程中对于计算产生结果的进行持久化时,我们往往需要操作数据库,去统计或者改变一些值。最近一个实时消费者处理任务,在使用spark streaming进行实时的数据流处理时,我需要将计算好的数据更新到hbase和mysql中,所以本文对spark操作hbase和mysql的内容进行总结,并且对自己踩到的一些坑进行记录。

Spark Streaming持久化设计模式

DStreams输出操作

  • print:打印driver结点上每个Dstream中的前10个batch元素,常用于开发和调试
  • saveAsTextFiles(prefix, [suffix]):将当前Dstream保存为文件,每个interval batch的文件名命名规则基于prefix和suffix:"prefix-TIME_IN_MS[.suffix]".
  • saveAsObjectFiles(prefix, [suffix]):将当前的Dstream内容作为Java可序列化对象的序列化文件进行保存,每个interval batch的文件命名规则基于prefix和suffix:: "prefix-TIME_IN_MS[.suffix]".
  • saveAsHadoopFiles(prefix, [suffix]):将Dstream以hadoop文件的形式进行保存,每个interval batch的文件命名规则基于prefix和suffix:: "prefix-TIME_IN_MS[.suffix]".
  • foreachRDD(func):最通用的输出操作,可以对从数据流中产生的每一个RDD应用函数_fun_。通常_fun_会将每个RDD中的数据保存到外部系统,如:将RDD保存到文件,或者通过网络连接保存到数据库。值得注意的是:_fun_执行在跑应用的driver进程中,并且通常会包含RDD action以促使数据流RDD开始计算。

使用foreachRDD的设计模式

dstream.foreachRDD对于开发而言提供了很大的灵活性,但在使用时也要避免很多常见的坑。我们通常将数据保存到外部系统中的流程是:建立远程连接->通过连接传输数据到远程系统->关闭连接。针对这个流程我们很直接的想到了下面的程序代码:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

spark踩坑记——初试中,对spark的worker和driver进行了整理,我们知道在集群模式下,上述代码中的connection需要通过序列化对象的形式从driver发送到worker,但是connection是无法在机器之间传递的,即connection是无法序列化的,这样可能会引起_serialization errors (connection object not serializable)_的错误。为了避免这种错误,我们将conenction在worker当中建立,代码如下:

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}

似乎这样问题解决了?但是细想下,我们在每个rdd的每条记录当中都进行了connection的建立和关闭,这会导致不必要的高负荷并且降低整个系统的吞吐量。所以一个更好的方式是使用_rdd.foreachPartition_即对于每一个rdd的partition建立唯一的连接(注:每个partition是内的rdd是运行在同一worker之上的),代码如下:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

这样我们降低了频繁建立连接的负载,通常我们在连接数据库时会使用连接池,把连接池的概念引入,代码优化如下:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

通过持有一个静态连接池对象,我们可以重复利用connection而进一步优化了连接建立的开销,从而降低了负载。另外值得注意的是,同数据库的连接池类似,我们这里所说的连接池同样应该是lazy的按需建立连接,并且及时的收回超时的连接。
另外值得注意的是:

  • 如果在spark streaming中使用了多次foreachRDD,它们之间是按照程序顺序向下执行的
  • Dstream对于输出操作的执行策略是lazy的,所以如果我们在foreachRDD中不添加任何RDD action,那么系统仅仅会接收数据然后将数据丢弃。

Spark访问Hbase

上面我们阐述了将spark streaming的Dstream输出到外部系统的基本设计模式,这里我们阐述如何将Dstream输出到Hbase集群。

Hbase通用连接类

Scala连接Hbase是通过zookeeper获取信息,所以在配置时需要提供zookeeper的相关信息,如下:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.client.ConnectionFactory

object HbaseUtil extends Serializable {
  private val conf = HBaseConfiguration.create()
  private val para = Conf.hbaseConfig // Conf为配置类,获取hbase的配置
  conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, para.get("port").getOrElse("2181"))
  conf.set(HConstants.ZOOKEEPER_QUORUM, para.get("quorum").getOrElse("127-0-0-1"))  // hosts
  private val connection = ConnectionFactory.createConnection(conf)

  def getHbaseConn: Connection = connection
}

根据网上资料,Hbase的连接的特殊性我们并没有使用连接池

Hbase输出操作

我们以put操作为例,演示将上述设计模式应用到Hbase输出操作当中:

dstream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreachPartition(partitionRecords => {
        val connection = HbaseUtil.getHbaseConn // 获取Hbase连接
        partitionRecords.foreach(data => {
            val tableName = TableName.valueOf("tableName")
            val t = connection.getTable(tableName)
            try {
              val put = new Put(Bytes.toBytes(_rowKey_)) // row key
              // column, qualifier, value
              put.addColumn(_column_.getBytes, _qualifier_.getBytes, _value_.getBytes)
              Try(t.put(put)).getOrElse(t.close())
              // do some log(显示在worker上)
            } catch {
              case e: Exception =>
                // log error
                e.printStackTrace()
            } finally {
              t.close()
            }
      })
    })
    // do some log(显示在driver上)
  }
})

关于Hbase的其他操作可以参考Spark 下操作 HBase(1.0.0 新 API)

填坑记录

重点记录在连接Hbase过程中配置HConstants.ZOOKEEPER_QUORUM的问题:

  • 由于Hbase的连接不能直接使用ip地址进行访问,往往需要配置hosts,例如我在上述代码段中127-0-0-1(任意),我们在hosts中需要配置

    127-0-0-1 127.0.0.1
  • 在单机情况下,我们只需要配置一台zookeeper所在Hbase的hosts即可,但是当切换到Hbase集群是遇到一个诡异的bug
    问题描述:在foreachRDD中将Dstream保存到Hbase时会卡住,并且没有任何错误信息爆出(没错!它就是卡住,没反应)
    问题分析:由于Hbase集群有多台机器,而我们只配置了一台Hbase机器的hosts,这样导致Spark集群在访问Hbase时不断的去寻找但却找不到就卡在那里
    解决方式:对每个worker上的hosts配置了所有hbase的节点ip,问题解决

Spark访问Mysql

同访问Hbase类似,我们也需要有一个可序列化的类来建立Mysql连接,这里我们利用了Mysql的C3P0连接池

MySQL通用连接类

import java.sql.Connection
import java.util.Properties

import com.mchange.v2.c3p0.ComboPooledDataSource

class MysqlPool extends Serializable {
  private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
  private val conf = Conf.mysqlConfig
  try {
    cpds.setJdbcUrl(conf.get("url").getOrElse("jdbc:mysql://127.0.0.1:3306/test_bee?useUnicode=true&characterEncoding=UTF-8"));
    cpds.setDriverClass("com.mysql.jdbc.Driver");
    cpds.setUser(conf.get("username").getOrElse("root"));
    cpds.setPassword(conf.get("password").getOrElse(""))
    cpds.setMaxPoolSize(200)
    cpds.setMinPoolSize(20)
    cpds.setAcquireIncrement(5)
    cpds.setMaxStatements(180)
  } catch {
    case e: Exception => e.printStackTrace()
  }
  def getConnection: Connection = {
    try {
      return cpds.getConnection();
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        null
    }
  }
}
object MysqlManager {
  var mysqlManager: MysqlPool = _
  def getMysqlManager: MysqlPool = {
    synchronized {
      if (mysqlManager == null) {
        mysqlManager = new MysqlPool
      }
    }
    mysqlManager
  }
}

我们利用c3p0建立Mysql连接池,然后访问的时候每次从连接池中取出连接用于数据传输。

Mysql输出操作

同样利用之前的foreachRDD设计模式,将Dstream输出到mysql的代码如下:

dstream.foreachRDD(rdd => {
    if (!rdd.isEmpty) {
      rdd.foreachPartition(partitionRecords => {
        //从连接池中获取一个连接
        val conn = MysqlManager.getMysqlManager.getConnection
        val statement = conn.createStatement
        try {
          conn.setAutoCommit(false)
          partitionRecords.foreach(record => {
            val sql = "insert into table..." // 需要执行的sql操作
            statement.addBatch(sql)
          })
          statement.executeBatch
          conn.commit
        } catch {
          case e: Exception =>
            // do some log
        } finally {
          statement.close()
          conn.close()
        }
      })
    }
})

值得注意的是:

  • 我们在提交Mysql的操作的时候,并不是每条记录提交一次,而是采用了批量提交的形式,所以需要将conn.setAutoCommit(false),这样可以进一步提高mysql的效率。
  • 如果我们更新Mysql中带索引的字段时,会导致更新速度较慢,这种情况应想办法避免,如果不可避免,那就硬上吧(T^T)

部署

提供一下Spark连接Mysql和Hbase所需要的jar包的maven配置:

<dependency><!-- Hbase -->
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.0.0</version>
</dependency>

<dependency><!-- Mysql -->
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.31</version>
</dependency>
<dependency>
    <groupId>c3p0</groupId>
    <artifactId>c3p0</artifactId>
    <version>0.9.1.2</version>
</dependency>

参考文献:

  1. Spark Streaming Programming Guide
  2. HBase介绍
  3. Spark 下操作 HBase(1.0.0 新 API)
  4. Spark开发快速入门
  5. kafka->spark->streaming->mysql(scala)实时数据处理示例
  6. Spark Streaming 中使用c3p0连接池操作mysql数据库
时间: 2024-11-05 13:49:54

Spark踩坑记——数据库(Hbase+Mysql)转的相关文章

[转]Spark 踩坑记:数据库(Hbase+Mysql)

https://cloud.tencent.com/developer/article/1004820 Spark 踩坑记:数据库(Hbase+Mysql) 前言 在使用Spark Streaming的过程中对于计算产生结果的进行持久化时,我们往往需要操作数据库,去统计或者改变一些值. 最近一个实时消费者处理任务,在使用spark streaming进行实时的数据流处理时,我需要将计算好的数据更新到hbase和mysql中,所以本文对spark操作hbase和mysql的内容进行总结,并且对自己

spark踩坑——dataframe写入hbase连接异常

最近测试环境基于shc[https://github.com/hortonworks-spark/shc]的hbase-connector总是异常连接不到zookeeper,看下报错日志: 18/06/20 10:45:02 INFO RecoverableZooKeeper: Process identifier=hconnection-0x5175ab05 connecting to ZooKeeper ensemble=localhost:2181 18/06/20 10:45:02 IN

&lt;&lt;Python编程:从入门到实践&gt;&gt;踩坑记 Django

<<Python编程:从入门到实践>>踩坑记 Django Django Python 19.1.1.5 模板new_topic 做完书上的步骤后,对主题添加页面经行测试,但是浏览器显示 服务器异常. 个人采用的开发环境是virtual studio code , 测试起来很是难受,因为我配置的debug环境,断点操作没有作用. 经过我不断的测试,才发现我失败的原因是由于之前的误操作,先建立new_pizzas.py后改为new_pizzas.html的,错误就在这里.在我之后新建

Spring @Transactional踩坑记

@Transactional踩坑记 总述 ? Spring在1.2引入@Transactional注解, 该注解的引入使得我们可以简单地通过在方法或者类上添加@Transactional注解,实现事务控制. 然而看起来越是简单的东西,背后的实现可能存在很多默认规则和限制.而对于使用者如果只知道使用该注解,而不去考虑背后的限制,就可能事与愿违,到时候线上出了问题可能根本都找不出啥原因. 踩坑记 1. 多数据源 事务不生效 背景介绍 ? 由于数据量比较大,项目的初始设计是分库分表的.于是在配置文件中

踩坑记:httpComponents 的 EntityUtils

今天写的一个服务程序,有人报告获得的数据中文乱码,而我是用 apache 通过 httpComponents 去取得数据的,于是开启日志的 debug 级别. 在日志里果然发现中文不见了,有乱码出现: 2014-07-02 16:35:01.348 DEBUG [Wire.java:86] http-outgoing-8 << "<?xml version="1.0" encoding="UTF-8"?>... subject=&q

记一次 Spring 事务配置踩坑记

记一次 Spring 事务配置踩坑记 问题描述:(SpringBoot + MyBatisPlus) 业务逻辑伪代码如下.理论上,插入数据 t1 后,xxService.getXxx() 方法的查询条件会不满足,会查询不到数据.结果事与愿违,后一次的查询,居然查到了数据. void saveXxx(){  xxService.getXxx(); // 查到一条数据 data1  xxService.insert(); // 插入一条数据 t1  xxService.getXxx(); // 查到

阿里云ECS搭建Kubernetes集群踩坑记

阿里云ECS搭建Kubernetes集群踩坑记 [TOC] 1. 现有环境.资源 资源 数量 规格 EIP 1 5M带宽 ECS 3 2 vCPU 16 GB内存 100G硬盘 ECS 3 2 vCPU 16 GB内存 150G硬盘 SLB 2 私网slb.s1.small 2. 规划 坑: 上网问题,因为只有一个EIP,所有其它节点只能通过代理上网; 负载均衡问题,因为阿里不支持LVS,负载均衡TCP方式后端又不支持访问负载均衡,HTTP和HTTPS方式,只支持后端协议为HTTP; 为了避免上

HttpWebRequest 改为 HttpClient 踩坑记-请求头设置

HttpWebRequest 改为 HttpClient 踩坑记-请求头设置 Intro 这两天改了一个项目,原来的项目是.net framework 项目,里面处理 HTTP 请求使用的是 WebReauest,但是 WebRequest 已经不再推荐使用了,你如果在项目中使用的话,编译器会警告, WebRequest已过时,新项目要 .Net standard 重写就直接 HttpClient 来处理 HTTP 请求了,在改的过程中踩了几个坑,记录一下 请求头处理 HttpClient 通常

windows container 踩坑记

windows container 踩坑记 Intro 我们有一些服务是 dotnet framework 的,不能直接跑在 docker linux container 下面,最近一直在折腾把它部署在 windows container 下,折腾的有点恶心,记录一下. Windows Container 介绍 Windows Container 是微软在 Windows 上的虚拟化实践,它可以提供操作系统级别的虚拟化. 通过我们说的容器化大多是指 Linux Container,基于 linu