Monitoring Apache Flink Applications

This post originally appeared on the Apache Flink blog. It was reproduced here under the Apache License, Version 2.0.



This blog post provides an introduction to Apache Flink’s built-in monitoring and metrics system, which allows developers to effectively monitor their Flink jobs. Oftentimes, the task of picking the relevant metrics to monitor a Flink application can be overwhelming for a DevOps team that is just starting with stream processing and Apache Flink. Having worked with many organizations that deploy Flink at scale, I would like to share my experience and some best practices with the community.

With business-critical applications running on Apache Flink, performance monitoring becomes an increasingly important part of a successful production deployment. It ensures that any degradation or downtime is immediately identified and resolved as quickly as possible.

Monitoring goes hand-in-hand with observability, which is a prerequisite for troubleshooting and performance tuning. Nowadays, with the complexity of modern enterprise applications and the speed of delivery increasing, an engineering team must understand and have a complete overview of its applications’ status at any given point in time.

Flink’s Metrics System

The foundation for monitoring Flink jobs is its metrics system, which consists of two components: Metrics and MetricsReporters.

Metrics

Flink comes with a comprehensive set of built-in metrics such as:

  • Used JVM Heap / NonHeap / Direct Memory (per Task-/JobManager)
  • Number of Job Restarts (per Job)
  • Number of Records Per Second (per Operator)

These metrics have different scopes and measure both general aspects (e.g. of the JVM or operating system) and Flink-specific aspects.

As a user, you can and should add application-specific metrics to your functions. Typically these include counters for the number of invalid records or the number of records temporarily buffered in managed state. Besides counters, Flink offers additional metrics types like gauges and histograms. For instructions on how to register your own metrics with Flink’s metrics system please check out Flink’s documentation. In this blog post, we will focus on how to get the most out of Flink’s built-in metrics.
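
Before moving on, here is a minimal sketch of what registering such a counter inside a rich function can look like; the class name, metric name and validation rule are made up for illustration, so refer to Flink’s documentation for the full API.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

/**
 * Sketch of an application-specific metric: a counter for invalid records.
 * The class name, metric name and validation logic are illustrative only.
 */
public class ValidatingMapper extends RichMapFunction<String, String> {

    private transient Counter invalidRecords;

    @Override
    public void open(Configuration parameters) {
        // Register the counter under this operator's metric group; any
        // configured MetricsReporter will pick it up automatically.
        this.invalidRecords = getRuntimeContext()
                .getMetricGroup()
                .counter("numInvalidRecords");
    }

    @Override
    public String map(String value) {
        if (value == null || value.isEmpty()) {
            invalidRecords.inc();
        }
        return value;
    }
}
```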

MetricsReporters

All metrics can be queried via Flink’s REST API. However, users can configure MetricsReporters to send the metrics to external systems. Apache Flink provides reporters for the most common monitoring tools out of the box, including JMX, Prometheus, Datadog, Graphite and InfluxDB. For information about how to configure a reporter, check out Flink’s MetricsReporter documentation.
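
As an illustration, a Prometheus reporter setup in flink-conf.yaml can look roughly like the snippet below; the exact keys and the reporter jar you need on the classpath depend on your Flink version, so treat this as a sketch and consult the MetricsReporter documentation for the authoritative configuration.

```yaml
# Sketch: expose Flink metrics to Prometheus (requires the Prometheus
# reporter jar to be available to Flink; the location depends on the version).
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
# Port (or port range) on which each Job-/TaskManager serves its metrics.
metrics.reporter.prom.port: 9250-9260
```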

In the remaining part of this blog post, we will go over some of the most important metrics to monitor your Apache Flink application.

Monitoring General Health

The first thing you want to monitor is whether your job is actually in a RUNNING state. In addition, it pays off to monitor the number of restarts and the time since the last restart.

Generally speaking, successful checkpointing is a strong indicator of the general health of your application. For each checkpoint, checkpoint barriers need to flow through the whole topology of your Flink job and events and barriers cannot overtake each other. Therefore, a successful checkpoint shows that no channel is fully congested.

Key Metrics


Metric | Scope | Description
uptime | job | The time that the job has been running without interruption.
fullRestarts | job | The total number of full restarts since this job was submitted.
numberOfCompletedCheckpoints | job | The number of successfully completed checkpoints.
numberOfFailedCheckpoints | job | The number of failed checkpoints.

Example Dashboard Panels

Figure 1: Uptime (35 minutes), Restarting Time (3 milliseconds) and Number of Full Restarts (7)

Figure 2: Completed Checkpoints (18336), Failed (14)

Possible Alerts

  • ΔfullRestarts > threshold
  • ΔnumberOfFailedCheckpoints > threshold

Monitoring Progress & Throughput

Knowing that your application is RUNNING and checkpointing is working fine is good, but it does not tell you whether the application is actually making progress and keeping up with the upstream systems.

Throughput

Flink provides multiple metrics to measure the throughput of your application. For each operator or task (remember: a task can contain multiple chained operators), Flink counts the number of records and bytes going in and out. Out of those metrics, the rate of outgoing records per operator is often the most intuitive and easiest to reason about.

Key Metrics


Metric | Scope | Description
numRecordsOutPerSecond | task | The number of records this operator/task sends per second.
numRecordsOutPerSecond | operator | The number of records this operator sends per second.

Example Dashboard Panels

Figure 3: Mean Records Out per Second per Operator

Possible Alerts

  • numRecordsOutPerSecond = 0 (for a non-Sink operator)

Note: Source operators always have zero incoming records and sink operators always have zero outgoing records because the metrics only count Flink-internal communication. There is a JIRA ticket to change this behavior.

Progress

For applications that use event time semantics, it is important that watermarks progress over time. A watermark of time t tells the framework that it should no longer expect to receive events with a timestamp earlier than t, and that, in turn, it can trigger all operations that were scheduled for a timestamp < t. For example, an event time window that ends at t = 30 will be closed and evaluated once the watermark passes 30.

As a consequence, you should monitor the watermark at event time-sensitive operators in your application, such as process functions and windows. If the difference between the current processing time and the watermark, known as event-time skew, is unusually high, then it typically implies one of two issues. First, it could mean that you are simply processing old events, for example during catch-up after a downtime or when your job is simply not able to keep up and events are queuing up. Second, it could mean that a single upstream sub-task has not sent a watermark for a long time (for example because it did not receive any events to base the watermark on), which also prevents the watermark in downstream operators from progressing. This JIRA ticket provides further information and a workaround for the latter.

Key Metrics


Metric | Scope | Description
currentOutputWatermark | operator | The last watermark this operator has emitted.

Example Dashboard Panel

Figure 4: Event Time Lag per Subtask of a single operator in the topology. In this case, the watermark is lagging a few seconds behind for each subtask.

Possible Alerts

  • currentProcessingTime - currentOutputWatermark > threshold

“Keeping Up”

When consuming from a message queue, there is often a direct way to monitor if your application is keeping up. By using connector-specific metrics you can monitor how far behind the head of the message queue your current consumer group is. Flink forwards the underlying metrics from most sources.

Key Metrics


Metric | Scope | Description
records-lag-max | user | Applies to FlinkKafkaConsumer. The maximum lag in terms of the number of records for any partition in this window. An increasing value over time is your best indication that the consumer group is not keeping up with the producers.
millisBehindLatest | user | Applies to FlinkKinesisConsumer. The number of milliseconds a consumer is behind the head of the stream. For any consumer and Kinesis shard, this indicates how far it is behind the current time.

Possible Alerts

  • records-lag-max > threshold
  • millisBehindLatest > threshold

Monitoring Latency

Generally speaking, latency is the delay between the creation of an event and the time at which results based on this event become visible. Once the event is created it is usually stored in a persistent message queue, before it is processed by Apache Flink, which then writes the results to a database or calls a downstream system. In such a pipeline, latency can be introduced at each stage and for various reasons including the following:

  1. It might take a varying amount of time until events are persisted in the message queue.
  2. During periods of high load or during recovery, events might spend some time in the message queue until they are processed by Flink (see previous section).
  3. Some operators in a streaming topology need to buffer events for some time (e.g. in a time window) for functional reasons.
  4. Each computation in your Flink topology (framework or user code), as well as each network shuffle, takes time and adds to latency.
  5. If the application emits through a transactional sink, the sink will only commit and publish transactions upon successful checkpoints of Flink, adding latency usually up to the checkpointing interval for each record.

In practice, it has proven invaluable to add timestamps to your events at multiple stages (at least at creation, persistence, ingestion by Flink, publication by Flink; possibly sampling those to save bandwidth). The differences between these timestamps can be exposed as a user-defined metric in your Flink topology to derive the latency distribution of each stage.
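
The sketch below shows one way to expose such a stage latency as a user-defined metric; the (timestamp, payload) tuple layout and the metric name are assumptions made for this example, and a histogram (for example via the flink-metrics-dropwizard wrapper) could be used instead of a gauge to capture the full distribution.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

/**
 * Sketch: expose the creation-to-processing lag of each event as a gauge.
 * Events are assumed to arrive as (creationTimestampMillis, payload) tuples;
 * substitute your own event type and timestamp accessor.
 */
public class StageLatencyMapper extends RichMapFunction<Tuple2<Long, String>, Tuple2<Long, String>> {

    private volatile long lastObservedLagMs;

    @Override
    public void open(Configuration parameters) {
        // A gauge reports the most recent sample; a histogram would expose
        // the full latency distribution of this stage instead.
        getRuntimeContext()
                .getMetricGroup()
                .gauge("creationToProcessingLagMs", (Gauge<Long>) () -> lastObservedLagMs);
    }

    @Override
    public Tuple2<Long, String> map(Tuple2<Long, String> event) {
        lastObservedLagMs = System.currentTimeMillis() - event.f0;
        return event;
    }
}
```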

In the rest of this section, we will only consider the latency that is introduced inside the Flink topology itself (point 4 above), i.e. latency that cannot be attributed to transactional sinks or to events being buffered for functional reasons.

To this end, Flink comes with a feature called Latency Tracking. When enabled, Flink will insert so-called latency markers periodically at all sources. For each sub-task, a latency distribution from each source to this operator will be reported. The granularity of these histograms can be further controlled by setting metrics.latency.granularity as desired.

Due to the potentially high number of histograms (in particular for metrics.latency.granularity: subtask), enabling latency tracking can significantly impact the performance of the cluster. It is recommended to only enable it to locate sources of latency during debugging.
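
For reference, latency tracking is driven by configuration roughly along these lines; the interval shown is only an example value, and the available options can differ slightly between Flink versions.

```yaml
# Sketch: emit latency markers every 30 seconds (0 disables tracking).
metrics.latency.interval: 30000
# Histogram granularity: "single", "operator" (default) or "subtask".
metrics.latency.granularity: operator
```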

Key Metrics


Metric | Scope | Description
latency | operator | The latency from the source operator to this operator.
restartingTime | job | The time it took to restart the job, or how long the current restart has been in progress.

Example Dashboard Panel

Figure 5: Latency distribution between a source and a single sink subtask.

JVM Metrics

So far we have only looked at Flink-specific metrics. As long as latency & throughput of your application are in line with your expectations and it is checkpointing consistently, this is probably everything you need. On the other hand, if your job’s performance is starting to degrade, memory consumption and CPU load of your Task- & JobManager JVMs are among the first metrics you want to look at.

Memory

Flink reports the usage of Heap, NonHeap, Direct & Mapped memory for JobManagers and TaskManagers.

  • Heap memory - as with most JVM applications - is the most volatile and important metric to watch. This is especially true when using Flink’s filesystem statebackend as it keeps all state objects on the JVM Heap. If the size of long-living objects on the Heap increases significantly, this can usually be attributed to the size of your application state (check the checkpointing metrics for an estimated size of the on-heap state). The possible reasons for growing state are very application-specific. Typically, an increasing number of keys, a large event-time skew between different input streams or simply missing state cleanup may cause growing state.
  • NonHeap memory is dominated by the metaspace, the size of which is unlimited by default and holds class metadata as well as static content. There is a JIRA ticket to limit its size to 250 megabytes by default.
  • The biggest driver of Direct memory is by far the number of Flink’s network buffers, which can be configured.
  • Mapped memory is usually close to zero as Flink does not use memory-mapped files.

In a containerized environment you should additionally monitor the overall memory consumption of the Job- and TaskManager containers to ensure they don’t exceed their resource limits. This is particularly important when using the RocksDB statebackend, since RocksDB allocates a considerable amount of memory off-heap. To understand how much memory RocksDB might use, you can check out this blog post by Stefan Richter.

Key Metrics


Metric | Scope | Description
Status.JVM.Memory.NonHeap.Committed | job-/taskmanager | The amount of non-heap memory guaranteed to be available to the JVM (in bytes).
Status.JVM.Memory.Heap.Used | job-/taskmanager | The amount of heap memory currently used (in bytes).
Status.JVM.Memory.Heap.Committed | job-/taskmanager | The amount of heap memory guaranteed to be available to the JVM (in bytes).
Status.JVM.Memory.Direct.MemoryUsed | job-/taskmanager | The amount of memory used by the JVM for the direct buffer pool (in bytes).
Status.JVM.Memory.Mapped.MemoryUsed | job-/taskmanager | The amount of memory used by the JVM for the mapped buffer pool (in bytes).
Status.JVM.GarbageCollector.G1 Young Generation.Time | job-/taskmanager | The total time spent performing G1 Young Generation garbage collection.
Status.JVM.GarbageCollector.G1 Old Generation.Time | job-/taskmanager | The total time spent performing G1 Old Generation garbage collection.

Example Dashboard Panel

Figure 6: TaskManager memory consumption and garbage collection times.

Figure 7: JobManager memory consumption and garbage collection times.

Possible Alerts

  • container memory limit < container memory + safety margin


CPU

Besides memory, you should also monitor the CPU load of the TaskManagers. If your TaskManagers are constantly under very high load, you might be able to improve the overall performance by decreasing the number of task slots per TaskManager (in case of a standalone setup), by providing more resources to the TaskManager (in case of a containerized setup), or by providing more TaskManagers. In general, a system that is already running under very high load during normal operations will need much more time to catch up after recovering from a downtime. During this time you will see a much higher latency (event-time skew) than usual.

A sudden increase in the CPU load might also be attributed to high garbage collection pressure, which should be visible in the JVM memory metrics as well.

If one or a few TaskManagers are constantly under very high load, this can slow down the whole topology due to long checkpoint alignment times and increasing event-time skew. A common reason is skew in the partition key of the data, which can be mitigated by pre-aggregating before the shuffle or keying on a more evenly distributed key.

Key Metrics


Metric | Scope | Description
Status.JVM.CPU.Load | job-/taskmanager | The recent CPU usage of the JVM.

Example Dashboard Panel

Figure 8: TaskManager & JobManager CPU load

System Resources

In addition to the JVM metrics above, it is also possible to use Flink’s metrics system to gather insights about system resources, i.e. memory, CPU & network-related metrics for the whole machine as opposed to the Flink processes alone. System resource monitoring is disabled by default and requires additional dependencies on the classpath. Please check out the Flink system resource metrics documentation for additional guidance and details. System resource monitoring in Flink can be very helpful in setups without existing host monitoring capabilities.
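
Assuming the required dependencies are on the classpath, enabling it comes down to a configuration switch roughly like the following; the probing interval shown is just an example value, so double-check the key names against the documentation for your Flink version.

```yaml
# Sketch: turn on host-level (machine-wide) CPU, memory and network metrics.
metrics.system-resource: true
# Optional: how often to probe the system resources, in milliseconds.
metrics.system-resource-probing-interval: 5000
```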


Conclusion

This post tries to shed some light on Flink’s metrics and monitoring system. You can utilise it as a starting point when you first think about how to successfully monitor your Flink application. I highly recommend starting to monitor your Flink application early on in the development phase. This way you will be able to improve your dashboards and alerts over time and, more importantly, observe the performance impact of changes to your application throughout the development phase. By doing so, you can ask the right questions about the runtime behaviour of your application, and learn much more about Flink’s internals early on.
