急中生智~利用Spark core完成"ETL"!

背景介绍:
今天接到老板分配的一个小任务:开发一个程序,实现从数据库中抽取数据并生成报表的功能(这是我们数据库审计平台准备上线的一个功能)。既然是要生成报表,那么首先得有数据,于是便想到从该业务系统的测试环境抽取业务表的数据,然后装载至自己云主机上的Mysql中。
本来以为只要"select ...into outfile"和"load data infile..."两个命令就可以搞定的,可是还是出了意外。测试环境导出的
txt文件在云主机load时,报了"Row 1 doesn‘t contain data for all columns"这样的warning,表中的数据自然也是凌乱且不完整的。
仔细分析,感觉可能是两个方面出了问题:
1.由于测试环境的网段是隔离的,所以为了拿到"select ...into outfile"时生成的数据,我是打开CRT的日志,然后执行
"cat xxx.txt",变相地将数据获取到了本地,然后上传至云主机的;
2.测试环境的Mysql和云主机上Mysql的小版本不一致。
这两个问题看似都没法解决,现在只有文本文件,怎么办?使用Spark不就得了!
之前也写过一篇使用Spark分析Mysql慢日志的博文,自己也对Spark core的各种算子比较熟悉,所以决定试一试。

实战演练:
表结构如下:

mysql> desc claims_case_loss_document;
+---------------+-------------+------+-----+---------+----------------+
| Field         | Type        | Null | Key | Default | Extra          |
+---------------+-------------+------+-----+---------+----------------+
| id            | int(11)     | NO   | PRI | NULL    | auto_increment |
| case_id       | varchar(22) | NO   |     | NULL    |                |
| case_times    | varchar(2)  | NO   |     | NULL    |                |
| document_list | text        | NO   |     | NULL    |                |
| create_time   | timestamp   | YES  |     | NULL    |                |
| update_time   | timestamp   | YES  |     | NULL    |                |
+---------------+-------------+------+-----+---------+----------------+
6 rows in set (0.00 sec)

文本结构如下:

1147    90100002700021437455    1       100100_收款方账户信息;001003_事故证明;001001_驾驶证;100000_收款方×××明;001002_索赔申请书     2017-11-16 12:08:08     2017-11-16 12:08:08

观察文本结构可知,每个字段间都有数个空格,而且两两字段间的空格数并不一致,所以得先使用Spark core将文本中字段间的空格数都变为1,以便后续切分。
闲话少说,直接上程序!(以下程序均使用scala在eclipse ide for scala中编写和执行)

package cn.spark.study.sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ArrayBuffer
import java.sql.DriverManager

object insertToMysql {
  def main(args: Array[String]): Unit = {
    val t1=System.nanoTime()
    val conf = new SparkConf()
        .setAppName("insertToMysql")
        .setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("D://Users//GAOZHONGZHENG186//Desktop//text001.txt", 1);
    val words = lines.map { line => line.split(" ") }
    val wordsNotNull = words.map{ word =>
                       val wordArray_raw = new ArrayBuffer[String]()
                       val wordArray = new ArrayBuffer[String]()
                       for(i<-0 until word.length){
                         if (word(i)!=""){
                           wordArray_raw+=word(i)
                         }
                       }
                       for(i<-0 until wordArray_raw.length-4){
                         wordArray+=wordArray_raw(i)
                       }
                       wordArray+=wordArray_raw(4)+" "+wordArray_raw(5)
                       wordArray+=wordArray_raw(6)+" "+wordArray_raw(7)
                       wordArray
                      }

    wordsNotNull.foreach { word =>
                           Class.forName("com.mysql.cj.jdbc.Driver")
                           val conn = DriverManager.getConnection("jdbc:mysql://10.25.80.7:3306/db1", "root", "123456")
                           try {
                                val statement = conn.createStatement()
                                val sql="insert into claims_case_loss_document values ("+
                                        word(0)+","+
                                        "‘"+word(1)+"‘"+","+
                                        "‘"+word(2)+"‘"+","+
                                        "‘"+word(3)+"‘"+","+
                                        "‘"+word(4)+"‘"+","+
                                        "‘"+word(5)+"‘"+")"
                                //执行插入
                                //println(sql)
                                statement.executeUpdate(sql)
                                } catch{
                                        case e:Exception =>e.printStackTrace
                                       }
                                  finally {
                                          conn.close
                                          }
                         }
    val t2=System.nanoTime()
        //打印程序运行时间
    println((t2-t1)/1000000000 +"s")
  }
}

在插入的过程中,第一条记录总是会报错(后续语句插入正常),在eclipse中将报错的insert语句手工粘贴至mysql执行,仍报相同错误:

从报错看是遇到了bug,并且1147这个值有问题,将相邻语句放入notePad对比:

从图中可看出,1147的千位上的1确实发生了异常改变,而第二条语句中的1148是正常的,猜测可能是某个未知bug导致了第一条记录发生了异常改变。这个猜测在后续得到了证实:当把1147所在行从文本中删除后(此时1148所在行为第一条记录),1148所在行也报出同样的错误,而后续语句均可正常插入。
由于数据是作分析用的,所以丢失一条无伤大雅,而且这个bug实在诡异,这里就不再深究了。

细心的童鞋在看了代码后应该会问:数据插入的效率如何?实不相瞒,效率很差!5000条的数据足足用了近半个小时,即使是在这样的OLAP场景下,这样的效率也是不可容忍的!
仔细研究代码可发现,在对RDD调用foreach方法进行插入的时候,每一条记录都要创建一个连接,并且每一次insert都会在Mysql中触发一次commit操作(autocommit参数默认是打开的),这些都是很消耗资源的操作,插入效率自然很差。
发现这些问题后,针对代码进行了修改:

package cn.spark.study.sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.sql.DriverManager
import scala.collection.mutable.ArrayBuffer

object insert2Mysql {
    def main(args: Array[String]): Unit = {
    val t1=System.nanoTime()
    val conf = new SparkConf()
        .setAppName("insert2Mysql")
        .setMaster("local")
    val sc = new SparkContext(conf)
    //textFile方法只能读取字符集为utf-8的文件,否则中文会乱码。windows下,将文件另存为时,可以选择utf-8字符集
    //也可在代码中实施转换,但比较繁琐
    val lines = sc.textFile("D://Users//GAOZHONGZHENG186//Desktop//text01.txt", 1);
    val words = lines.map { line => line.split(" ") }
    val wordsNotNull = words.map{ word =>
                       val wordArray_raw = new ArrayBuffer[String]()
                       val wordArray = new ArrayBuffer[String]()
                       for(i<-0 until word.length){
                         if (word(i)!=""){
                           wordArray_raw+=word(i)
                         }
                       }
                       for(i<-0 until wordArray_raw.length-4){
                         wordArray+=wordArray_raw(i)
                       }
                       wordArray+=wordArray_raw(4)+" "+wordArray_raw(5)
                       wordArray+=wordArray_raw(6)+" "+wordArray_raw(7)
                       wordArray
                      }

    val sqlRDD=wordsNotNull.map{ word =>
                                    val sql="insert into claims_case_loss_document values ("+
                                             word(0)+","+
                                             "‘"+word(1)+"‘"+","+
                                             "‘"+word(2)+"‘"+","+
                                             "‘"+word(3)+"‘"+","+
                                             "‘"+word(4)+"‘"+","+
                                             "‘"+word(5)+"‘"+")"
                                    sql
                                  }

    val sqlArray=sqlRDD.toArray()

    //加载驱动
    Class.forName("com.mysql.cj.jdbc.Driver")
    val conn = DriverManager.getConnection("jdbc:mysql://10.25.80.7:3306/db1", "root", "123456")
    try {
        conn.setAutoCommit(false)
        val statement = conn.createStatement()
        //这里有bug,处理出来的第一行格式都会报ERROR 1054 (42S22): Unknown column ‘?1147‘ in ‘field list‘
        //为了避免程序跳出循环,所以循环从1开始,即从第2条开始插入
        for(i<-1 until sqlArray.length){
           //执行插入
          println(sqlArray(i))
          statement.executeUpdate(sqlArray(i))
          }
        conn.commit()
        }
    catch{
          case e:Exception =>e.printStackTrace
          }
    finally{
            conn.close
            }

    val t2=System.nanoTime()
    println((t2-t1)/1000000000 +"s")
  }
}

修改后的代码规避了上述缺陷,在同样插入5000条数据的情况下,只用了221s!效率大大提升!
到Mysql验证数据:

mysql> select count(*) from claims_case_loss_document;
+----------+
| count(*) |
+----------+
|     4999 |
+----------+
1 row in set (0.00 sec)

mysql> select * from claims_case_loss_document limit 1\G
*************************** 1. row ***************************
           id: 1148
      case_id: 90100002700021437450
   case_times: 1
document_list: 100100_收款方账户信息;001003_事故证明;001001_驾驶证;100000_收款方×××明;001002_索赔申请书
  create_time: 2017-11-16 12:08:08
  update_time: 2017-11-16 12:08:08
1 row in set (0.00 sec)

至此,问题圆满解决!整个过程和数据仓库领域的ETL很接近,抽取-转换-装载,三个环节都有涉及,只是没有使用
kettle之类的工具罢了。

总结:
在大数据时代,DBA应该积极做出改变,掌握一定开发技能,以便更好地适应时代变化,切不可固守自己的一亩三分地!

最后,给我们上海分组自研的数据库审计平台打个广告 ^.^
数据库审计平台是我们分组历时两年打造的产品,可用于Mysql、Oracle、Postgres等多种数据库,具备以下核心工能:
1.审计违规sql,前端一键生成报告
2.对相同功能点的sql可实现自动归类,方便后续统一整改
3.内嵌Percona toolkit,前端一键调用
4.一键抓取低效sql,并自动给出优化建议
还有很多很酷的功能就不一一介绍了,总之,谁用谁说好!感兴趣的DBA童鞋可以留言,可免费试用哦!

原文地址:http://blog.51cto.com/13476134/2115018

时间: 2024-08-30 15:38:19

急中生智~利用Spark core完成"ETL"!的相关文章

【Spark Core】任务运行机制和Task源代码浅析1

引言 上一小节<TaskScheduler源代码与任务提交原理浅析2>介绍了Driver側将Stage进行划分.依据Executor闲置情况分发任务,终于通过DriverActor向executorActor发送任务消息. 我们要了解Executor的运行机制首先要了解Executor在Driver側的注冊过程.这篇文章先了解一下Application和Executor的注冊过程. 1. Task类及其相关 1.1 Task类 Spark将由Executor运行的Task分为ShuffleMa

【Spark Core】任务执行机制和Task源码浅析1

引言 上一小节<TaskScheduler源码与任务提交原理浅析2>介绍了Driver侧将Stage进行划分,根据Executor闲置情况分发任务,最终通过DriverActor向executorActor发送任务消息. 我们要了解Executor的执行机制首先要了解Executor在Driver侧的注册过程,这篇文章先了解一下Application和Executor的注册过程. 1. Task类及其相关 1.1 Task类 Spark将由Executor执行的Task分为ShuffleMap

Spark Core源代码分析: Spark任务模型

概述 一个Spark的Job分为多个stage,最后一个stage会包含一个或多个ResultTask,前面的stages会包含一个或多个ShuffleMapTasks. ResultTask运行并将结果返回给driver application. ShuffleMapTask将task的output依据task的partition分离到多个buckets里.一个ShuffleMapTask相应一个ShuffleDependency的partition,而总partition数同并行度.redu

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio

spark core源码分析6 Spark job的提交

本节主要讲解SparkContext的逻辑 首先看一个spark自带的最简单的例子: object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = ma

利用Spark mllab进行机器学习的基本操作(聚类,分类,回归分析)

Spark作为一种开源集群计算环境,具有分布式的快速数据处理能力.而Spark中的Mllib定义了各种各样用于机器学习的数据结构以及算法.Python具有Spark的API.需要注意的是,Spark中,所有数据的处理都是基于RDD的. 首先举一个聚类方面的详细应用例子Kmeans: 下面代码是一些基本步骤,包括外部数据,RDD预处理,训练模型,预测. #coding:utf-8 from numpy import array from math import sqrt from pyspark

TypeError: Error #1034: 强制转换类型失败:无法将 mx.controls::[email&#160;protected] 转换为 spark.core.IViewport。

1.错误描述 TypeError: Error #1034: 强制转换类型失败:无法将 mx.controls::[email protected] 转换为 spark.core.IViewport. at mx.binding::Binding/defaultDestFunc()[E:\dev\4.0.0\frameworks\projects\framework\src\mx\binding\Binding.as:270] at Function/http://adobe.com/AS3/2

这些组件分别处理Spark Core提供内存计算框架

Spark不仅支持Scala编写应用程序,而且支持Java和Python等语言进行编写,特别是Scala是一种高效.可拓展的语言,能够用简洁的代码处理较为复杂的处理工作. l通用性强 Spark生态圈即BDAS(伯克利数据分析栈)包含了Spark Core.Spark SQL.Spark Streaming.MLLib和GraphX等组件,这些组件分别处理Spark Core提供内存计算框架.SparkStreaming的实时处理应用.Spark SQL的即席查询.MLlib或MLbase的机器

Spark调研笔记第7篇 - 应用实战: 如何利用Spark集群计算物品相似度

本文是Spark调研笔记的最后一篇,以代码实例说明如何借助Spark平台高效地实现推荐系统CF算法中的物品相似度计算. 在推荐系统中,最经典的推荐算法无疑是协同过滤(Collaborative Filtering, CF),而item-cf又是CF算法中一个实现简单且效果不错的算法. 在item-cf算法中,最关键的步骤是计算物品之间的相似度.本文以代码实例来说明如何利用Spark平台快速计算物品间的余弦相似度. Cosine Similarity是相似度的一种常用度量,根据<推荐系统实践>一