第93课:SparkStreaming updateStateByKey 基本操作综合案例实战和内幕源码解密

Spark Streaming的DStream为我们提供了一个updateStateByKey方法,它的主要功能是可以随着时间的流逝在Spark Streaming中为每一个key维护一份state状态,通过更新函数对该key的状态不断更新。对每一个新的batch而言,Spark Streaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新(对每个新出现的key,会同样执行state的更新函数操作),但是如果通过更新函数对state更新后返回none的话,此时刻key对应的state状态被删除掉,需要特别说明的是state可以是任意类型的数据结构,这就为我们的计算带来无限的想象空间;

重点来了!!!如果要不断的更新每个key的state,就一定会涉及到状态的保存和容错,这个时候就需要开启checkpoint机制和功能,需要说明的是checkpoint可以保存一切可以存储在文件系统上的内容,例如:程序未处理的数据及已经拥有的状态。

补充说明:关于流式处理对历史状态进行保存和更新具有重大实用意义,例如进行广告(投放广告和运营广告效果评估的价值意义,热点随时追踪、热力图)

简单的来说,如果我们需要进行wordcount,每个batchInterval都会计算出新的一批数据,这批数据如何更新到以前计算的结果上?updateStateByKey就能实现此功能。

函数定义如下:

def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
  ): DStream[(K, S)] = ssc.withScope {
  updateStateByKey(updateFunc, defaultPartitioner())
}

updateStateByKey 需要传入一个函数,该函数有两个参数Seq[V]表示最新一次reduce的值的序列,Option[s]表示的是key对应的以前的值。返回的时一个key的最新值。

下面我们用实例演示:

package com.dt.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Created by Administrator on 2016/5/3.
 */
object UpdateStateByKeyDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("UpdateStateByKeyDemo")
    val ssc = new StreamingContext(conf,Seconds(20))
    //要使用updateStateByKey方法,必须设置Checkpoint。
    ssc.checkpoint("/checkpoint/")
    val socketLines = ssc.socketTextStream("spark-master",9999)

    socketLines.flatMap(_.split(",")).map(word=>(word,1))
      .updateStateByKey(
        (currValues:Seq[Int],preValue:Option[Int]) =>{
       val currValue = currValues.sum
         Some(currValue + preValue.getOrElse(0))
    }).print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }
}

打包上传至spark集群。

打开nc,发送测试数据

r[email protected]:~# nc -lk 9999
hadoop,spark,scala,hive
hadoop,Hbase,spark

运行spark 程序

[email protected]:~# /usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.streaming.UpdateStateByKeyDemo  --master spark://spark-master:7077 ./spark.jar

查看运行结果:

-------------------------------------------
Time: 1462282180000 ms
-------------------------------------------
(scala,1)
(hive,1)
(spark,2)
(hadoop,2)
(Hbase,1)

我们在nc中再输入一些数据

[email protected]:~# nc -lk 9999
hadoop,spark,scala,hive
hadoop,Hbase,spark
hadoop,spark,scala,hive
hadoop,Hbase,spark

再次查看结果:

-------------------------------------------
Time: 1462282200000 ms
-------------------------------------------
(scala,2)
(hive,2)
(spark,4)
(hadoop,4)
(Hbase,2)

可见,它将我们两次统计结果合并了。

备注:

1、DT大数据梦工厂微信公众号DT_Spark 
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains

时间: 2024-12-21 21:50:03

第93课:SparkStreaming updateStateByKey 基本操作综合案例实战和内幕源码解密的相关文章

第88课:Spark Streaming从Flume Pull数据案例实战及内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Pulling from Flume实战 二.Spark Streaming on Pulling from Flume源码解析 先简单介绍下Flume的两种模式:推模式(Flume push to Spark Streaming)和 拉模式(Spark Streaming pull from Flume ) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以连接,就将数据push过去.(简单,耦

第93课:Spark Streaming updateStateByKey案例实战和内幕源码解密

本节课程主要分二个部分: 一.Spark Streaming updateStateByKey案例实战二.Spark Streaming updateStateByKey源码解密 第一部分: updateStateByKey的主要功能是随着时间的流逝,在Spark Streaming中可以为每一个可以通过CheckPoint来维护一份state状态,通过更新函数对该key的状态不断更新:对每一个新批次的数据(batch)而言,Spark Streaming通过使用updateStateByKey

第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密

一:Spark集群开发环境准备 启动HDFS,如下图所示: 通过web端查看节点正常启动,如下图所示: 2.启动Spark集群,如下图所示: 通过web端查看集群启动正常,如下图所示: 3.启动start-history-server.sh,如下图所示: 二:HDFS的SparkStreaming案例实战(代码部分) package com.dt.spark.SparkApps.sparkstreaming; import org.apache.spark.SparkConf; import o

第85讲:基于HDFS的SparkStreaming案例实战和内幕源码解密

一:Spark集群开发环境准备 启动HDFS,如下图所示: 通过web端查看节点正常启动,如下图所示: 2.启动Spark集群,如下图所示: 通过web端查看集群启动正常,如下图所示: 3.启动start-history-server.sh,如下图所示: 二:HDFS的SparkStreaming案例实战(代码部分) package com.dt.spark.SparkApps.sparkstreaming; import org.apache.spark.SparkConf; import o

第88课:Spark Streaming从Flume Poll数据案例实战和内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Polling from Flume实战 二.Spark Streaming on Polling from Flume源码 第一部分: 推模式(Flume push SparkStreaming) VS 拉模式(SparkStreaming poll Flume) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以链接,就将数据push过去.(简单,耦合要低),缺点是SparkStreaming

第87课:Flume推送数据到SparkStreaming案例实战和内幕源码解密

本期内容: 1. Flume on HDFS案例回顾 2. Flume推送数据到Spark Streaming实战 3. 原理绘图剖析 1. Flume on HDFS案例回顾 上节课要求大家自己安装配置Flume,并且测试数据的传输.我昨天是要求传送的HDFS上. 文件配置: ~/.bashrc: export FLUME_HOME=/usr/local/flume/apache-flume-1.6.0-bin export FLUME_CONF_DIR=$FLUME_HOME/conf PA

第87讲:Flume推送数据到SparkStreaming案例实战和内幕源码解密

本期内容: 1. Flume on HDFS案例回顾 2. Flume推送数据到Spark Streaming实战 3. 原理绘图剖析 1. Flume on HDFS案例回顾 上节课要求大家自己安装配置Flume,并且测试数据的传输.我昨天是要求传送的HDFS上. 文件配置: ~/.bashrc: export FLUME_HOME=/usr/local/flume/apache-flume-1.6.0-bin export FLUME_CONF_DIR=$FLUME_HOME/conf PA

第93讲:Spark Streaming updateStateByKey案例实战和内幕源码

本节课程主要分二个部分: 一.Spark Streaming updateStateByKey案例实战 二.Spark Streaming updateStateByKey源码解密 第一部分: updateStateByKey它的主要功能是随着时间的流逝,在Spark Streaming中可以为每一个key可以通过CheckPoint来维护一份state状态,通过更新函数对该key的状态不断更新:在更新的时候,对每一个新批次的数据(batch)而言,Spark Streaming通过使用upda

蘑菇云行动前传第16课:Scala implicits编程彻底实战及Spark源码鉴赏

package com.dtspark.scala.basics /** * Implicits隐式转换实战 */ class Man(val name:String) /*object Man{ implicit def man2SuperMan(man:Man)=new SuperMan(man.name) }*/ object implicits{ implicit def man2SuperMan(man:Man)=new SuperMan(man.name) } class Super