Kafka 提供了 3 种提交 offset 的方式
- 自动提交
复制
1234 |
// 自动提交,默认trueprops.put("enable.auto.commit", "true");// 设置自动每1s提交一次props.put("auto.commit.interval.ms", "1000"); |
- 手动同步提交 offset
复制
1 |
consumer.commitSync(); |
- 手动异步提交 offset
复制
1 |
consumer.commitAsync(); |
上面说了既然异步提交 offset 可能会重复消费, 那么我使用同步提交是否就可以表明这个问题呢?
复制
1234567 |
while(true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); records.forEach(record -> { insertIntoDB(record); consumer.commitSync(); });} |
很明显不行, 因为 insertIntoDB 和 commitSync() 做不到原子操作, 如果 insertIntoDB() 成功了,但是提交 offset 的时候 consumer 挂掉了,然后服务器重启,仍然会导致重复消费问题。
如何做到不重复消费?
只要保证处理消息和提交 offset 得操作是原子操作,就可以做到不重复消费。我们可以自己管理 committed offset, 而不让 kafka 来进行管理。
比如如下使用方式:
- 如果消费的数据刚好需要存储在数据库,那么可以把 offset 也存在数据库,就可以就可以在一个事物中提交这两个结果,保证原子操作。
- 借助搜索引擎,把 offset 和数据一起放到索引里面,比如 Elasticsearch
每条记录都有自己的 offset, 所以如果要管理自己的 offset 还得要做下面事情
- 设置 enable.auto.commit=false
- 使用每个 ConsumerRecord 提供的 offset 来保存消费的位置。
- 在重新启动时使用 seek(TopicPartition, long) 恢复上次消费的位置。
通过上面的方式就可以在消费端实现”Exactly Once” 的语义, 即保证只消费一次。但是是否真的需要保证不重复消费呢?这个得看具体业务, 重复消费数据对整体有什么影响在来决定是否需要做到不重复消费。
原文地址:https://www.cnblogs.com/doit8791/p/11312979.html
时间: 2024-10-03 22:07:38