第97课:Spark Streaming 结合Spark SQL 案例

代码如下:

package com.dt.spark.streaming

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Duration}

/**
 * 使用SparkStreaming结合SparkSQL对日志进行分析。
 * 假设电商网站点击日志格式(简化)如下:
 * userid,itemId,clickTime
 * 需求:处理10分钟内item点击次数排序Top10,并且将商品名称显示出来。商品itemId与商品名称的对应关系存放在MySQL数据库中
 * Created by dinglq on 2016/5/4.
 */
object LogAnalyzerStreamingSQL {
  val WINDOW_LENGTH = new Duration(600 * 1000)
  val SLIDE_INTERVAL = new Duration(10 * 1000)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("LogAnalyzerStreamingSQL").setMaster("local[4]")

    val sc = new SparkContext(sparkConf)

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    //从数据库中加载itemInfo表
    val itemInfoDF = sqlContext.read.format("jdbc").options(Map(
      "url"-> "jdbc:mysql://spark-master:3306/spark",
      "driver"->"com.mysql.jdbc.Driver",
      "dbtable"->"iteminfo",
      "user"->"root",
      "password"-> "vincent"
      )).load()

    itemInfoDF.registerTempTable("itemInfo")

    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)

    val logLinesDStream = streamingContext.textFileStream("D:/logs_incoming")

    val accessLogsDStream = logLinesDStream.map(AccessLog.parseLogLine).cache()

    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.isEmpty()) {
        println("No logs received in this time interval")
      } else {
        accessLogs.toDF().registerTempTable("accessLogs")
        val sqlStr = "SELECT a.itemid,a.itemname,b.cnt FROM itemInfo a JOIN " +
          " (SELECT itemId,COUNT(*) cnt FROM accessLogs GROUP BY itemId) b " +
          " ON (a.itemid=b.itemId) ORDER BY cnt DESC LIMIT 10 "
        val topTenClickItemLast10Minus = sqlContext.sql(sqlStr)

        // Persist top ten table for this window to HDFS as parquet file

        topTenClickItemLast10Minus.show()
      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

case class AccessLog(userId: String, itemId: String, clickTime: String) {
}

object AccessLog {

  def parseLogLine(log: String): AccessLog = {
    val logInfo = log.split(",")
    if (logInfo.length == 3) {
      AccessLog(logInfo(0),logInfo(1), logInfo(2))
    }
    else {
      AccessLog("0","0","0")
    }
  }
}

MySQL中表的内容如下:

mysql> select * from spark.iteminfo;
+--------+----------+
| itemid | itemname |
+--------+----------+
| 001    | phone    |
| 002    | computer |
| 003    | TV       |
+--------+----------+
3 rows in set (0.00 sec)

在D创建目录logs_incoming

运行Spark Streaming 程序。

新建文件,内容如下:

0001,001,2016-05-04 22:10:20
0002,001,2016-05-04 22:10:21
0003,001,2016-05-04 22:10:22
0004,002,2016-05-04 22:10:23
0005,002,2016-05-04 22:10:24
0006,001,2016-05-04 22:10:25
0007,002,2016-05-04 22:10:26
0008,001,2016-05-04 22:10:27
0009,003,2016-05-04 22:10:28
0010,003,2016-05-04 22:10:29
0011,001,2016-05-04 22:10:30
0012,003,2016-05-04 22:10:31
0013,003,2016-05-04 22:10:32

将文件保存到目录logs_incoming 中,观察Spark程序的输出:

+------+--------+---+
|itemid|itemname|cnt|
+------+--------+---+
|   001|   phone|  6|
|   003|      TV|  4|
|   002|computer|  3|
+------+--------+---+

备注:

1、DT大数据梦工厂微信公众号DT_Spark 
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains

时间: 2024-12-22 08:08:38

第97课:Spark Streaming 结合Spark SQL 案例的相关文章

第82讲:Spark Streaming第一讲:案例动手实战并在电光石火间理解其工作原理

本期内容: 1.Spark Streaming 动手实战演示 2.闪电般理解Spark Streaming原理 3.案例动手实战并在电光石火间理解其工作原理 流(Streaming),在大数据时代为数据流处理,就像水流一样,是数据流:既然是数据流处理,就会想到数据的流入.数据的加工.数据的流出. 日常工作.生活中数据来源很多不同的地方.例如:工业时代的汽车制造.监控设备.工业设备会产生很多源数据:信息时代的电商网站.日志服务器.社交网络.金融交易系统.黑客攻击.垃圾邮件.交通监控等:通信时代的手

Spark Streaming结合Spark JDBC External DataSouces处理案例

场景:使用Spark Streaming接收实时数据与关系型数据库中的表进行相关的查询操作: 使用技术:Spark Streaming + Spark JDBC External DataSources 代码雏形: package com.luogankun.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{ Seconds, StreamingContext } import

(版本定制)第1课:Spark Streaming另类在线实验及Spark Streaming本质理解

本节课内容: 1.Spark Streaming另类在线实验解析 2.Spark Streaming本质理解 Spark Streaming是Spark Core上的一个子框架,如果我们能够完全精通这个子框架,我们就能够更好的驾驭Spark.Spark Streaming和Spark SQL是目前最流行的框架,从研究角度而言,Spark SQL有太多涉及到SQL优化的问题,不太适合用来深入研究.而Spark Streaming和其他的框架不同,它更像是Spark Core的一个应用程序.如果我们

Spark修炼之道(进阶篇)——Spark入门到精通:第十三节 Spark Streaming—— Spark SQL、DataFrame与Spark Streaming

主要内容 Spark SQL.DataFrame与Spark Streaming 1. Spark SQL.DataFrame与Spark Streaming 源码直接参照:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala import org.apache.spark.SparkConf

基于案例一节课贯通Spark Streaming流计算框架的运行源码

 在线动态计算分类最热门商品案例回顾与演示 基于案例贯通Spark Streaming的运行源码 使用Spark Streaming + Spark SQL来在线动态计算电商中不同类别中最热门的商品排名,例如手机这个类别下面最热门的三款手机. 是用mysql数据库作为元数据库,使用Hive作为存储引擎,使用Spark SQL作为查询引擎. 其中链接数据库代码如下: package com.dt.spark.com.dt.spark.streaming; import java.sql.Con

第16课:Spark Streaming源码解读之数据清理内幕彻底解密

本期内容: Spark Streaming数据清理原因和现象 Spark Streaming数据清理代码解析 对Spark Streaming解析了这么多课之后,我们越来越能感知,Spark Streaming只是基于Spark Core的一个应用程序,因此掌握Spark Streaming对于我们怎么编写Spark应用是绝对有好处的. Spark Streaming 不像Spark Core的应用程序,Spark Core的应用的数据是存储在底层文件系统,如HDFS等别的存储系统中,而Spar

第82课 Spark Streaming第一课 案例动手实战并在电光石火间理解其工作原理

本课内容提要: (1)什么是流处理以及Spark Streaming主要介绍 (2)Spark Streaming初体验 一.什么是流处理以及Spark Streaming主要介绍 流(Streaming),在大数据时代为数据流处理,就像水流一样,是数据流:既然是数据流处理,就会想到数据的流入.数据的加工.数据的流出. 日常工作.生活中数据来源很多不同的地方.例如:工业时代的汽车制造.监控设备.工业设备会产生很多源数据:信息时代的电商网站.日志服务器.社交网络.金融交易系统.黑客攻击.垃圾邮件.

第82课:Spark Streaming第一课:案例动手实战并在电光石火间理解其工作原理

本期内容: 1.Spark Streaming 动手实战演示 2.闪电般理解Spark Streaming原理 案例动手实战并在电光石火间理解其工作原理 流(Streaming),在大数据时代为数据流处理,就像水流一样,是数据流:既然是数据流处理,就会想到数据的流入.数据的加工.数据的流出. 日常工作.生活中数据来源很多不同的地方.例如:工业时代的汽车制造.监控设备.工业设备会产生很多源数据:信息时代的电商网站.日志服务器.社交网络.金融交易系统.黑客攻击.垃圾邮件.交通监控等:通信时代的手机.

83课:Scala和Java二种方式实战Spark Streaming开发

一.Java方式开发 1.开发前准备:假定您以搭建好了Spark集群. 2.开发环境采用eclipse maven工程,需要添加Spark Streaming依赖. 3.Spark streaming 基于Spark Core进行计算,需要注意事项: 设置本地master,如果指定local的话,必须配置至少二条线程,也可通过sparkconf来设置,因为Spark Streaming应用程序在运行的时候,至少有一条线程用于不断的循环接收数据,并且至少有一条线程用于处理接收的数据(否则的话无法有