spark窗口操作

流数据的窗口操作:窗口操作,即把几个批次的数据整合到一个窗口里计算,并且窗口根据步长不断滑动。

本质:把小批次,小颗粒的数据任意进行大批次和大颗粒的数据统计,意味着批次采集周期不用设置太大,可以通过滑动窗口来调整数据出现的粒度。

code:

package com.home.spark.streaming

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @Description:
  *  TODO 窗口操作,即把几个批次的数据整合到一个窗口里计算,并且窗口根据步长不断滑动
  *   所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,两者都必须是 StreamContext 的批次间隔的整数倍。
  *   窗口时长控制每次计算最近的多少个批次的数据,其实就是最近的 windowDuration/batchInterval 个批次。
  *   如果有一个以 10 秒为批次间隔的源 DStream,要创建一个最近 30 秒的时间窗口(即最近 3 个批次),就应当把 windowDuration 设为 30 秒。
  *   而滑动步长的默认值与批次间隔相等,用来控制对新的 DStream 进行计算的间隔。如果源 DStream 批次间隔为 10 秒,
  *   并且我们只希望每两个批次计算一次窗口结果, 就应该把滑动步长设置为 20 秒。
  **/
object Ex_window {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true).setMaster("local[*]").setAppName("spark streaming wordcount")

    conf.set("spark.streaming.stopGracefullyOnShutdown", "true")

    //环境对象,设置采集周期
    val scc: StreamingContext = new StreamingContext(conf, Seconds(5))
    // TODO: 可以通过ssc.sparkContext 来访问SparkContext或者通过已经存在的SparkContext来创建StreamingContext

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.44.10:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )

    val topics = Array("test")

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      scc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](
        topics,
        kafkaParams
      )
    )

    //采集周期为5秒,窗口为15秒(包含三个采集批次),滑动步长为5秒,即每个批次滑动一次。
    val words: DStream[String] = kafkaStream.flatMap(t=>t.value().split(" ")).window(Seconds(15),Seconds(5))

    //    val words: DStream[String] = socketStream.flatMap(_.split(" "))

    val pairs = words.map(word => (word, 1))

    val wordCounts: DStream[(String, Int)] = pairs.reduceByKey(_ + _)

    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print

    // Start the computation
    // 通过 streamingContext.start()来启动消息采集和处理
    scc.start()

    // Wait for the computation to terminate
    // 通过streamingContext.stop()来手动终止处理程序
    scc.awaitTermination()

  }
}

原文地址:https://www.cnblogs.com/asker009/p/12097912.html

时间: 2024-10-20 12:24:33

spark窗口操作的相关文章

Structured-Streaming之窗口操作

Structured Streaming 之窗口事件时间聚合操作 Spark Streaming 中 Exactly Once 指的是: 每条数据从输入源传递到 Spark 应用程序 Exactly Once 每条数据只会分到 Exactly Once batch 处理 输出端文件系统保证幂等关系 Structured Streaming 返回的是 DataFrame/DataSet,我们可以对其应用各种操作 - 从无类型,类似 SQL 的操作(例如 select,where,groupBy)到

uCGUI窗口操作点滴记录

一.窗口操作的要点 0.调试时观察的全局变量WM__NumWindows.WM__NumInvalidWindows.WM__FirstWin.NextDrawWin. 1. 创建一个窗口的时候,会给此窗口发送“创建”消息,从而执行它的回到函数:如果创建窗口的状态标志为“可视(WM_CF_SHOW)”,那么在执行GUI_Exec函数时会对窗口进行重绘.如果创建窗口的标志带有WM_CF_ACTIVATE,那么这个窗口在创建的时候也会被激活,否则不激活.2. WM_SendMessage()发送消息

window.open()方法用于子窗口数据回调至父窗口,即子窗口操作父窗口

window.open()方法用于子窗口数据回调至父窗口,即子窗口操作父窗口 项目中经常遇到一个业务逻辑:在A窗口中打开B窗口,在B窗口中操作完以后关闭B窗口,同时自动刷新A窗口(或局部更新A窗口)(或将数据传回A窗口) 以下是从实际项目中截取出来和window.open()方法相关的代码,业务逻辑如下: 1. 点击父窗口的div标签(id="addMatchSchedule"),出发点击事件,打开子窗口: 2. 点击子窗口的button按钮,触发点击时间,即调用addSchduleI

js子窗口操作父窗口的标签

======================================父窗体 <input id="aaaa" type="button"/> function upfile()         {                         resultValue = window.showModelessDialog("ceshi.aspx?file=DownFile", window, "dialogWidt

Spark SQL操作详细讲解

一. Spark SQL和SchemaRDD 关于Spark SQL的前生就不再多说了,我们只关注它的操作.但是,首先要搞明白一个问题,那就是究竟什么是SchemaRDD呢?从Spark的Scala API可以知道org.apache.spark.sql.SchemaRDD和class SchemaRDD extends RDD[Row] with SchemaRDDLike,我们可以看到类SchemaRDD继承自抽象类RDD.官方文档的定义是"An RDD of Row objects tha

Q窗口操作函数(窗口最大化,全屏,隐藏最大化最小化按钮)

//Qt主窗口没有最小化,最大化按钮且最大化显示  int main(int argc, char *argv[]) { QApplication a(argc, argv); TestQtForWinCE w; w.setWindowFlags(w.windowFlags()& ~Qt::WindowMaximizeButtonHint& ~Qt::WindowMinimizeButtonHint); w.showMaximized(); return a.exec(); } 这里的&q

CKFinder 弹出窗口操作并设置回调函数

CKFinder 弹出窗口操作并设置回调函数 官方例子参考CKFinderJava-2.4.1/ckfinder/_samples/popup.html 写一个与EXT集成的小例子 Ext.define("MyButton", { extend : "Ext.Button", text : "ckfinder", initComponent : function() { var me = this; Ext.apply(me, { handler

Vim 缓冲区与窗口 操作

##############缓冲区 :e(:open) 打开新缓冲区 :ls (:buffers) 列出列表内所有缓冲区/bs /bv /be(BurExplore快捷键) :buffer 2 切换缓冲区:bnext/bn (Ctrl 6) 下一缓冲区:bprevious/bp 上衣缓冲区:bfirst/bf   第一个缓冲区:blast/bl 最后一个缓冲区 :sbuffer 3 在新窗口打开缓冲区3 :bdelete 4 将缓冲区移至列表外:bwipeout 4 关闭缓冲区 ########

VIM窗口操作

开窗 横向切割(水平排列)窗口 :new+窗口名(保存后就是文件名) 窗口名可选 :split+窗口名,也可以简写为:sp+窗口名 当前文件分别显示到了两个窗口,内容完全相同,并保持同步. 纵向切割(垂直排列)窗口 :vsplit+窗口名,也可以简写为:vsp+窗口名 切窗 Ctrl-w w 向后切换窗口,窗口操作都使用Ctrl-w开头. Ctrl-w p 切换回上一个窗口,注意不是与w对应的操作. Ctrl-w h 切换到左边窗口,如果有的话,下同 Ctrl-w l 切换到右边窗口 Ctrl-