在生产中需要将一些数据发到kafka,而且需要做到EXACTLY_ONCE,kafka使用的版本为1.1.0,flink的版本为1.8.0,但是会很经常因为提交事务引起错误,甚至导致任务重启
kafka producer的配置如下
def getKafkaProducer(kafkaAddr: String, targetTopicName: String, kafkaProducersPoolSize: Int): FlinkKafkaProducer[String] = { val properties = new Properties() properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddr) properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 6000 * 6 + "") // 设置了retries参数,可以在Kafka的Partition发生leader切换时,Flink不重启,而是做5次尝试: properties.setProperty(ProducerConfig.RETRIES_CONFIG, "5") properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(1048576 * 5)) val serial = new KeyedSerializationSchemaWrapper(new SimpleStringSchema()) //val producer = new FlinkKafkaProducer011[String](targetTopicName, serial, properties, Optional.of(new KafkaProducerPartitioner[String]()), Semantic.EXACTLY_ONCE, kafkaProducersPoolSize) val producer = new FlinkKafkaProducer[String](targetTopicName, serial, properties, Optional.of(new KafkaProducerPartitioner[String]()), FlinkKafkaProducer.Semantic.EXACTLY_ONCE, kafkaProducersPoolSize) producer.setWriteTimestampToKafka(true) producer }
Flink env如下
val env = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(60 * 1000 * 1, CheckpointingMode.EXACTLY_ONCE) val config = env.getCheckpointConfig //RETAIN_ON_CANCELLATION在job canceled的时候会保留externalized checkpoint state config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //用于指定checkpoint coordinator上一个checkpoint完成之后最小等多久可以出发另一个checkpoint,当指定这个参数时,maxConcurrentCheckpoints的值为1 config.setMinPauseBetweenCheckpoints(3000) //用于指定运行中的checkpoint最多可以有多少个,如果有设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints这个参数就不起作用了(大于1的值不起作用) config.setMaxConcurrentCheckpoints(1) //指定checkpoint执行的超时时间(单位milliseconds),超时没完成就会被abort掉 config.setCheckpointTimeout(30000) //用于指定在checkpoint发生异常的时候,是否应该fail该task,默认为true,如果设置为false,则task会拒绝checkpoint然后继续运行 //https://issues.apache.org/jira/browse/FLINK-11662 config.setFailOnCheckpointingErrors(false)
然后经常会出现事务失效的问题,报错有很多种,大概为以下
java.lang.RuntimeException: Error while confirming checkpoint at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130) at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684) at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213) ... 5 more Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer‘s transaction has been expired by the broker.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer‘s transaction has been expired by the broker. at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1002) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:619) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:97) at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer‘s transaction has been expired by the broker.
org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors. at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:278) at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:263) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:804) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760) at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:105) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:650) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:97) at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer‘s transaction has been expired by the broker.
Checkpoint failed: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer‘s transaction has been expired by the broker. Checkpoint failed: Could not complete snapshot 11 for operator Sink: data_Sink (2/2).
这些错误基本涉及到两阶段提交、事务、checkpoint。
查看kafka documentation和研究ProducerConfig这个类后发现 kafka producer 在使用EXACTLY_ONCE的时候需要增加一些配置
the transaction timeout must be larger than the checkpoint interval, but smaller than the broker transaction.max.timeout.ms.
在getKafkaProducer增加以下配置后,不再出现原来的错误
//checkpoint 间隔时间<TRANSACTION_TIMEOUT_CONFIG<kafka transaction.max.timeout.ms (默认900秒) properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 3 + "") properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1") properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
至此,问题解决。
参考:
https://www.cnblogs.com/felixzh/p/10184762.html
https://www.cnblogs.com/wangzhuxing/p/10111831.html
http://www.heartthinkdo.com/?p=2040
http://romanmarkunas.com/web/blog/kafka-transactions-in-practice-1-producer/
原文地址:https://www.cnblogs.com/createweb/p/11971846.html
时间: 2024-10-10 10:19:11