Flink流计算编程--在WindowedStream中体会EventTime与ProcessingTime

一、Flink流处理简介

Flink流处理的API叫做DataStream,可以在保证Exactly-Once的前提下提供高吞吐、低延时的实时流处理。用Flink作为流处理框架成功的案例可参考Flink母公司–Data Artisans官方blog中的2篇文章:

How we selected Apache Flink as our Stream Processing Framework at the Otto Group Business Intelligence Department

RBEA: Scalable Real-Time Analytics at King

二、Flink中的Time模型

Flink中提供了3种时间模型:EventTime、ProcessingTime、与Ingestion Time。底层实现上分为2种:Processing Time与Event Time,而Ingestion Time本质上也是一种Event Time,可以通过官方文档上的一张图展现是3者的区别:

Event Time:事件产生的时间,即数据产生时自带时间戳,例如‘2016/06/17 11:04:00.960’

Ingestion Time:数据进入到Flink的时间,即数据进入source operator时获取时间戳

Processing Time:系统时间,与数据本身的时间戳无关,即在window窗口内计算完成的时间(默认的Time)

关于Flink中的Time,可以参考官方文档中的说明:Event Time

关于Event Time,需要指出的是:数据产生的时间,编程时首先就是要告诉Flink,哪一列作为Event Time列,同时分配时间戳(TimeStamp)并发出水位线(WaterMark),来跟踪Event Time。简单理解,就是以Event Time列作为时间。水位线既然是用来标记Event Time的,那么Event Time在产生时有可能因为网络或程序错误导致的时间乱序,即Late Element的产生,因此WaterMark分为有序与无序2种:

关于Late Element,举个例子说明:数据随着时间的流逝而产生,即数据的产生本是升序的,当Flink采用Event Time作为时间模型时,理论上也应该是升序的数据不断的进行计算。但是突然有个“延迟的”数据进入到了Flink,此时时间窗口已过,那么这个“延迟的”数据就不会被正确的计算。

对于这些数据,流处理的可能无法实时正确计算,因为WarterMark不可能无限制的等待Late Element的到来,所以可以通过之后的批处理(batch)对已经计算的数据进行更正。

关于流计算、时间模型与窗口的说明,可以参考:

Streaming 101

Streaming 102

三、Flink流处理编程的步骤

共5个步骤:

1、获取DataStream的运行环境

2、从Source中创建DataStream

3、在DataStream上进行transformation操作

4、将结果输出

5、执行流处理程序

四、程序说明

说明:
IDE:IntelliJ IDEA Community Edition(From JetBrains)
开发语言: Scala 2.10
运行环境:Flink 1.0.3 集群(1个JobManager+2个TaskManager)
程序提交:客户端CLI
管理工具:maven 3.3.9

五、程序演示–体会Event Time

关键点:

设置Event time characteristic:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

复写map方法,实现稍微复杂点的map transaction:

map(new EventTimeFunction)

分配timestamp以及watermark:

val timeValue = parsedStream.assignAscendingTimestamps(_._2)

在dataStream上运行keyBy操作,产生keyedStream,继而在keyedStream上运行window操作,产生windowedStream,此时,windowedStream包含的元素主要包含3方面:K->key,W->window,T->Iterable[(..)],即每个key在特定窗口内的元素的集合。不同的stream之间的互相调用,可以参考:

Flink 原理与实现:数据流上的类型和操作

在windowedStream上聚合sum:

val sumVolumePerMinute = timeValue
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .sum(3)
      .name("sum volume per minute")

运行程序,测试结果:

输入:

600000.SH,600000,20160520,93000960,1,39,173200,400,800,66,0,0,62420,76334,93002085
600000.SH,600000,20160520,93059000,1,39,173200,200,1000,66,0,0,62420,76334,93002085
600000.SH,600000,20160520,93101000,1,39,173200,300,1200,66,0,0,62420,76334,93002085
......

输出如下:

(60000,20160520093000960,600)
(60000,20160520093101000,300)
...

可以看出,结果就是按照Event Time的时间窗口计算得出的,而无关系统的时间(包括输入的快慢)。

Event Time Test的详细完整代码如下:

import java.text.SimpleDateFormat
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * 这是一个简单的Flink DataStream程序,实现每分钟的累计成交量
  * source:通过SocketStream模拟kafka消费数据
  * sink:直接print输出到local,以后要实现sink到HDFS以及写到Redis
  * 技术点:
  *        1、采用EventTime统计每分钟的累计成交量,而不是系统时钟(processing Time)
  *        2、将输入的时间合并并生成Long类型的毫秒时间,以此作为Timestamp,生成Timestamp和WaterMark
  *        3、采用TumblingEventTimeWindow作为窗口,即翻滚窗口,不重叠的范围内实现统计
  */
object TransactionSumVolume1 {
  case class Transaction(szWindCode:String, szCode:Long, nAction:String, nTime:String, seq:Long, nIndex:Long, nPrice:Long,
                         nVolume:Long, nTurnover:Long, nBSFlag:Int, chOrderKind:String, chFunctionCode:String,
                         nAskOrder:Long, nBidOrder:Long, localTime:Long
                        )

  def main(args: Array[String]): Unit = {

    /**
      * when Running the program, you should input 2 parameters: hostname and port of Socket
      */
    if (args.length != 2) {
      System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
      return
    }

    val hostName = args(0)
    val port = args(1).toInt

    /**
      * Step 1. Obtain an execution environment for DataStream operation
      * set EventTime instead of Processing Time
      */
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    /**
      * Step 2. Create DataStream from socket
      */
    val input = env.socketTextStream(hostName,port)

    /**
      * Step 3. Implement ‘分钟成交量‘ logic
      */

    /**
      * parse input stream to a new Class which is implement the Map function
      */
    val parsedStream = input
      .map(new EventTimeFunction)

    /**
      * assign Timestamp and WaterMark for Event time: eventTime(params should be a Long type)
      */
    val timeValue = parsedStream.assignAscendingTimestamps(_._2)

    val sumVolumePerMinute = timeValue
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .sum(3)
      .name("sum volume per minute")

    /**
      * Step 4. Sink the final result to standard output(.out file)
      */
    sumVolumePerMinute.map(value => (value._1,value._3,value._4)).print()

    /**
      * Step 5. program execution
      */

    env.execute("SocketTextStream for sum of volume Example")

  }

  class EventTimeFunction extends MapFunction[String, (Long, Long, String, Long)] {

    def map(s: String): (Long, Long, String, Long) = {

      val columns = s.split(",")

      val transaction : Transaction = Transaction(columns(0),columns(1).toLong,columns(2),columns(3),columns(4).toLong,columns(5).toLong,
        columns(6).toLong,columns(7).toLong,columns(8).toLong,columns(9).toInt,columns(9),columns(10),columns(11).toLong,
        columns(12).toLong,columns(13).toLong)

      val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")

      val volume : Long = transaction.nVolume
      val szCode : Long = transaction.szCode

      if (transaction.nTime.length == 8 ) {
        val eventTimeString = transaction.nAction + ‘0‘ + transaction.nTime
        val eventTime : Long= format.parse(eventTimeString).getTime
        (szCode, eventTime, eventTimeString, volume)
      }else {
        val eventTimeString = transaction.nAction + transaction.nTime
        val eventTime = format.parse(eventTimeString).getTime
        (szCode, eventTime, eventTimeString, volume)
      }

    }
  }

}

六、程序演示–体会Processing Time

关键点:

设置TumblingProcessingTimeWindow,由于默认的Time Characteristic就是Processing Time,因此不用特别指定,在windowed assign时,只需指定系统自带的timeWindow即可:

timeWindow(Time.seconds(15))

在windowedStream之后,需要进行聚合操作,产生新的DataStream。系统提供了sum、reduce、fold等操作,但是如果遇到窗口内的计算非常复杂的情况,则需要采用apply{…}方法。windowedStream.apply{}的方法可参考源码:org.apache.flink.streaming.api.scala.WindowedStream.scala

提供了6种不同的方法,详情见:

WindowedStream.scala

这个测试就是在windowedStream上调用了apply方法,实现了稍微复杂的运算:

.apply{ (k : Long, w : TimeWindow, T: Iterable[(Long, Long, Long)], out : Collector[(Long,String,String,Double)]) =>
      var sumVolume : Long = 0
      var sumTurnover : Long = 0
      for(elem <- T){
        sumVolume = sumVolume + elem._2
        sumTurnover = sumTurnover + elem._3
      }
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

      val vwap : Double = BigDecimal(String.valueOf(sumTurnover))
        ./ (BigDecimal(String.valueOf(sumVolume)))
        .setScale(2,BigDecimal.RoundingMode.HALF_UP)
        .toDouble

      out.collect((k,format.format(w.getStart),format.format(w.getEnd),vwap))
    }

运行程序,测试结果:

输入(由于是Processing Time,输入时要注意时间间隔,超过15秒的就会产生新窗口,我的操作是前2条数据同时输入,隔一段时间后输入第3条数据):

600000.SH,600000,20160520,93000960,1,39,173200,400,800,66,0,0,62420,76334,93002085
600000.SH,600000,20160520,93059000,1,39,173200,200,1000,66,0,0,62420,76334,93002085
600000.SH,600000,20160520,93101000,1,39,173200,300,1200,66,0,0,62420,76334,93002085
......

结果如下:

(60000,2016-06-16 17:56:00.000,2016-06-16 17:56:15.000,3.0)
(60000,2016-06-16 17:58:15.000,2016-06-16 17:58:30.000,4.0)

可以看到,这个结果跟事件的时间没有任何关系,只跟系统处理完成的时间有关。

完整的代码如下:

import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
  * 这个Flink DataStream程序,实现“每15秒的加权平均价--VWAP”
  * source:通过SocketStream模拟kafka消费数据
  * sink:直接print输出到local,以后要实现sink到HDFS以及写到Redis
  * 技术点:
  *        1、采用默认的Processing Time统计每15秒钟的加权平均价
  *        2、采用TumblingProcessingTimeWindow作为窗口,即翻滚窗口,系统时钟,不重叠的范围内实现统计
  *        3、在WindowedStream上实现自定义的apply算法,即加权平均价,而非简单的Aggregation
  */

object TransactionVWap {
  case class Transaction(szWindCode:String, szCode:Long, nAction:String, nTime:String, seq:Long, nIndex:Long, nPrice:Long,
                         nVolume:Long, nTurnover:Long, nBSFlag:Int, chOrderKind:String, chFunctionCode:String,
                         nAskOrder:Long, nBidOrder:Long, localTime:Long
                        )

  def main(args: Array[String]): Unit = {

    /**
      * when Running the program, you should input 2 parameters: hostname and port of Socket
      */
    if (args.length != 2) {
      System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
      return
    }

    val hostName = args(0)
    val port = args(1).toInt

    /**
      * Step 1. Obtain an execution environment for DataStream operation
      * set EventTime instead of Processing Time
      */
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //Processing time is also the Default TimeCharacteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    /**
      * Step 2. Create DataStream from socket
      */
    val input = env.socketTextStream(hostName,port)

    /**
      * Step 3. Implement ‘每15秒加权平均价-VWAP‘ logic
      * Note: windowedStream contains 3 attributes: T=>elements, K=>key, W=>window
      */

    val sumVolumePerMinute = input
      //transform Transaction to tuple(szCode, volume, turnover)
      .map(new VwapField)
      //partition by szCode
      .keyBy(_._1)
      //building Tumbling window for 15 seconds
      .timeWindow(Time.seconds(15))
      //compute VWAP in window
      .apply{ (k : Long, w : TimeWindow, T: Iterable[(Long, Long, Long)], out : Collector[(Long,String,String,Double)]) =>
      var sumVolume : Long = 0
      var sumTurnover : Long = 0
      for(elem <- T){
        sumVolume = sumVolume + elem._2
        sumTurnover = sumTurnover + elem._3
      }
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

      val vwap : Double = BigDecimal(String.valueOf(sumTurnover))
        ./ (BigDecimal(String.valueOf(sumVolume)))
        .setScale(2,BigDecimal.RoundingMode.HALF_UP)
        .toDouble

      out.collect((k,format.format(w.getStart),format.format(w.getEnd),vwap))
    }
      .name("VWAP per 15 seconds")

    /**
      * Step 4. Sink the final result to standard output(.out file)
      */
    sumVolumePerMinute.print()

    /**
      * Step 5. program execution
      */

    env.execute("SocketTextStream for sum of volume Example")

  }

  class VwapField extends MapFunction[String, (Long,  Long, Long)] {

    def map(s: String): (Long, Long, Long) = {

      val columns = s.split(",")

      val transaction : Transaction = Transaction(columns(0),columns(1).toLong,columns(2),columns(3),columns(4).toLong,columns(5).toLong,
        columns(6).toLong,columns(7).toLong,columns(8).toLong,columns(9).toInt,columns(9),columns(10),columns(11).toLong,
        columns(12).toLong,columns(13).toLong)

      val volume : Long = transaction.nVolume
      val szCode : Long = transaction.szCode
      val turnover : Long = transaction.nTurnover

      (szCode, volume, turnover)
    }

  }

}

七、总结

何时用Event Time,何时用Processing Time,这个要看具体的业务场景。

同时,对于Event Time中的Late Element,大家可以自己模拟输入,看看结果如何。

自定义function与operator都应该是有状态的,以便恢复,这里简化,并没有设置state。

参考文档

1.https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101

2.https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102

3.https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html

4.https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_time.html

5.https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html

6.http://blog.madhukaraphatak.com/introduction-to-flink-streaming-part-9/

7.http://www.cnblogs.com/fxjwind/p/5434572.html

8.https://github.com/apache/flink/

9.http://wuchong.me/blog/2016/05/20/flink-internals-streams-and-operations-on-streams/

10.http://data-artisans.com/blog/

11.http://dataartisans.github.io/flink-training/

时间: 2024-10-26 01:51:08

Flink流计算编程--在WindowedStream中体会EventTime与ProcessingTime的相关文章

Flink流计算编程--在双流中体会joinedStream与coGroupedStream

一.joinedStream与coGroupedStream简介 在实际的流计算中,我们经常会遇到多个流进行join的情况,Flink提供了2个Transformations来实现. 如下图: 注意:Join(Cogroups) two data streams on a given key and a common window.这里很明确了,我们要在2个DataStream中指定连接的key以及window下来运算. 二.SQL比较 我们最熟悉的SQL语言中,如果想要实现2个表join,可以

flink流计算随笔(3)

Stateful Computations over Data Streams(在数据流的有状态计算)Apache Flink是一个用于分布式流和批处理数据的开源平台.Flink的核心是一个流数据流引擎,它为数据流上的分布式计算提供数据分布.通信和容错能力.Flink在流引擎之上构建批处理,覆盖本地迭代支持.托管内存和程序优化.通常在程序中的转换和数据流中的操作符之间存在一对一的对应关系.然而,有时一个转换可能包含多个转换操作符. 在串流连接器和批处理连接器文档中记录了源和汇(Sources a

Flink流计算随笔(1)

相比 Spark Stream.Kafka Stream.Storm 等,为什么阿里会选择 Flink 作为新一代流式计算引擎?前期经过了哪些调研和对比? 大沙:我们是 2015 年开始调研新一代流计算引擎的.我们当时的目标就是要设计一款低延迟.exactly once.流和批统一的,能够支撑足够大体量的复杂计算的引擎.Spark streaming 的本质还是一款基于 microbatch 计算的引擎.这种引擎一个天生的缺点就是每个 microbatch 的调度开销比较大,当我们要求越低的延迟

flink流计算随笔(4)

Flink中的程序本质上是并行的和分布式的.在执行期间,流有一个或多个流分区,每个操作符有一个或多个操作符子任务.操作符子任务相互独立,在不同的线程中执行,可能在不同的机器或容器上执行. 运算符子任务的数量是特定运算符的并行度.一个流的并行性总是它的生产操作符的并行性.同一程序的不同运算符可能具有不同级别的并行性. 流可以在两个操作符之间以一对一(或转发)模式传输数据,也可以在重分发模式中传输数据: 一对一One-to-one流(例如上图中源和map()运算符之间的流)保持元素的分区和顺序.这意

flink流计算随笔(6)

?生成,编译模板工程 MacBook-Air:SocketWindowWordCount myhaspl$ bash <(curl https://flink.apache.org/q/sbt-quickstart.sh) % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 11510 100 11510 0 0 4499 0 0:00:02

flink流计算随笔(2)

MACOS下安装flink: $ brew install apache-flink ... $ flink --version $brew upgrade MACOS下启动flink: $cd /usr/local/Cellar/apache-flink/1.6.0 $./libexec/bin/start-cluster.sh /* * Licensed to the Apache Software Foundation (ASF) under one * or more contribut

流计算及在特来电监控引擎中的实践

随着云计算的深入落地,大数据技术有了坚实的底层支撑,不断向前发展并日趋成熟,无论是传统企业还是互联网公司,都不再满足于离线批处理计算,而是更倾向于应用实时流计算,要想在残酷的企业竞争中立于不败之地,企业数据必须被快速处理并输出结果,流计算无疑将是企业Must Have的大杀器.作为充电生态网的领军企业,特来电在流计算方面很早便开始布局,下面笔者抛砖引玉的谈一下流计算及在特来电监控引擎中的应用实践. 一.由Bit说开去 作为计算机信息中的最小单位,Bit就像工蚁一样忙碌,任一时刻都只能处于以下三种

Apache Flink流分区器剖析

这篇文章介绍Flink的分区器,在流进行转换操作后,Flink通过分区器来精确得控制数据流向. StreamPartitioner StreamPartitioner是Flink流分区器的基类,它只定义了一个抽象方法: public abstract StreamPartitioner<T> copy(); 但这个方法并不是各个分区器之间互相区别的地方,定义不同的分区器的核心在于--各个分区器需要实现channel选择的接口方法: int[] selectChannels(T record,

.Spark Streaming(上)--实时流计算Spark Streaming原理介

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming原理介绍 http://www.cnblogs.com/shishanyuan/p/4747735.html 1.Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP