Flink producing data to Kafka: frequent transaction failures causing job restarts

In production we need to send data to Kafka with EXACTLY_ONCE semantics. The Kafka version is 1.1.0 and the Flink version is 1.8.0, but committing the transaction very often fails, sometimes even causing the job to restart.

The Kafka producer is configured as follows (imports shown for completeness):

  import java.util.{Optional, Properties}

  import org.apache.flink.api.common.serialization.SimpleStringSchema
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
  import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper
  import org.apache.kafka.clients.producer.ProducerConfig

  def getKafkaProducer(kafkaAddr: String, targetTopicName: String, kafkaProducersPoolSize: Int): FlinkKafkaProducer[String] = {
    val properties = new Properties()
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaAddr)
    // 6000 * 6 = 36,000 ms, i.e. a 36-second transaction timeout
    properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 6000 * 6 + "")
    // With retries set, a partition leader switch in Kafka leads to up to 5 retries instead of a Flink restart:
    properties.setProperty(ProducerConfig.RETRIES_CONFIG, "5")
    properties.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(1048576 * 5))
    val serial = new KeyedSerializationSchemaWrapper(new SimpleStringSchema())
    // KafkaProducerPartitioner is a custom FlinkKafkaPartitioner implementation from this project
    val producer = new FlinkKafkaProducer[String](targetTopicName, serial, properties, Optional.of(new KafkaProducerPartitioner[String]()), FlinkKafkaProducer.Semantic.EXACTLY_ONCE, kafkaProducersPoolSize)
    producer.setWriteTimestampToKafka(true)
    producer
  }

The Flink environment is set up as follows (imports shown for completeness):

    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 60 seconds in EXACTLY_ONCE mode
    env.enableCheckpointing(60 * 1000 * 1, CheckpointingMode.EXACTLY_ONCE)
    val config = env.getCheckpointConfig
    // RETAIN_ON_CANCELLATION keeps the externalized checkpoint state when the job is canceled
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    // Minimum pause the checkpoint coordinator waits after one checkpoint completes before it may trigger
    // the next; when this is set, the effective number of concurrent checkpoints is 1
    config.setMinPauseBetweenCheckpoints(3000)
    // Maximum number of checkpoints that may be in flight at once; values greater than 1 have no effect
    // once minPauseBetweenCheckpoints is set
    config.setMaxConcurrentCheckpoints(1)
    // Timeout (in milliseconds) after which an unfinished checkpoint is aborted
    config.setCheckpointTimeout(30000)
    // Whether the task should fail when checkpointing throws; defaults to true. With false, the task
    // declines the checkpoint and keeps running. See https://issues.apache.org/jira/browse/FLINK-11662
    config.setFailOnCheckpointingErrors(false)
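
For orientation, here is a minimal sketch of how these two pieces might be wired together; the source stream, broker list, topic name, and pool size are placeholders rather than details from the original job (only the sink name matches the data_Sink operator that appears in the errors below):

    // Hypothetical assembly; buildStream stands in for however the job actually builds its DataStream[String].
    import org.apache.flink.streaming.api.scala._

    val stream: DataStream[String] = buildStream(env)
    val producer = getKafkaProducer("broker1:9092,broker2:9092", "target_topic", kafkaProducersPoolSize = 5)
    stream.addSink(producer).name("data_Sink")
    env.execute("flink-to-kafka-exactly-once")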

With this setup, transaction failures occur frequently. The errors come in several variants, roughly the following:

java.lang.RuntimeException: Error while confirming checkpoint
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1218)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkRuntimeException: Committing one of transactions failed, logging first encountered failure
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.notifyCheckpointComplete(TwoPhaseCommitSinkFunction.java:296)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:684)
    at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1213)
    ... 5 more
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1002)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:619)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:97)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
    at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:278)
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:263)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:804)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:105)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:650)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:97)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Checkpoint failed: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

Checkpoint failed: Could not complete snapshot 11 for operator Sink: data_Sink (2/2).

These errors all point at the same area: two-phase commit, transactions, and checkpointing. In Flink's two-phase-commit Kafka sink, the transaction is pre-committed (flushed) while the checkpoint is taken and only committed once notifyCheckpointComplete arrives; if the broker expires the transaction in that window, the commit fails with the ProducerFencedException seen above.

Going through the Kafka documentation and the ProducerConfig class shows that the Kafka producer needs additional configuration when EXACTLY_ONCE is used. The key constraint:

the transaction timeout must be larger than the checkpoint interval, but smaller than the broker transaction.max.timeout.ms.
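
The right-hand bound of that inequality is a broker setting, and it can be read directly from the cluster. A minimal sketch using Kafka's AdminClient (the helper name and broker id below are my own, for illustration; describeConfigs is available in the Kafka 1.1.0 client):

    import java.util.{Collections, Properties}

    import org.apache.kafka.clients.admin.AdminClient
    import org.apache.kafka.common.config.ConfigResource

    // Hypothetical helper: read transaction.max.timeout.ms from one broker (broker default: 900,000 ms = 15 minutes).
    def brokerMaxTransactionTimeoutMs(bootstrapServers: String, brokerId: String): Long = {
      val props = new Properties()
      props.put("bootstrap.servers", bootstrapServers)
      val admin = AdminClient.create(props)
      try {
        val resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId)
        val config = admin.describeConfigs(Collections.singleton(resource)).all().get().get(resource)
        config.get("transaction.max.timeout.ms").value().toLong
      } finally {
        admin.close()
      }
    }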

On the other side of the inequality, the original getKafkaProducer set TRANSACTION_TIMEOUT_CONFIG to 6000 * 6 = 36,000 ms (36 seconds), which is shorter than the 60,000 ms checkpoint interval, so the broker could expire a transaction before its checkpoint ever completed. After adding the following settings in getKafkaProducer, the original errors no longer appear:

    // checkpoint interval < TRANSACTION_TIMEOUT_CONFIG < Kafka's broker-side transaction.max.timeout.ms (default 900 seconds)
    properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 1000 * 60 * 3 + "") // 3 minutes
    properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
    properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
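
To keep the constraint from silently regressing, one option is to assert it at job startup. Below is a minimal sketch using the values from this post; the helper name is my own, and the broker ceiling is hard-coded to the 900-second default rather than read from the cluster:

    // Fails fast unless checkpoint interval < transaction timeout <= broker transaction.max.timeout.ms.
    def validateTransactionTimeouts(checkpointIntervalMs: Long,
                                    transactionTimeoutMs: Long,
                                    brokerMaxTransactionTimeoutMs: Long = 900000L): Unit = {
      require(checkpointIntervalMs < transactionTimeoutMs,
        s"transaction.timeout.ms ($transactionTimeoutMs) must exceed the checkpoint interval ($checkpointIntervalMs)")
      require(transactionTimeoutMs <= brokerMaxTransactionTimeoutMs,
        s"transaction.timeout.ms ($transactionTimeoutMs) must not exceed the broker's transaction.max.timeout.ms ($brokerMaxTransactionTimeoutMs)")
    }

    // With this post's values: 60,000 ms checkpoint interval, 180,000 ms transaction timeout.
    validateTransactionTimeouts(60 * 1000, 1000 * 60 * 3)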

With these changes in place, the problem is resolved.

References:

https://www.cnblogs.com/felixzh/p/10184762.html

https://www.cnblogs.com/wangzhuxing/p/10111831.html

http://www.heartthinkdo.com/?p=2040

http://romanmarkunas.com/web/blog/kafka-transactions-in-practice-1-producer/

