Flink kafka producer with transaction support

Background

In flink 1.5 above, it provides a new kafka producer implementation: FlinkKafkaProducer011, aligning with kafka 0.11 above that supports transaction. Kafka transaction allows multiple kafka messages sent by producer to deliver in an atomic way --> either all success or all fail. The messages can belong to different partitions. Before flink 1.5, it provides exact once semantics only for its internal state via check point. However, if you write stream state to external storage such as kafka or database, there is no guarantee. In flink 1.5, an important new class TwoPhaseCommitSinkFunction is introduced to support extactly once semantic for externa storage. FlinkKafkaProducer011 leverage this feature, so before we look at FlinkKafkaProducer011, we will briefly go through what TwoPhaseCommitSinkFunction does.

TwoPhaseCommitSinkFunction

As its name suggests, TwoPhaseCommitSinkFunction provides a template to implement two phase commit protocol so that writing to external storage and flink checkpoint reaches consensus on success/failure. It provides the following abstract methods:

  • beginTransaction
  • preCommit
  • commit
  • abort
  • invoke(TXN transaction, IN value, Context context)

The sequence diagram below shows how these methods are invoked (highlighted in red)

In a typical two phase commit protocol for database, the precommit operation usually involves writing db operations to a WAL file (write ahead log). Once the db operation is persisted, even if server crashed afterwards, it can always recover db operations from log and apply to database, so commit can always success.

Internally, TwoPhaseCommitSinkFunction keeps pendingTransactions : an orderedMap (LinkedHashMap) that records current pending transactions that not yet committed. Key is the checkpoint id that transaction occurs, value is the transaction holder object that contains transaction start time and transaction information. Normally the map contains only one transaction state entry corresponding to the current checkpoint. Under rare situation, it can contain transaction state entry from previous checkpoint if previous checkpoint failed or acknowledgement is delayed.

On snapshot state (snapshotState):
Current transaction holder is put into pendingTransactions. Precommit is called to perform the prepare step in two phase commit. A new transaction is started and internal state is updated to reflect the latest current transaction holder and pendingTransactions.

On checkpoint notification (notifyCheckpointComplete):
All pending transactions in pendingTransactions that have id less than or equals the notified checkpoint id will be committed. Committed transaction is removed from pendingTransactions.

Checkpoint recovery (initializeState)
RecoverAndCommit method will be called on each of the pendingTransactions. While for the current transaction in state (precommit method not yet called), recoverAndAbort method is called

To summarize:
Transaction scope aligns with the checkpoint frequency, indicating that all records between two commits will belong to a single transaction.
Prepare (precommit) step must succeed for snapshot to complete.

Kafka transaction api

KafkaProducer.initTransactions()
KafkaProducer.beginTransaction()
KafkaProducer.sendOffsetsToTransaction(offsets, consumerGroupId)
KafkaProducer.commitTransaction()
KafkaProducer.abortTransaction()

Besides a special property "transactional.id" needs to be assigned to ProducerConfig. This raises an important implication that there can be only one active transaction per producer at any time.

initTransactions method: Ensures any transactions initiated by previous instances of the producer with the same transactional.id are completed. If the previous instance had failed with a transaction in progress, it will be aborted. If the last transaction had begun completion, but not yet finished, this method awaits its completion

Also: initTransactions method assigns an internal producer id and epoch number. Each producer restart will generate a new epoch number, thus allow kafka transaction manager to detect and reject transaction request that was sent before producer restart.

sendOffsetsToTransaction method: used when read from one kafka topic and send data to an other topic (Eg, kafka streaming). so that consumer offset shifts only if producer successfully send data (exactly once)

Combine both

FlinkKafkaProducer011 supports 3 modes: exactly once, at least once or none. Under exactly once mode, a new KafkaProducer will be created on each call to beginTransaction whereas under the other two modes, existing KafkaProducer from current state will be reused. Another important difference is that kafka transaction is only enabled for exactly once mode. As discussed previously, beginTransaction method can be called from initializeState method or snapshotState method. A new KafkaProducer with a different transaction id ensures that producer records sent between two checkpoints belong to different transactions, so that each one can be committed or rolled back separately.

The difference between at least once or none mode is the precommit behavior. for exactly once and at least once mode, kafkaProducer is flushed in precommit to make sure that records in producer buffer are sent to broker before checkpoint snapshot completes. Otherwise, if flink crashes after the current checkpoint, unsent records in producer buffer are lost.

Transaction state object: KafkaTransactionState contains a transient flink kafka producer. transaction id, producer id and epoch. The kafka producer reference act as a shortcut to retrieve kafka producer from current transactionHolder in various lifecycle stage. On state recovery from checkpoint, kafka producer in transaction state will be null and need to be re-created. To commit, transaction id, producer id and epoch for the new kafka producer need to match the previous kafka producer before crash. Otherwise, broker cannot associate commit request with the producer for previous transaction and reject it. See FlinkKafkaProducer.resumeTransaction method on how transaction state, producer id and epoch is set. To abort, only transaction id matters, as there can be only one pending transaction per transaction id.

Another interesting aspect worth exploration is how the unique transaction id is generated. As flink kafka producer runs in parallel, generating a distributed, yet globally unique transaction id is a challenge.
The transaction id can be recycled after commit, so we do not need an unbounded sequence but a range of unique values.
The id range generated will be from startIndex + subtaskIndex * poolSize to startIndex + (subtaskIndex + 1) * poolSize - 1
Start index is stored in NextTransactionalIdHint in flink list state object. Each time a new transaction id range is requested, it will be incremented byNumberOfParallelSubtasks * kafkaProducersPoolSize;
But why not just use java uuid? I think there are two reasons:
Java uuid is purely random, where a monotonically increasing transaction id sequence is more desirable for tracking and monitoring purpose.
If flink crashes before the first checkpoint complete, there is no information about transaction id in state. In this case, system has to guess possible transaction ids within a range and try to abort each of them. With purely random uuid, even guess is not possible.

Other thoughts

Currently, only transaction id is pooled for reuse. for exactly once mode, a new kafka producer is created on each checkpoint snapshot. Depending on the checkpoint interval, this might be an overhead. Why not pool kafka producer directly instead?

原文地址:http://blog.51cto.com/shadowisper/2294047

时间: 2024-10-20 10:32:21

Flink kafka producer with transaction support的相关文章

Flink+kafka实现Wordcount实时计算

1. Flink Flink介绍: Flink 是一个针对流数据和批数据的分布式处理引擎.它主要是由 Java 代码实现.目前主要还是依靠开源社区的贡献而发展.对 Flink 而言,其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已.再换句话说,Flink 会把所有任务当成流来处理,这也是其最大的特点.Flink 可以支持本地的快速迭代,以及一些环形的迭代任务. Flink的特性: Flink是个分布式流处理开源框架: 1>. 即使数据源是无序的或者晚到达的数据,也能保持结果准确

关于高并发下kafka producer send异步发送耗时问题的分析

最近开发网关服务的过程当中,需要用到kafka转发消息与保存日志,在进行压测的过程中由于是多线程并发操作kafka producer 进行异步send,发现send耗时有时会达到几十毫秒的阻塞,很大程度上上影响了并发的性能,而在后续的测试中发现单线程发送反而比多线程发送效率高出几倍.所以就对kafka API send 的源码进行了一下跟踪和分析,在此总结记录一下. 首先看springboot下 kafka producer 的使用 在config中进行配置,向IOC容器中注入DefaultKa

kafka producer源码

producer接口: /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this

Kafka Producer相关代码分析

Kafka Producer相关代码分析 标签(空格分隔): kafka Kafka Producer将用户的消息发送到Kafka集群(准确讲是发送到Broker).本文将分析Producer相关的代码实现. 类kafka.producer.Producer 如果你自己实现Kafka客户端来发送消息的话,你就是用到这个类提供的接口来发送消息.(如果你对如何利用Producer API来发送消息还不是很熟悉的话,可以参看官方的例子).这个类提供了同步和异步两种方式来发送消息. 异步发送消息是基于同

kafka producer生产数据到kafka异常:Got error produce response with correlation id 16 on topic-partition...Error: NETWORK_EXCEPTION

kafka producer生产数据到kafka异常:Got error produce response with correlation id 16 on topic-partition...Error: NETWORK_EXCEPTION 1.问题描述 2017-09-13 15:11:30.656 o.a.k.c.p.i.Sender [WARN] Got error produce response with correlation id 25 on topic-partition t

kafka producer consumer

package demo; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class producer { private final Producer<String, String> producer; public final static

Entity Framework 6.0 Tutorials(6):Transaction support

Transaction support: Entity Framework by default wraps Insert, Update or Delete operation in a transaction, whenever you execute SaveChanges(). EF starts a new transaction for each operation and completes the transaction when the operation finishes.

【转】Kafka producer原理 (Scala版同步producer)

转载自:http://www.cnblogs.com/huxi2b/p/4583249.html     供参考 本文分析的Kafka代码为kafka-0.8.2.1.另外,由于Kafka目前提供了两套Producer代码,一套是Scala版的旧版本:一套是Java版的新版本.虽然Kafka社区极力推荐大家使用Java版本的producer,但目前很多已有的程序还是调用了Scala版的API.今天我们就分析一下旧版producer的代码. producer还分为同步和异步模式,由属性produc

【原创】Kafka producer原理 (Scala版同步producer)

本文分析的Kafka代码为kafka-0.8.2.1.另外,由于Kafka目前提供了两套Producer代码,一套是Scala版的旧版本:一套是Java版的新版本.虽然Kafka社区极力推荐大家使用Java版本的producer,但目前很多已有的程序还是调用了Scala版的API.今天我们就分析一下旧版producer的代码. producer还分为同步和异步模式,由属性producer.type指定,默认是sync,即同步发送模式.本文也主要关注于同步发送的代码走读.下面以console-pr