第95课:通过Spark Streaming的window操作实战模拟新浪微博、百度、京东等热点搜索词案例实战

背景描述:在社交网络(例如微博)、电子商务(例如京东)、搜索引擎(例如百度)等人们核心关注的内容之一就是我所关注的内容中大家正在最关注什么或者说当前的热点是什么,这在实际企业级应用中是非常有价值的。例如我们关系过去30分钟大家正在热搜索什么,并且每5分钟更新一次,这就使得热点内容是动态更新,当然也是更有价值。

我们知道在SparkStreaming中可以设置batchInterval,让SparkStreaming每隔batchInterval时间提交一次Job,假设batchInterval设置为5秒,那如果需要对1分钟内的数据做统计,该如何实现呢?SparkStreaming中提供了window的概念。我们看下图:

window可以包含多个batchInterval(例如5秒),但是必须为batchInterval的整数倍例如1分钟。另外window可以移动,称之为滑动时间间隔,它也是batchInterval的整数倍,例如10秒。一般情况滑动时间间隔小于window的时间长度,否则会丢失数据。

SparkStreaming提供了如下与window相关的方法:

我们可以使用reduceByKeyAndWindow操作来做具体实现热词排序

package com.dt.spark.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * 使用Scala开发集群运行的Spark来实现在线热搜索词
 *
 * @author DT大数据梦工厂
 * 新浪微博:http://weibo.com/ilovepains/
 * 
 */
object OnlineHottestItems {
    def main(args: Array[String]){
      /**
       * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
       * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置
       * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
       * 只有1G的内存)的初学者       * 
       */
      val conf = new SparkConf() //创建SparkConf对象
      conf.setAppName("OnlineHottestItems") //设置应用程序的名称,在程序运行的监控界面可以看到名称
      conf.setMaster("spark://Master:7077") //此时,程序在Spark集群

      /**
        * 此处设置Batch Interval是在Spark Streaming中生成基本Job的时间单位,窗口和滑动时间间隔
        * 一定是该Batch Interval的整数倍
        */
      val ssc = new StreamingContext(conf, Seconds(5))
      

      val hottestStream = ssc.socketTextStream("Master", 9999)

      /**
        * 用户搜索的格式简化为item,time在这里我们由于要计算出热点内容,所以只需要提取出item即可
        * 提取出的item然后通过map转换为(item,1)格式
        */
      val searchPair = hottestStream.map(_.split(",")(0)).map(item => (item, 1))
      val hottestDStream = searchPair.reduceByKeyAndWindow((v1:Int, v2:Int) => v1 + v2, Seconds(60), Seconds(20))

      hottestDStream.transform(hottestItemRDD => {
        val top3 = hottestItemRDD.map(pair => (pair._2, pair._1)).sortByKey(false).
          map(pair => (pair._2, pair._1)).take(3)

        ssc.sparkContext.makeRDD(top3)
            }).print()
      ssc.start()
      ssc.awaitTermination()

    }
}

运行程序

[email protected]:~# /usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.streaming.OnlineHottestItems --master spark://spark-master:7077 ./spark.jar

打开netcat发送数据

[email protected]:~# nc -lk 9999
Spark,11111
Spark,2222
hadoop,13143
scala,34343
hadoop,23232      
Spark,11111
Spark,2222
hadoop,13143
scala,34343
hadoop,23232 
java,34343

打印结果

-------------------------------------------
Time: 1462199230000 ms
-------------------------------------------
(Spark,4)
(hadoop,4)
(scala,2)

后续可以在nc中继续输入数据,观察程序打印结果的变化。

时间: 2024-09-29 03:08:28

第95课:通过Spark Streaming的window操作实战模拟新浪微博、百度、京东等热点搜索词案例实战的相关文章

Spark定制班第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1 在线动态计算分类最热门商品案例回顾与演示 2 基于案例贯通Spark Streaming的运行源码 1 在线动态计算分类最热门商品案例回顾与演示 我们用Spark Streaming+Spark SQL来实现分类最热门商品的在线动态计算.代码如下: package com.dt.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.sp

第3课:解读spark –streaming运行机制

感谢DT大数据梦工厂支持提供以下内容,DT大数据梦工厂专注于Spark发行版定制.详细信息请查看 联系邮箱[email protected] 电话:18610086859 QQ:1740415547 微信号:18610086859 定制班:第三课 解读spark –streaming运行机制 一 从实战出发 首先我们运行以下的程序,然后通过这个程序的运行过程进一步加深理解Spark Streaming流处理的Job的执行的过程,代码如下:   def main(args: Array[Strin

(版本定制)第16课:Spark Streaming源码解读之数据清理内幕彻底解密

本期内容: 1.Spark Streaming元数据清理详解 2.Spark Streaming元数据清理源码解析 一.如何研究Spark Streaming元数据清理 操作DStream的时候会产生元数据,所以要解决RDD的数据清理工作就一定要从DStream入手.因为DStream是RDD的模板,DStream之间有依赖关系. DStream的操作产生了RDD,接收数据也靠DStream,数据的输入,数据的计算,输出整个生命周期都是由DStream构建的.由此,DStream负责RDD的整个

Spark Streaming通过JDBC操作数据库

本文记录了学习使用Spark Streaming通过JDBC操作数据库的过程,源数据从Kafka中读取. Kafka从0.10版本提供了一种新的消费者API,和0.8不同,因此Spark Streaming也提供了两种API与之对应,其中spark-streaming-kafka-0-8支持Kafka 0.8.2.1以后的Broker:spark-streaming-kafka-0-10支持0.10.0以上Broker,处于实验阶段.两者的对比如下表所示. |spark-streaming-ka

(版本定制)第6课:Spark Streaming源码解读之Job动态生成和深度思考

本期内容: 1.Spark Streaming Job生成深度思考 2.Spark Streaming Job生成源码解析 本节课主要是针对Job如何产生进行阐述 在Spark Streaming里,总体负责动态作业调度的具体类是JobScheduler: /** * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate * the jobs and runs them using a

第10课:Spark Streaming源码解读之流数据不断接收全生命周期彻底研究和思考

上一课我们讲解了Receiver启动的流程.Receiver是通过ReceiverSupervisor的start方法启动的: /** Start the supervisor */ def start() {   onStart()   startReceiver() } 首先会调用ReceiverSupervisor的onStart()方法, override protected def onStart() {   registeredBlockGenerators.foreach { _.

第89课:Spark Streaming on Kafka解析和安装实战

本课分2部分讲解: 第一部分,讲解Kafka的概念.架构和用例场景: 第二部分,讲解Kafka的安装和实战. 由于时间关系,今天的课程只讲到如何用官网的例子验证Kafka的安装是否成功.后续课程会接着讲解如何集成Spark Streaming和Kafka. 一.Kafka的概念.架构和用例场景 http://kafka.apache.org/documentation.html#introdution 1.Kafka的概念 Apache Kafka是分布式发布-订阅消息系统.它最初由Linked

第88课:Spark Streaming从Flume Pull数据案例实战及内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Pulling from Flume实战 二.Spark Streaming on Pulling from Flume源码解析 先简单介绍下Flume的两种模式:推模式(Flume push to Spark Streaming)和 拉模式(Spark Streaming pull from Flume ) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以连接,就将数据push过去.(简单,耦

基于案例一节课贯通Spark Streaming流计算框架的运行源码

 在线动态计算分类最热门商品案例回顾与演示 基于案例贯通Spark Streaming的运行源码 使用Spark Streaming + Spark SQL来在线动态计算电商中不同类别中最热门的商品排名,例如手机这个类别下面最热门的三款手机. 是用mysql数据库作为元数据库,使用Hive作为存储引擎,使用Spark SQL作为查询引擎. 其中链接数据库代码如下: package com.dt.spark.com.dt.spark.streaming; import java.sql.Con