spark streaming windowDuration,slideDuration,batchDuration?

spark streaming 不同于sotm,是一种准实时处理系统。storm 中,把批处理看错是时间教程的实时处理。而在spark streaming中,则反过来,把实时处理看作为时间极小的批处理。

1、三个时间参数

spark streaming 中有三个关于时间的参数,分别如下:

窗口时间windowDuration?:当前窗口要统计多长时间的数据,是批量时间的整数倍

滑动时间slideDuration?:要多长时间更新一次结果,是批量时间的整数倍

批量时间batchDuration?:多长时间创建一个批次,与实际业务无关,只与数据量有关,数据量大则可以设置短一些,数据量小则设置长一些,但必须小于其他两个时间,

2、该怎么设置?

为方便理解,就拿咱们最常见的日启、日活、周启、周活作为示例

注:1、实际中日启、日活、周启、周活更多是用批处理,此处只是拿来方便大家理解

2、此处不是严格意义上的日启、周启。此处的日:最近24小时,周:最近7天

案例1:每隔一小时,统计产品的日启、日活,

窗口时间:1日,滑动时间:1小时,批量时间:1小时、半小时、15分钟、10分钟、5分钟、2分钟均可,视数据量大小而定

案例2:每天统计最近七天累计启动、活跃

窗口时间:7日,滑动时间:1日 批量时间:一小时、半小时、10分钟、5分钟

3、实战

为了理解上边参数是怎么设置的,我们对假定现在有个需求,需要对输入的字母进行计数。

使用nc -lk 9999 模拟生产者,发送数据,streaming 通过socket接收数据

实战1:每10秒统计当前输入的字符

适用:彻底非累加业务

  import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
  import org.apache.spark.SparkConf

  val sparkConf = new SparkConf().setAppName("socket-streaming-wordcount").setMaster("local[2]")
  //10秒创建一个批次
  val ssc = new StreamingContext(sparkConf, Seconds(10))
  val lines = ssc.socketTextStream("localhost", 9999)
    val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1l))
    val current_stream = wordCounts.reduceByKey(_ + _)
    current_stream.print()
    current_stream.repartition(1).saveAsTextFiles("/data/socket_wordcount_current.")

    ssc.start()
    ssc.awaitTermination()

启动生产者 nc -lk 9999

在spark-shell中输入上边代码

在nc 的终端下,

输入字符操作1、第一个10秒,输入a,第二个10秒输入b,等待10秒,在spark-shell中确认第二次计算完成后,查看新产生文件,期望结果   【    (a,1) (b,1) 】  原因:我们当前仅输入了a、b

输入字符操作2、第四个10秒,输入c,第五个10秒输入d,等待10秒,在spark-shell中确认第二次计算完成后,查看新产生文件   期望 【 (c,1) (d,1)】 原因:我们当前输入了c、d

输入字符操作3、这时,不需要操作,等待30秒,在spark-shell中确认第三次计算完成后,查看新产生文件 期望 【 】 原因:当前我们没有输入, 所以没有任何字符可以统计

实战2、每10秒统计历史所有输入的字符。

适用范围:计算历史(包含窗口之外)累计数据,经常用于统计“总装机量”之类

import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.SparkConf

val sparkConf = new SparkConf().setAppName("socket-streaming-wordcount").setMaster("local[2]")
  //10秒创建一个批次
  val ssc = new StreamingContext(sparkConf, Seconds(10))
//累加所有经过的数据
    val updateFunc = (values: Seq[Long], state: Option[Long]) => {
      val currentCount = values.foldLeft(0l)(_ + _)
      val previousCount = state.getOrElse(0l)
      Some(currentCount + previousCount)
    }
    ssc.checkpoint("socket_wordcount_history")
    val lines = ssc.socketTextStream("localhost", 9999)
    val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1l))
    val history_stream = wordCounts.updateStateByKey[Long](updateFunc)//合并当前数据和历史数据

    history_stream.print()
    history_stream.repartition(1).saveAsTextFiles("/data/socket_wordcount_history.")

    ssc.start()
    ssc.awaitTermination()

启动生产者 nc -lk 9999

在spark-shell中输入上边代码

在nc 的终端下,

输入字符操作1、第一个10秒,输入a,第二个10秒输入b,等待10秒,在spark-shell中确认第二次计算完成后,查看新产生文件,期望结果   【    (a,1) (b,1) 】  原因:我们当前输入了a、b

输入字符操作2、第四个10秒,输入c,第五个10秒输入d,等待10秒,在spark-shell中确认第二次计算完成后,查看新产生文件   期望 【 (a,1) (b,1) (c,1) (d,1)】 原因:我们当前输入了c、d,历史输入过 a、b

输入字符操作3、这时,不需要操作,等待30秒,在spark-shell中确认第三次计算完成后,查看新产生文件 期望 【 (a,1) (b,1) (c,1) (d,1)】 原因:当前我们没有输入,但是,历史曾经输入过a、b、c、d

输入字符操作4、这时,仍不需要操作,等待30秒,在spark-shell中确认第四次计算完成后,查看新产生文件 期望 【 (a,1) (b,1) (c,1) (d,1)】原因: 当前我们没有输入,但是,历史曾经输入过a、b、c、d

之后,即使没有输入abcd,统计结果仍包含abcd这四个字符各1次

实战3、每隔30秒,统计最近1分钟输入的字母。窗口内历史累加

(适用范围:非累加业务,这里的累加指的是超出window范围)

sc.stop

import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}import org.apache.spark.SparkConf

val updateFunc = (values: Seq[Long], state: Option[Long]) => {
val currentCount = values.foldLeft(0l)(_ + _)
val previousCount = state.getOrElse(0l)
Some(currentCount + previousCount)
}
val sparkConf = new SparkConf().setAppName("socket-streaming-wordcount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(10))   //10秒创建一个批次
ssc.checkpoint("socket-kafka-wordcount_recent")
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1l))
val stateDstream = wordCounts.reduceByKeyAndWindow(_ + _, _ - _, Minutes(1), Seconds(30))  //每30秒算一次,数据范围为最近一分钟内收到的数据  另外,使用window时,需要设置checkpoint

stateDstream.print()
stateDstream.repartition(1).saveAsTextFiles("/data/socket-streaming-wordcount.log")

ssc.start()
ssc.awaitTermination()

启动生产者 nc -lk 9999

在spark-shell中输入上边代码

在nc 的终端下,

输入字符操作1、第一个10秒,输入a,第二个10秒输入b,等待10秒,在spark-shell中确认第二次计算完成后,查看新产生文件,期望结果   【    (a,1) (b,1) 】  原因:最近1分钟,我们只输入了a、b

输入字符操作2、第四个10秒,输入c,第五个10秒输入d,等待10秒,在spark-shell中确认第二次计算完成后,查看新产生文件   期望 【 (a,1) (b,1) (c,1) (d,1)】 原因:最近1分钟,我们只输入了a、b、c、d

输入字符操作3、这时,不需要操作,等待30秒,在spark-shell中确认第三次计算完成后,查看新产生文件 期望 【 (a,0) (b,0) (c,1) (d,1)】 原因:最近1分钟,我们只输入了c、d ,1分钟之前输入的a、b将不再在统计范围之内

输入字符操作4、这时,仍不需要操作,等待30秒,在spark-shell中确认第四次计算完成后,查看新产生文件 期望 【 (a,0) (b,0) (c,0) (d,0)】原因:最近1分钟,我们没有任何输入,1 分钟之前输入的a、b、c、d将不再在统计范围之内

时间: 2024-10-27 03:10:47

spark streaming windowDuration,slideDuration,batchDuration?的相关文章

Spark Streaming中空RDD处理及流处理程序优雅的停止

本期内容 : Spark Streaming中的空RDD处理 Spark Streaming程序的停止 由于Spark Streaming的每个BatchDuration都会不断的产生RDD,空RDD有很大概率的,如何进行处理将影响其运行的效率.资源的有效使用. Spark Streaming会不断的接收数据,在不清楚接收的数据处理到什么状态,如果你强制停止掉的话,会涉及到数据不完整操作或者一致性相关问题. 一. Spark Streaming中的空RDD处理 : ForEachRDD是产生Ds

Spark Streaming发行版笔记16:数据清理内幕彻底解密

本讲从二个方面阐述: 数据清理原因和现象 数据清理代码解析 Spark Core从技术研究的角度讲 对Spark Streaming研究的彻底,没有你搞不定的Spark应用程序. Spark Streaming一直在运行,不断计算,每一秒中在不断运行都会产生大量的累加器.广播变量,所以需要对对象及 元数据需要定期清理.每个batch duration运行时不断触发job后需要清理rdd和元数据.Clinet模式 可以看到打印的日志,从文件日志也可以看到清理日志内容. 现在要看其背后的事情: Sp

Spark版本定制七:Spark Streaming源码解读之JobScheduler内幕实现和深度思考

本期内容: 1,JobScheduler内幕实现 2,JobScheduler深度思考 摘要:JobScheduler是Spark Streaming整个调度的核心,其地位相当于Spark Core上的调度中心中的DAGScheduler!           一.JobScheduler内幕实现 问:JobScheduler是在什么地方生成的? 答:JobScheduler是在StreamingContext实例化时产生的,从StreamingContext的源码第183行中可以看出:    

Spark入门实战系列--7.Spark Streaming(下)--实时流计算Spark Streaming实战

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.实例演示 1.1 流数据模拟器 1.1.1 流数据说明 在实例演示中模拟实际情况,需要源源不断地接入流数据,为了在演示过程中更接近真实环境将定义流数据模拟器.该模拟器主要功能:通过Socket方式监听指定的端口号,当外部程序通过该端口连接并请求数据时,模拟器将定时将指定的文件数据随机获取发送给外部程序. 1.1.2 模拟器代码 import java.io.{PrintWriter} impor

Spark版本定制第5天:案列解析Spark Streaming运行源码

本期内容: 1 在线动态计算分类最热门商品案例回顾与演示 2 基于案例贯通Spark Streaming的运行源码 一切不能进行实时流处理的数据都是无效的数据.在流处理时代,SparkStreaming有着强大吸引力,而且发展前景广阔,加之Spark的生态系统,Streaming可以方便调用其他的诸如SQL,MLlib等强大框架,它必将一统天下. Spark Streaming运行时与其说是Spark Core上的一个流式处理框架,不如说是Spark Core上的一个最复杂的应用程序.如果可以掌

Spark入门实战系列--7.Spark Streaming(下)--Spark Streaming实战

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 实例演示 1.1 流数据模拟器 1.1.1 流数据说明 在实例演示中模拟实际情况,需要源源不断地接入流数据,为了在演示过程中更接近真实环境将定义流数据模拟器.该模拟器主要功能:通过Socket方式监听指定的端口号,当外部程序通过该端口连接并请求数据时,模拟器将定时将指定的文件数据随机获取发送给外部程序. 1.1.2 模拟器代码 import java.io.{PrintWriter} import

Spark Streaming源码解读之Job详解

一:Spark Streaming Job生成深度思考 1. 做大数据例如Hadoop,Spark等,如果不是流处理的话,一般会有定时任务.例如10分钟触发一次,1个小时触发一次,这就是做流处理的感觉,一切不是流处理,或者与流处理无关的数据都将是没有价值的数据,以前做批处理的时候其实也是隐形的在做流处理. 2. JobGenerator构造的时候有一个核心的参数是jobScheduler, jobScheduler是整个作业的生成和提交给集群的核心,JobGenerator会基于DStream生

Spark Streaming源代码学习总结(一)

1.Spark Streaming 代码分析: 1.1 演示样例代码DEMO: 实时计算的WorldCount: import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.storage.StorageLevel object NetworkWordCount { def mai

Spark版本定制八:Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考

本期内容: 1.DStream与RDD关系彻底研究 2.Streaming中RDD的生成彻底研究 一.DStream与RDD关系彻底研究 课前思考: RDD是怎么生成的? RDD依靠什么生成?根据DStream来的 RDD生成的依据是什么? Spark Streaming中RDD的执行是否和Spark Core中的RDD执行有所不同? 运行之后我们对RDD怎么处理? ForEachDStream不一定会触发Job的执行,但是它一定会触发job的产生,和Job是否执行没有关系: 对于DStream