Flink 1.8 Generating Timestamps, Watermarks 生成时间戳, 水印

原文:https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_timestamps_watermarks.html

本节与基于事件时间运行的程序相关。

要处理事件时间,流式传输程序需要相应地设置时间特性。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

分配时间戳(Assigning Timestamps)

为了处理事件时间,Flink需要知道事件的时间戳,这意味着流中的每个元素都需要分配其事件时间戳。 这通常通过从元素中的某个字段访问/提取时间戳来完成。

时间戳分配与生成水印密切相关,水印告诉系统事件时间的进展。

有两种方法可以分配时间戳并生成水印:

  • 直接在数据流源中
  • 通过时间戳分配器/水印生成器:在Flink中,时间戳分配器还定义要输出的水印。

注意:时间戳和水印都指定为自1970-01-01以来的毫秒。

带时间戳和水印的源函数(Source Functions with Timestamps and Watermarks)

流源可以直接为它们生成的元素分配时间戳,它们也可以输出水印。 完成此操作后,不需要时间戳分配器。 请注意,如果使用时间戳分配器,则将覆盖源提供的任何时间戳和水印。

要直接为源中的元素分配时间戳,源必须在SourceContext上使用collectWithTimestamp(...)方法。 要生成水印,源必须调用emitWatermark(Watermark)功能。

下面是一个简单的(非检查点)源代码示例,用于分配时间戳并生成水印:

@Override
public void run(SourceContext<MyType> ctx) throws Exception {
    while (/* condition */) {
         MyType next = getNext();
        ctx.collectWithTimestamp(next, next.getEventTimestamp());

        if (next.hasWatermarkTime()) {
                 ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
        }
    }
}

时间戳分配器/水印生成器(Timestamp Assigners / Watermark Generators)

时间戳分配器获取流并生成带有时间戳元素和水印的新流。 如果原始流已经有时间戳和/或水印,则时间戳分配器会覆盖它们。

时间戳分配器通常在数据源之后立即指定,但并非严格要求这样做。 例如,常见的模式是在时间戳分配器之前解析(MapFunction)和过滤(FilterFunction)。 在任何情况下,需要在事件时间的第一个操作之前指定时间戳分配器(例如第一个窗口操作)。 作为一种特殊情况,当使用Kafka作为流式传输作业的源时,Flink允许在源(或消费者)本身内指定时间戳分配器/水印发射器。 有关如何执行此操作的更多信息,请参阅 Kafka Connector documentation

注意:本节的其余部分介绍了程序员必须实现的主要接口,以便创建自己的时间戳提取器/水印发射器。 要查看Flink附带的预先实现的提取器,请参阅 Pre-defined Timestamp Extractors / Watermark Emitters

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...); 

定时触发的水印(With Periodic Watermarks)

AssignerWithPeriodicWatermarks定期分配时间戳并生成水印(可能取决于流元素,或纯粹基于处理时间)。

生成水印的间隔(每n毫秒)由ExecutionConfig.setAutoWatermarkInterval(...)定义。 每次调用分配器的getCurrentWatermark()方法,如果返回的水印非空且大于前一个水印,则会发出新的水印。

这里我们展示了两个使用周期性水印生成的时间戳分配器的简单示例。 请注意,Flink附带了一个BoundedOutOfOrdernessTimestampExtractor,类似于下面显示的BoundedOutOfOrdernessGenerator,您可以在这里阅读。

/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
} 

基于某个事件触发的水印(With Punctuated Watermarks)

要在某个事件指示可能生成新水印时生成水印,请使用AssignerWithPunctuatedWatermarks。 对于这个类,Flink将首先调用extractTimestamp(...)方法为元素分配时间戳,然后立即调用该元素上的checkAndGetNextWatermark(...)方法。

checkAndGetNextWatermark(...)方法被传入的时间戳,是在extractTimestamp(...)方法中分配的,并可以决定是否要生成水印。 每当checkAndGetNextWatermark(...)方法返回非空水印,并且该水印大于最近的前一个水印时,将输出新水印。

public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
} 

注意:可以在每个事件上生成水印。 然而,因为每个水印在下游引起一些计算,所以过多的水印会降低性能。

每个Kafka分区的时间戳(Timestamps per Kafka Partition)

当使用Apache Kafka作为数据源时,每个Kafka分区可能具有简单的事件时间模式(升序时间戳或有界无序)。 但是,当从Kafka消费流时,多个分区通常并行消费,交替来自多个分区的事件,破坏了每个分区模式(这是Kafka的消费者客户端工作的固有方式)。

在这种情况下,你可以使用Flink的Kafka-partition-aware水印生成。 使用该功能,根据Kafka分区在Kafka消费者内部生成水印,并且每个分区水印的合并方式与在流打乱上合并水印的方式相同。

例如,如果事件时间戳在每个分区内严格升序,则使用升序时间戳水印生成器生成每分区水印将产生完美的整体水印。

下图显示了如何使用per-Kafka-partition水印生成,以及在这种情况下水印如何通过数据流传播。

FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {

    @Override
    public long extractAscendingTimestamp(MyType element) {
        return element.eventTimestamp();
    }
});

DataStream<MyType> stream = env.addSource(kafkaSource);

原文地址:https://www.cnblogs.com/sxpujs/p/11385678.html

时间: 2024-07-29 11:05:58

Flink 1.8 Generating Timestamps, Watermarks 生成时间戳, 水印的相关文章

生成时间戳随机数

<!DOCTYPE html> <html lang="en"> <head> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> <title></title> <script src="https://code.jquery.com/jquery-3.1.1.min.js&

php生成文字水印和图片水印

生成文字水印 <?php //文字水印 /*打开图片*/ //1.配置图片路径 $src = "4.jpg"; //2.获取图片的信息(得到图片的基本信息) $info = getimagesize($src ); //3.通过获取图片类型 $type = image_type_to_extension($info[2],false); //4.在内存中创建一个图片类型一样的图像 $fun = "imagecreatefrom{$type}"; //5.图片复

php 与 java 生成时间戳却别

最近服务器有java却换到php环境,生成的时间戳转换成时间格式的出现异常,查询资料得知: PHP 的 time() 函数返回的结果是 Unix 时间戳,值的单位是秒:如:1463564861 Java 中 System.currentTimeMillis()  返回的结果,值的单位是毫秒.如:1463579759591 所以java中拿到服务端php返回的时间戳得乘以1000,才能正确转换成对应的时间.

C# 生成时间戳

编写网络程序中难免用到一些时间戳. 早前不知道哪里复制过一个代码,如下: public static string GetTimeStamp() { TimeSpan ts = DateTime.UtcNow - new DateTime(1970, 1, 1, 0, 0, 0, 0); return Convert.ToInt64(ts.TotalSeconds).ToString(); }   不过事后发现,生成出来的与实际情况不符,有些出入. 上面代码生成的时间戳只有10位.而要模拟的目标

(转)C#生成时间戳

/// <summary> /// 获取当前时间戳 /// </summary> /// <param name="bflag">为真时获取10位时间戳,为假时获取13位时间戳.</param> /// <returns></returns> public static string GetTimeStamp(bool bflag = true) { TimeSpan ts = DateTime.UtcNow -

powershell生成时间戳13和10位

定义: 时间戳是指格林威治时间1970年01月01日00时00分00秒(北京时间1970年01月01日08时00分00秒)起至现在的总秒数. 通俗的讲, 时间戳是一份能够表示一份数据在一个特定时间点已经存在的完整的可验证的数据.  它的提出主要是为用户提供一份电子证据, 以证明用户的某些数据的产生时间.  ---摘自百度百科 Ticks是一个周期,存储的是一百纳秒,换算为秒,一千万分之一秒. 实现: (([DateTime]::Now.ToUniversalTime().Ticks - 6213

Aspose.Cell 生成带水印的excel文件

1 private void ExportDataSet(string fileName, string templatePath, DataSet ds, HttpResponse reponse, FileFormatType FileType= FileFormatType.Xlsx) 2 { 3 Aspose.Cells.License Clicense = new Aspose.Cells.License(); 4 string asposePath = Server.MapPath(

php图片的应用-生成带有水印文字的图片-生成带有水印图标的图片-生成缩略图(//为注释内容,不影响文件执行)

<?php ////////////////定义水印文字函数开始////////////////// function watertext($i,$t='版权所有',$s=20,$c='white',$p=9){ $img = imagecreatefromjpeg($i);   //imagecreatefromjpeg - 由文件或 URL 创建一个新图象. $cc = ''; ////////switch循环,循环水印文字的颜色 开始////// switch($c){ case 'whi

增加录像时间戳水印、 camera框架介绍

http://blog.csdn.net/mirkerson/article/details/38920107 http://blog.csdn.net/jimbo_lee/article/details/27545193 http://blog.csdn.net/andensy/article/details/50320301