Spark SQL实现日志离线批处理

一、 基本的离线数据处理架构:

  1. 数据采集   Flume:Web日志写入到HDFS
  2. 数据清洗   脏数据 Spark、Hive、MR等计算框架来完成。 清洗完之后再放回HDFS
  3. 数据处理   按照需要,进行业务的统计和分析。 也通过计算框架完成
  4. 处理结果入库   存放到RDBMS、NoSQL中
  5. 数据可视化    通过图形化展示出来。  ECharts、HUE、Zeppelin

处理框图:

1 2 3 4 5 6 7为离线处理,其中5不一定是Hive(还有Spark SQL等) 6不一定是RDBMS(NoSQL)

执行时,可用调度框架Oozie、Azkaban,指定任务执行的时间

另外一条线是实时处理

拟定项目需求:

  1. 统计某时间段最受欢迎的某项的TopN和对应的访问次数
  2. 按地市统计最受欢迎  从IP提取城市信息
  3. 按访问流量统计

互联网日志一般包括有:

访问时间  访问URL  耗费流量   访问IP地址

从日志里提取以上我们需要的数据

假设我们现在仅有一台电脑供学习作为集群使用,为了防止内存溢出,有必要进行剪切日志:

用head -10000命令截取前10000条

数据量太大的话,在IDE中可能会报错

二、日志处理过程

数据清洗:

第一步: 从原始日志提取有用信息,本例中就是拿到时间、URL、流量、IP

  1. 读取日志文件,得到RDD,通过map方法,split成一个数组,然后选择数组中有用的几项(用断点的方法分析哪几项有用,并匹配相应的变量)
  2. 获取到的信息有可能因为某些问题,如线程问题而导致生成了带有错误的信息,第一步中一开始用了SimpleDateFormat(线程不安全)来转变时间格式,会导致某些时间转换错误。一般要改成FastDateFormat来做

实现代码:

//提取有用信息,转换格式
object SparkStatFormatJob {
  def main(args: Array[String]) = {
    val spark = SparkSession.builder().appName("SparkStatFormatJob").master("local[2]").getOrCreate()
    val access = spark.sparkContext.textFile("/Users/kingheyleung/Downloads/data/10000_access.log")
    //access.take(10).foreach(println)
    access.map(line => {
      val splits = line.split(" ")
      val ip = splits(0)
      //用断点的方法,观察splits数组,找出时间、url、流量对应哪一个字段
      //创建时间类DateUtils,转换成常用的时间表达方式
      //把url多余的""引号清除掉
      val time = splits(3) + " " + splits(4)
      val url = splits(11).replaceAll("\"", "")
      val traffic = splits(9)
      //(ip, DateUtils.parse(time), url, traffic)  用来测试输出是否正常
      //把裁剪好的数据重新组合,用Tab分割
      DateUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip
    }).saveAsTextFile("file:///usr/local/mycode/immooclog/")
    spark.stop()
  }
}
//日期解析
object DateUtils {
  //输入格式
  val ORIGINAL_TIME_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:sss Z", Locale.ENGLISH)
  //输出格式
  val TARGET_TIME_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  def parse(time:String) = {
    TARGET_TIME_FORMAT.format(new Date(getTime(time)))
  }
  def getTime(time:String) = {
    try {
      ORIGINAL_TIME_FORMAT.parse(time.substring(time.indexOf("[") + 1, time.lastIndexOf("]"))).getTime
    } catch {
      case e : Exception => {
        0l
      }
    }
  }

一般日志处理需要进行分区

本例中按照日志中的访问时间进行分区

第二步:解析上一步得到的有用信息,我把它称为解析日志

其实就是把较为整洁的数据日志,解析出每个字段的含义,并把RDD转成DF

在此案例中,完成的是:

输入:访问时间  访问URL  耗费流量   访问IP地址  =>转变为输出:url、类型(本例中url的后缀有article还是video)、对应ID号、流量、ip、城市、时间、天(用于分组)

并且创建DataFrame(也就是定义Row和StructType,其中Row要和原日志的每个字段对应,而StructType是根据所需要的输出来定义就行)

实现代码:

//解析日志
object SparkStatCleanJob {
  def main(args: Array[String]) = {
    val spark = SparkSession.builder().appName("SparkStatCleanJob").master("local[2]").getOrCreate()
    val accessRDD = spark.sparkContext.textFile("file:///Users/kingheyleung/Downloads/data/access_10000.log")
    //RDD convert to DF, define Row and StructType
    val accessDF = spark.createDataFrame(accessRDD.map(line => LogConvertUtils.convertToRow(line)), LogConvertUtils.struct)
    //accessDF.printSchema()
    //accessDF.show(false)
    spark.stop()
  }
}
//RDD转换成DF的工具类
object LogConvertUtils {
  //构建Struct
  val struct = StructType(
    Array(
      StructField("url", StringType),
      StructField("cmsType", StringType),
      StructField("cmsId", LongType),
      StructField("traffic", LongType),
      StructField("ip", StringType),
      StructField("city", StringType),
      StructField("time", StringType),
      StructField("day", StringType)
    )
  )
  //提取信息,构建Row
  def convertToRow(line:String) = {

    try {
      val splits = line.split("\t")
      val url = splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)
      val domain = "http://www.imooc.com/"
      val cms = url.substring(url.indexOf(domain) + domain.length())
      val cmsSplits = cms.split("/")

      var cmsType = ""
      var cmsId = 0l
      //判断是否存在
      if (cmsSplits.length > 1) {
        cmsType = cmsSplits(0)
        cmsId = cmsSplits(1).toLong
      }
      val city = IpUtils.getCity(ip)     //通过Ip解析工具传进,具体看下面
      val time = splits(0)
      val day = time.substring(0, 10).replaceAll("-", "")

      //定义Row,与Struct一样
      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      case e: Exception => Row(0)
    }
  }
}

注意:转换时一定要记得类型转换!!!!

进一步解析:对IP地址解析来获得城市信息

在这里,为了让IP地址转换成直观的城市信息,我使用了GitHub上的开源项目来实现:

https://github.com/wzhe06/ipdatabase.git

用Maven编译下载的项目

mvn clean package -DskipTests

安装jar包到自己的Maven仓库中:

mvn install:install-file -Dfile=路径.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar

在IDE里面的pom.xml添加dependency,参照GitHub主页上的pom.xml中的dependency

但是出现报错了:

java.io.FileNotFoundException:

file:/Users/rocky/maven_repos/com/ggstar/ipdatabase/1.0/ipdatabase-1.0.jar!/ipRegion.xlsx (No such file or directory)

根据提示,我们需要在项目源码中找到相应的文件拷进去IDE中的main/resources中!

存储清洗后的数据:

按day分区来进行存储  partitionBy

存储模式:mode(SaveMode.Overwrite)  覆盖存储

coalesce:据说生产中经常用,是项目的调优点,控制文件的输出大小,个数

三、统计功能实现

功能实现一:统计TopN视频

第一步:读取数据,read.format().load

第二步:

  1. 使用DataFrame API统计分析
  2. SQL API

最后把统计结果保存在MySQL数据库中

调优点:

读取parquet文件时,系统会默认解析各字段相应的数据类型,但有时候我们就只需要它是String类型,需要在SparkSession定义时添加:

config("spark.sql.sources.partitionColumnTypeInference.enabled, "false"")

变成只会按照原类型读入

两种方法:

若使用DataFrame API来做:

用$号时候需要导入隐式转换(这里是列名转换成列)!spark.implicits._

用到dataframe的count()函数要导入包:org.apache.spark.sql.functions._

若使用SQL API来做:

创建临时表createTempView

小心写SQL语句换行时不注意而忽略空格

实现代码:

//完成统计操作
object TopNStatJob {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]").getOrCreate()
    val accessDF = spark.read.format("parquet").load("/Users/kingheyleung/Downloads/data/clean/")
    dfCountTopNVideo(spark, accessDF)
    sqlCountTopNVideo(spark, accessDF)
    //accessDF.printSchema()

    spark.stop()
  }
  def dfCountTopNVideo(spark: SparkSession, accessDF: DataFrame): Unit = {
    /*
    * DF API
    * */

    //导入隐式转换, 留意$号的使用, 并且导入functions包,使agg聚合函数count能够使用,此处若不用$的话,就无法让times进行desc排序了
    import spark.implicits._
    val topNDF = accessDF.filter($"day" === "20170511" && $"cmsType" === "video")
      .groupBy("day", "cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)
    topNDF.show(false)
  }
  def sqlCountTopNVideo(spark: SparkSession, accessDF: DataFrame): Unit = {
    /*
    * SQL API
    * */

    //创建临时表access_view,注意换行时,很容易忽略掉空格
    accessDF.createTempView("access_view")
    val topNDF = spark.sql("select day, cmsId, count(1) as times from access_view " +
      "where day == ‘20170511‘ and cmsType == ‘video‘ " +
      "group by day, cmsId " +
      "order by times desc")
    topNDF.show(false)
  }
}

在保存数据之前,需要写连接MySQL数据库的工具类,用到java.sql包

  1. 使用DriverManager,连接到mysql 3306
  2. 释放资源,connection和preparedstatement都要,注意处理异常

注意:若测试时拿不到连接,出现以下报错,那就是没有在dependency中添加或者选对mysql-connetor包

java.sql.SQLException: No suitable driver found for jdbc:mysql://localhost:3306/imooc_project?user=root&password=666

Error:scalac: error while loading <root>, Error accessing /Users/kingheyleung/.m2/repository/mysql/mysql-connector-java/5.0.8/mysql-connector-java-5.0.8.jar

我最终选的是5.1.40版本才对了

实现代码:

/*
* 连接MySQL数据库
* 操作工具类
* */
object MySQLUtils {
  //获得连接
  def getConnection(): Unit = {
    DriverManager.getConnection("jdbc:mysql://localhost:3306/imooc_project?user=root&password=666")
  }
  //释放资源
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      connection.close()
    }
  }
}

把统计数据保存到MySQL

  1. 在mysql中创建一张表,包含day,cms_Id,times三个字段(注意各自的数据类型,以及定义不允许为NULL,并把day和cms_Id作为PRI KEY)
  2. 创建模型类case class,三个输入参数,day、cms_Id,times
  3. 创建操作数据库DAO类,输入的参数是一个list,list装的是上面的模型类,目的是插入insert记录到数据库中,DAO中分以下几步:
  4. 首先,做jdbc连接的准备,创建connection和prepareStatement,把关闭连接也写好,用try catch finally抛出异常;
  5. 然后写sql语句,preparestatement需要赋值的地方用占位符放着;
  6. 进行对list遍历,把每个对象都放进pstmt中
  7. 调优点!!!遍历前把自动提交关掉,遍历中把pstmt加入批处理中,遍历完后执行批处理操作!最后手工提交连接

实现代码:

//课程访问次数实体类
case class VideoAccessStat(day: String, cmsId:Long, times: Long)

/*
* 各个维度统计的DAO操作
* */
object StatDAO {
  /*
  * 批量保存VideoAccessStat到数据库
  * */
  def insertDayAccessTopN(list: ListBuffer[VideoAccessStat]): Unit = {

    var connection: Connection = null  //jdbc的准备工作, 定义连接
    var pstmt: PreparedStatement = null

    try {
      connection = MySQLUtils.getConnection() //真正获取连接

      connection.setAutoCommit(false)   //为了实现批处理,要关掉默认的自动提交

      val sql = "insert into day_topn_video(day, cms_id, times) values (?, ?, ?)"  //占位符
      pstmt = connection.prepareStatement(sql)  //把SQL语句生成pstmt对象,后面才可以填充占位符中的数据

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.times)

        pstmt.addBatch()   //加入批处理
      }

      pstmt.execute()    //执行批量处理
      connection.commit()    //手工提交

    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
}

为了对应以上的第3步,要把统计记录的DF生成一个个对象,放进list中:

  1. 创建模型类对应的list
  2. 对记录进行遍历,把记录的每个字段当做参数,创建模型类对象
  3. 把每个对象添加到list中
  4. 把list传进DAO类中

以下代码添加到上面的TopNJob类里面中就可以把之前生成到的topDF的结果记录保存到MySQL当中了:

try {
  topNDF.foreachPartition(partitionOfRecords => { //
    val list = new ListBuffer[VideoAccessStat]  //创建list来装统计记录

    //遍历每一条记录,取出来上面对应的三个字段day,cmsId,times
    partitionOfRecords.foreach(info => {
      val day = info.getAs[String]("day")   //后面的就是取出来的记录的每个字段
      val cmsId = info.getAs[Long]("cmsId")
      val times = info.getAs[Long]("times")

      //每一次循环创建一个VideoAccessStat对象,添加一次进入list中
      list.append(VideoAccessStat(day, cmsId, times))
    })
    //把list传进DAO类
    StatDAO.insertDayAccessTopN(list)
  })
} catch {
  case e: Exception => e.printStackTrace()
}

到此为止已经把项目需求一完成。

功能实现二:按照城市来找出topN视频

在功能一的基础上,运用row_number函数来实现

具体的实现代码:

  //先计算访问次数,并按照day,cmsId,city分组
  val cityAccessTopNDF = accessDF.filter(accessDF.col("day") === "20170511" && accessDF.col("cmsType") === "video")
    .groupBy("day", "cmsId", "city").agg(count("cmsId").as("times"))

  //进行分地市排序,使用到row_number函数,生成一个排名,定义为time_rank, 并且取排名前3
  cityAccessTopNDF.select(
    cityAccessTopNDF.col("day"),
    cityAccessTopNDF.col("cmsId"),
    cityAccessTopNDF.col("times"),
    cityAccessTopNDF.col("city"),
    row_number().over(Window.partitionBy(cityAccessTopNDF.col("city"))
      .orderBy(cityAccessTopNDF.col("times").desc)
    ).as("times_rank")
  ).filter("times_rank <= 3").show(false)
}

其他步骤和功能一一样,但是插入Mysql的时候报错,原因是MySQL不支持插入中文!!!!

首先可以在mysql命令行中用SET character来改:

SET character_set_client = utf8

可通过

show variables like ‘character_set_%’;

查看当前的character编码设置

然后在jdbc连接时,加上:

useUnicode=true&characterEncoding=utf8

改了之后,虽然能够导入MySQL了,而且不出现乱码,但只有一部分数据,并且在控制台报错:

com.mysql.jdbc.PreparedStatement.fillSendPacket

com.mysql.jdbc.PreparedStatement.execute

后来把批处理删掉竟然就可以把所有数据导入了:

pstmt.executeUpdate  //不使用批处理的pstmt插入

功能三:按流量来排序topN视频

和功能一几乎完全一样,只不过计算流量总和时用的不是count函数而是要用sum函数

为了代码的复用性,防止生成重复的数据,在StatDAO定义删除的函数:

def deleteDayData(day: String) = {

  var connection: Connection = null
  var pstmt: PreparedStatement = null
  var tables = Array("day_topn_video",
    "day_city_topn_video",
    "traffic_topn_video"
  )

  try {
    connection = MySQLUtils.getConnection()

    for (table <- tables) {
      val deleteSql = s"delete from $table where day = ?”  //Scala特殊处理
      pstmt = connection.prepareStatement(deleteSql)
      pstmt.setString(1, table)
      pstmt.setString(2, day)
      pstmt.executeUpdate()
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    MySQLUtils.release(connection, pstmt)
  }
}

需要注意的是,table在pstmt中的特殊用法!!

后续会对以上内容进行可视化处理、跑在YARN上的修改、性能调优

原文地址:https://www.cnblogs.com/kinghey-java-ljx/p/8543552.html

时间: 2024-08-01 18:26:28

Spark SQL实现日志离线批处理的相关文章

以慕课网日志分析为例 进入大数据 Spark SQL 的世界

详情请交流  QQ  709639943 01.以慕课网日志分析为例 进入大数据 Spark SQL 的世界 02.漫谈spring cloud分布式服务架构 03.Spring Cloud微服务实战视频课程 04.漫谈spring cloud 与 spring boot 基础架构 05.Java秒杀系统方案优化 高性能高并发实战 06.Java深入微服务原理改造房产销售平台 07.快速上手Linux 玩转典型应用 08.快速上手Ionic3 多平台开发企业级问答社区 09.Java Sprin

CK2255-以慕课网日志分析为例 进入大数据 Spark SQL 的世界

新年伊始,学习要趁早,点滴记录,学习就是进步! 随笔背景:在很多时候,很多入门不久的朋友都会问我:我是从其他语言转到程序开发的,有没有一些基础性的资料给我们学习学习呢,你的框架感觉一下太大了,希望有个循序渐进的教程或者视频来学习就好了.对于学习有困难不知道如何提升自己可以加扣:1225462853  获取资料. 下载地址:https://pan.baidu.com/s/1hsU5EIS 以慕课网日志分析为例 进入大数据 Spark SQL 的世界 本课程以"慕课网日志分析"这一大数据应

Spark SQL源码分析之核心流程

自从去年Spark Submit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的贡献者从几人到了几十人,而且发展速度异常迅猛,究其原因,个人认为有以下2点: 1.整合:将SQL类型的查询语言整合到 Spark 的核心RDD概念里.这样可以应用于多种任务,流处理,批处理,包括机器学习里都可以引入Sql. 2.效率:因为Shark受到hive的编程模型限制,无法再继续优化来适应Spark模型里. 前一段时间测试过Shark,并且对Spark

Spark修炼之道(进阶篇)——Spark入门到精通:第十节 Spark SQL案例实战(一)

作者:周志湖 放假了,终于能抽出时间更新博客了--. 1. 获取数据 本文通过将github上的Spark项目git日志作为数据,对SparkSQL的内容进行详细介绍 数据获取命令如下: [[email protected] spark]# git log --pretty=format:'{"commit":"%H","author":"%an","author_email":"%ae"

Spark SQL笔记——技术点汇总

目录 · 概述 · 原理 · 组成 · 执行流程 · 性能 · API · 应用程序模板 · 通用读写方法 · RDD转为DataFrame · Parquet文件数据源 · JSON文件数据源 · Hive数据源 · 数据库JDBC数据源 · DataFrame Operation · 性能调优 · 缓存数据 · 参数调优 · 案例 · 数据准备 · 查询部门职工数 · 查询各部门职工工资总数,并排序 · 查询各部门职工考勤信息 概述 1. Spark SQL是Spark的结构化数据处理模块.

Parquet + Spark SQL

海量数据存储 推荐用 Parquet列式存储 替代 HDFS上的文件 下面两篇文章讲解 用Parquet列式存储来存储数据,主要是提高查询性能.和存储压缩 <Spark SQL下的Parquet使用最佳实践和代码实战>http://blog.csdn.net/sundujing/article/details/51438306<操作技巧:将 Spark 中的文本转换为 Parquet 以提升性能>http://www.ibm.com/developerworks/cn/analyt

Spark SQL源代码分析之核心流程

/** Spark SQL源代码分析系列文章*/ 自从去年Spark Submit 2013 Michael Armbrust分享了他的Catalyst,到至今1年多了,Spark SQL的贡献者从几人到了几十人,并且发展速度异常迅猛,究其原因,个人觉得有下面2点: 1.整合:将SQL类型的查询语言整合到 Spark 的核心RDD概念里.这样能够应用于多种任务,流处理,批处理,包含机器学习里都能够引入Sql. 2.效率:由于Shark受到hive的编程模型限制,无法再继续优化来适应Spark模型

第97课:Spark Streaming 结合Spark SQL 案例

代码如下: package com.dt.spark.streaming import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.streaming.{StreamingContext, Duration} /**  * 使用SparkStreaming结合SparkSQL对日志进行分析.  * 假设电商网站点击日志格式(简化)

Spark SQL下的Parquet使用最佳实践和代码实战

一:Spark SQL下的Parquet使用最佳实践 1,过去整个业界对大数据的分析的技术栈的Pipeline一般分为一下两种方式: A)Data Source -> HDFS -> MR/Hive/Spark(相当于ETL) -> HDFS Parquet -> SparkSQL/impala -> Result Service(可以放在DB中,也有可能被通过JDBC/ODBC来作为数据服务使用): B)Data Source -> Real time update