Structured-Streaming之窗口操作

Structured Streaming 之窗口事件时间聚合操作

Spark StreamingExactly Once 指的是:

  • 每条数据从输入源传递到 Spark 应用程序 Exactly Once
  • 每条数据只会分到 Exactly Once batch 处理
  • 输出端文件系统保证幂等关系

Structured Streaming 返回的是 DataFrame/DataSet,我们可以对其应用各种操作 - 从无类型,类似 SQL 的操作(例如 selectwheregroupBy)到类型化的 RDD 类操作(例如 mapfilterflatMap)。

基本操作:选择,投影,聚合

  1. case class DeviceData(device: String, deviceType: String,


  2. signal: Double, time: DateTime) 


  3. val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string } 

  4. val ds: Dataset[DeviceData] = df.as[DeviceData] // streaming Dataset with IOT device data 


  5. // Select the devices which have signal more than 10 

  6. df.select("device").where("signal > 10") // using untyped APIs  

  7. ds.filter(_.signal > 10).map(_.device) // using typed APIs 


  8. // Running count of the number of updates for each device type 

  9. df.groupBy("deviceType").count() // using untyped API 


  10. // Running average signal for each device type 

  11. import org.apache.spark.sql.expressions.scalalang.typed 

  12. ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // using typed API 

不支持的操作:

但是,不是所有适用于静态 DataFrames/DataSet 的操作在流式 DataFrames/DataSet 中受支持。从 Spark 2.0 开始,一些不受支持的操作如下:

  • 在流 DataFrame/DataSet 上还不支持多个流聚集(即,流 DF 上的聚合链)。
  • 不支持 limittake(N)
  • 不支持 Distinct
  • sort 操作仅在聚合后在完整输出模式下支持
  • 流和静态流的外连接支持是有条件的:
    • 不支持带有流 DataSet 的完全外连接
    • 不支持右侧的流的左外连接
    • 不支持左侧的流的右外部联接
  • 不支持两个流之间的任何 join
  • 此外,还有一些方法不能用于流DataSet,它们是将立即运行查询并返回结果的操作,这对流DataSet没有意义。相反,这些功能可以通过显式地启动流查询来完成。
  • count() - 无法从流 DataSet 返回单个计数。

    相反,使用 ds.groupBy.count() 返回包含运行计数的流DataSet

  • foreach() - 使用 ds.writeStream.foreach(...)(参见下一节)。
  • show() - 而是使用控制台接收器

如果您尝试任何这些操作,您将看到一个 AnalysisException 如“操作 XYZ 不支持与流 DataFrames/DataSet”。

事件时间上的窗口操作

事件时间是嵌入在数据本身的时间,对于许多应用程序,我们可能希望根据事件时间进行聚合操作,为此,Spark2.x 提供了基于滑动窗口的事件时间集合操作。基于分组的聚合操作和基于窗口的聚合操作是非常相似的,在分组聚合中,依据用户指定的分组列中的每个唯一值维护聚合值,在基于窗口的聚合的情况下,对于行的事件时间落入的每个窗口维持聚合值。

structured-streaming-window

  1. import spark.implicits._



  2. val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String } 


  3. // Group the data by window and word and compute the count of each group 

  4. val windowedCounts = words.groupBy( 

  5. window($"timestamp", "10 minutes", "5 minutes"), 

  6. $"word" 

  7. ).count() 

该段代码用于用于统计每10分钟内,接受到的不同词的个数,其中window($"timestamp", "10 minutes", "5 minutes")的含义为:假设初始时间 t=12:00,定义时间窗口为10分钟,每5分钟窗口滑动一次,也就是每5分钟对大小为10分钟的时间窗口进行一次聚合操作,并且聚合操作完成后,窗口向前滑动5分钟,产生新的窗口,如上图的一些列窗口 12:00-12:10,12:05-12:15,12:10-12:20。

在这里每个word包含两个时间,word产生的时间和流接收到word的时间,这里的timestamp就是word产生的时间,在很多情况下,word产生后,可能会延迟很久才被流接收,为了处理这种情况,Structured Streaming 引进了Watermarking(时间水印)功能,以保证能正确的对流的聚合结构进行更新

structured-streaming-late-data

Watermarking的计算方法Watermarking

  • In every trigger, while aggregate the data, we also scan for the max value of event time in the trigger data
  • After trigger completes, compute watermark = MAX(event time before trigger, max event time in trigger)

Watermarking表示多长时间以前的数据将不再更新,也就是说每次窗口滑动之前会进行Watermarking的计算,首先统计这次聚合操作返回的最大事件时间,然后减去所然忍受的延迟时间就是Watermarking,当一组数据或新接收的数据事件时间小于Watermarking时,则该数据不会更新,在内存中就不会维护该组数据的状态

mw1

Structured Streaming 支持两种更新模式:

  1. Update 删除不再更新的时间窗口,每次触发聚合操作时,输出更新的窗口

structured-streaming-watermark-update-mode

2. Append 当确定不会更新窗口时,将会输出该窗口的数据并删除,保证每个窗口的数据只会输出一次

structured-streaming-watermark-append-mode

3. Complete 不删除任何数据,在 Result Table 中保留所有数据,每次触发操作输出所有窗口数据

时间: 2024-10-07 22:40:22

Structured-Streaming之窗口操作的相关文章

Spark Structured Streaming框架(4)之窗口管理详解

1. 结构 1.1 概述 Structured Streaming组件滑动窗口功能由三个参数决定其功能:窗口时间.滑动步长和触发时间. 窗口时间:是指确定数据操作的长度: 滑动步长:是指窗口每次向前移动的时间长度: 触发时间:是指Structured Streaming将数据写入外部DataStreamWriter的时间间隔. 图 11 1.2 API 用户管理Structured Streaming的窗口功能,可以分为两步完成: 1) 定义窗口和滑动步长 API是通过一个全局的window方法

Apache Spark 2.2.0 中文文档 - Structured Streaming 编程指南 | ApacheCN

Structured Streaming 编程指南 概述 快速示例 Programming Model (编程模型) 基本概念 处理 Event-time 和延迟数据 容错语义 API 使用 Datasets 和 DataFrames 创建 streaming DataFrames 和 streaming Datasets Input Sources (输入源) streaming DataFrames/Datasets 的模式接口和分区 streaming DataFrames/Dataset

2,StructuredStreaming的事件时间和窗口操作

推荐阅读:1,StructuredStreaming简介 使用Structured Streaming基于事件时间的滑动窗口的聚合操作是很简单的,很像分组聚合.在一个分组聚合操作中,聚合值被唯一保存在用户指定的列中.在基于窗口的聚合的情况下,对于行的事件时间的每个窗口,维护聚合值. 如前面的例子,我们运行wordcount操作,希望以10min窗口计算,每五分钟滑动一次窗口.也即,12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20 这些十分钟窗口中进行单词统计

Structured Streaming曲折发展史

Structured Streaming曲折发展史 1.1. Spark Streaming 在2.0之前,Spark Streaming作为核心API的扩展,针对实时数据流,提供了一套可扩展.高吞吐.可容错的流式计算模型.Spark Streaming会接收实时数据源的数据,并切分成很多小的batches,然后被Spark Engine执行,产出同样由很多小的batchs组成的结果流.本质上,这是一种micro-batch(微批处理)的方式处理,这种设计让Spark Streaming面对复杂

Spark Streaming vs. Structured Streaming

简介 Spark Streaming Spark Streaming是spark最初的流处理框架,使用了微批的形式来进行流处理. 提供了基于RDDs的Dstream API,每个时间间隔内的数据为一个RDD,源源不断对RDD进行处理来实现流计算 Structured Streaming Spark 2.X出来的流框架,采用了无界表的概念,流数据相当于往一个表上不断追加行. 基于Spark SQL引擎实现,可以使用大多数Spark SQL的function 区别 1. 流模型 Spark Stre

Spark Structured Streaming框架(3)之数据输出源详解

Spark Structured streaming API支持的输出源有:Console.Memory.File和Foreach.其中Console在前两篇博文中已有详述,而Memory使用非常简单.本文着重介绍File和Foreach两种方式,并介绍如何在源码基本扩展新的输出方式. 1. File Structured Streaming支持将数据以File形式保存起来,其中支持的文件格式有四种:json.text.csv和parquet.其使用方式也非常简单只需设置checkpointLo

学习Spark2.0中的Structured Streaming(一)

转载自:http://lxw1234.com/archives/2016/10/772.htm Spark2.0新增了Structured Streaming,它是基于SparkSQL构建的可扩展和容错的流式数据处理引擎,使得实时流式数据计算可以和离线计算采用相同的处理方式(DataFrame&SQL).Structured Streaming顾名思义,它将数据源和计算结果都映射成一张"结构化"的表,在计算的时候以结构化的方式去操作数据流,大大方便和提高了数据开发的效率. Sp

Spark Streaming中的操作函数分析

根据Spark官方文档中的描述,在Spark Streaming应用中,一个DStream对象可以调用多种操作,主要分为以下几类 Transformations Window Operations Join Operations Output Operations 一.Transformations 1.map(func) map操作需要传入一个函数当做参数,具体调用形式为 val b = a.map(func) 主要作用是,对DStream对象a,将func函数作用到a中的每一个元素上并生成新

Structured Streaming教程(1) —— 基本概念与使用

近年来,大数据的计算引擎越来越受到关注,spark作为最受欢迎的大数据计算框架,也在不断的学习和完善中.在Spark2.x中,新开放了一个基于DataFrame的无下限的流式处理组件--Structured Streaming,它也是本系列的主角,废话不多说,进入正题吧! 简单介绍 在有过1.6的streaming和2.x的streaming开发体验之后,再来使用Structured Streaming会有一种完全不同的体验,尤其是在代码设计上. 在过去使用streaming时,我们很容易的理解