深入了解Kafka【四】消费者的Offset管理


1、Offset Topic

Consumer通过提交Offset来记录当前消费的最后位置,以便于消费者发生崩溃或者有新的消费者加入消费者组,而引发的分区再均衡操作,每个消费者可能会分到不同的分区。我测试的kafka版本是:0.11.0.2,消费者往一个特殊的主题“_consumer_offset”发送消息,如图:

消息的内容包括:

fields content
Key Consumer Group, topic, partition
Payload Offset, metadata, timestamp

提交到“_consumer_offset”主题的消息会根据消费组的key进行分区,一个消费组内的所有消息,都会发送到唯一的Partition。

2、Offset Commit

Offset的提交逻辑其实和普通的生产者往kafka发送数据是一样的。

2.1、Consumer

消费者启动时会为“_consumer_offset”主题创建一个内置的生产者,用于Offset数据的提交。

2.2、Broker

就是将Offset提交当成是正常的生产请求,逻辑不变。

“_consumer_offset”主题会在集群中的第一个Offset提交请求时被自动创建。

3、Offset的提交方式

Offset提交时会有两个问题:重复消费和漏消费。

  • 当提交的Offset小于客户端处理的最后一条消息的Offset,会造成重复消费。
    情景:先消费,后提交Offset,如果消费成功、提交失败,消费者下次获取的Offset还是以前的,所以会造成重复消费。
  • 当提交的Offset大于客户端处理的最后一条消息的Offset,会造成漏消费。
    情景:先提交Offset,后消费,如果提交成功、消费失败,消费者下次获取的Offset已经是新的,所以会造成漏消费。

根据具体的业务情况,选择合适的提交方式,可以有效的解决掉重复消费和漏消费的问题。

3.1、自动提交

自动提交是最简单的提交方式,通过设置参数,可以开启自动提交也可以设置提交的时间间隔。缺点就是,当消费了一些数据后,还未达到自动的提交时间,这个时候,有新的消费者加入,或者当前消费者挂掉,会出现分区再均衡操作,之后消费者重新在上一次提交的Offset开始消费,造成重复消费。虽然可以缩短自动提交间隔,但是还是无法解决这个问题。

3.2、同步提交当前Offset

关闭手动提交,可以通过同步提交接口来提交当前的Offset,虽然可以获取主动性,但是也牺牲了吞吐量,因为同步提交必然是阻塞的,而且会有重试机制。

3.3、异步提交当前Offset

使用异步提交方式,既有主动性,也可以增加kafka消费的吞吐量,没有重试机制,也解决不掉重复消费的问题。

3.4、同步和异步组合提交

正常使用的时候使用异步提交,速度快。当要关闭消费者的时候,使用同步提交,即使失败了也会一直重试,直到提交成功或者发生无法恢复的错误。不管是同步提交还是异步提交都避免不了重复消费和漏消费的问题。

3.5、提交指定的Offset

因为自动提交、同步提交与异步提交都是将最后一个Offset提交上去。通过提交指定的Offset,可以减轻重复消费和漏消费的问题,但是相应的消费端就需要复杂的业务处理,而且需要自己维护Offset。

原文地址:https://www.cnblogs.com/clawhub/p/12008158.html

时间: 2024-08-02 08:09:43

深入了解Kafka【四】消费者的Offset管理的相关文章

java实现Kafka的消费者示例

使用java实现Kafka的消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 8

【OC语法快览】四、基础内存管理

Basic Memory Management                                                           基础内存管理 If you're writing an application for Mac OS X, you have the option to enable garbage collection. In general, this means that you don't have to think about memory

Testlink1.9.17使用方法( 第四章 测试需求管理 )

第四章 测试需求管理 QQ交流群:585499566 需求规格说明书是我们开展测试的依据.首先,我们可以对项目(产品)的需求规格说明书进行分解和整理,将其拆分为多个需求,一个项目可以包含多个需求,一个需求可以包含多个测试需求. 创建产品需求规格 创建测试需求 一. 创建产品需求规格 单击主页上面的"产品需求"区域->点击[产品需求规格]按钮-->点击[新建产品需求规格] 对"产品需求规格"的描述比较简单,内容包含文档ID.标题.范围,类型.如下图所示:

Git工程开发实践(四)——Git分支管理策略

Git工程开发实践(四)--Git分支管理策略 一.Git版本管理的挑战 Git是非常优秀的版本管理工具,但面对版本管理依然有非常大得挑战.工程开发中,开发者彼此的代码协作必然带来很多问题和挑战:A.如何开始一个Feature开发,而不影响其它Feature?B.由于很容易创建新分支,分支多了如何管理,时间久了,如何知道每个分支是干什么的?C.哪些分支已经合并回了主干?D.如何进行Release的管理?开始一个Release的时候如何冻结Feature, 如何在Prepare Release的时

关于kafka更改消费者对应分组下的offset值

kafka的offset保存位置分为两种情况 0.9.0.0版本之前默认保存在zookeeper当中 0.9.0.0版本之后保存在broker对应的topic当中 1.如何辨别你启用的consumer的offset保存位置进入zookeeper的命令行当中 zkCli.sh localhost:2181 用 ls / 查看目录如果你在代码中定义的group id 没有在 /consumers 这个文件夹中,代表offset保存在broker的topic中前提是consumer确实已经创建并启动如

spring boot 集成kafka (多线程,消费者使用kafka的原生api实现,因为@KakfkaListener修改groupId无效)

application-test.properties 1 #kafka 2 kafka.consumer.zookeeper.connect=*:2181 3 kafka.consumer.servers=*:9092 4 kafka.consumer.enable.auto.commit=true 5 kafka.consumer.session.timeout=6000 6 kafka.consumer.auto.commit.interval=1000 7 #保证每个组一个消费者消费同一

kafka消费端提交offset的方式

Kafka 提供了 3 种提交 offset 的方式 自动提交 复制 1234 // 自动提交,默认trueprops.put("enable.auto.commit", "true");// 设置自动每1s提交一次props.put("auto.commit.interval.ms", "1000"); 手动同步提交 offset 复制 1 consumer.commitSync(); 手动异步提交 offset 复制 1

kafka集群搭建和使用Java写kafka生产者消费者

 kafka集群搭建 Java代码   1.zookeeper集群  搭建在110, 111,112 2.kafka使用3个节点110, 111,112 修改配置文件config/server.properties broker.id=110 host.name=192.168.1.110 log.dirs=/usr/local/kafka_2.10-0.8.2.0/logs 复制到其他两个节点,然后修改对应节点上的config/server.pro 3.启动,在三个节点分别执行 bin/kaf

kafka 生产者消费者 api接口

生产者 import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import kafka.serializer.StringEncoder; public class KafkaProducer  extends Thread{ private String topic