Kafka consumer消息的拉取及偏移的管理

消费者拉取消息并处理主要有4个步骤:

  • 获取消费者所拉取分区的偏移位置OffsetFetchRequest(新的消息是从偏移位置开始的)
  • 创建FetchReqeust,生成Map<Node, FetchRequest>,以消费者所拉取消息的节点为key来分组,所消费的TopicPartition的数据为value,并放入到unsent队列
  • 调用poll方法实际发送请求给相应的node,如果返回成功,在onSecuss方法中,消息被保存在completedFetches中
  • 从completedFetches中取出数据,转换成consumerRecord,清空缓冲区及更新消费偏移位置

偏移管理:更新拉取偏移,updateFetchPositions,发送OffsetFetchRequest请求

消费者在启动后,需要获取其所消费的分区的最后提交的偏移位置。消费者在消费完消息后需要提交消费偏移(committed offset),当发生再平衡(reblance)后,分区(partition)有可能被不同的消费者去拉取消息,那新的消费者需要知道上次是消费到哪个偏移位置的,那么新的消费者就需要发出请求给coordinator,以取得提交偏移(committed offset,前一个消费者最后的提交偏移)并更新本地的拉取偏移(fetch position)。消费者在提交偏移的时候,有2种策略可以选择,自动提交(auto commit)和手动提交(manually commit)

自动提交:

通常情况下,为了提高性能,会使用自动提交方式,自动提交的间隔(auto.commit.interval.ms)默认为5000毫秒,是通过延时队列的任务来实现的,在consumer每次拉取消息消费后,如果延时队列的auto commit task到了提交间隔时间,则提交任务更新committed offset,如果没有到延迟任务的timeout时间,则不执行延迟任务,继续拉取消息,但在实际消费处理消息后,提交偏移前,消费者有可能崩溃,这就导致存在重复消费

手动提交:

在某些场景下,为了能更准确的控制消费偏移,以保证消息不会重复消费或者不会丢失,由消费者客户端手动控制是否提交偏移

偏移与消费语义

重复消费(至少一次消费语义 at least once):

如果消费流程是:拉取消息,处理消息,提交消费偏移。每次poll拉取N条消息,并处理消息后,在消费者线程保存commited offset,然后执行延时任务,但有可能因为未到时间间隔,没有执行提交偏移的任务,如果这个时候,消费者崩溃,触发再平衡,导致刚才消费的offset没有被提交,那实际的commited offset要小于真正的消费偏移,当新的消费者从新获取的提交偏移出拉取消息后,就导致重复消费了,如下图所示

遗漏消费(至多一次消费语义 at most once):

如果消费流程是:拉取消息,提交消费偏移,处理消息。当提交消费偏移后,加入consumer崩溃触发再平衡,当新的消费者试图更新偏移信息时,将会导致遗漏消息,如下图所示:

消息的拉取及消费

消费者如果在上一次的消息拉取过程中有消息存在,则直接返回,否则从前面更新的拉取偏移位置处重新发送拉取消息的请求。

拉取消息的请求以消费者所消费的TopicPartition所在的节点分组Map<Node, FetchRequest>,然后再通过poll到相应的节点来获取分区消息,一旦成功获取掉消息,将被保存在completedFetches,在返回时转换为按TopicPartition分组的record

Map<TopicPartition, List<ConsumerRecord<K, V>>> drained

另外,会在本地记录所消费的最后一条消息的偏移+1,在下次消费时,进行偏移检查,判断第一条记录的offset必须与这个值相等,否则则忽略

 private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
                       PartitionRecords<K, V> partitionRecords,
                       int maxRecords) {
    ......
    List<ConsumerRecord<K, V>> partRecords = partitionRecords.take(maxRecords);
    long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1; //下一条消费位置
    ......

    subscriptions.position(partitionRecords.partition, nextOffset); //记录消费的此TopicPartition的位置为nextOffset
    return partRecords.size();
    .......
  }

原文地址:https://www.cnblogs.com/benfly/p/9830784.html

时间: 2024-08-03 09:15:40

Kafka consumer消息的拉取及偏移的管理的相关文章

kafka 消费者拉取消息

本文只跟踪消费者拉取消息的流程.对于 java 客户端, kafka 的生产者和消费者复用同一个网络 io 类 NetworkClient. 入口在 KafkaConsumer#pollOnce 中,抽出主要步骤: // 构造 FetchRequest 请求,将请求对象放入 unsent 集合,等待发送 fetcher.sendFetches(); // 取出 unsent 中的请求,调用 NetworkClient#send,NetworkClinet#poll client.poll(pol

RocketMQ 拉取消息-文件获取

看完了上一篇的<RocketMQ 拉取消息-通信模块>,请求进入PullMessageProcessor中,接着 PullMessageProcessor.processRequest(final ChannelHandlerContext ctx, RemotingCommand request)方法中调用了: final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessag

解决 MySQL 比如我要拉取一个消息表中用户id为1的前10条最新数据

我们都知道,各种主流的社交应用或者阅读应用,基本都有列表类视图,并且都有滑到底部加载更多这一功能, 对应后端就是分页拉取数据.好处不言而喻,一般来说,这些数据项都是按时间倒序排列的,用户只关心最新的动态,而不关心几个月甚至几年前消息,所以后端返回给客户端的数据是不会一次性传递全部内容的(不仅耗费流量,而且还给服务器带来巨大压力). 举个例就说MySQL,它已经给我们提供了相应的语句来支持这一功能,那就是limit关键字.比如我要拉取一个消息表中用户id为1的前10条最新数据,SQL语句如下: s

消息拉取

疑问:PullRequest何时添加? PullMessageService提供延迟添加与立即添加2种方式 疑问:PullRequest是在什么时候创建的呢? 1.上上图中 PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest);mq根据PullRequest拉取任务执行完一次消息拉取任务之后,又将PullRequest对象放入到pullRequestQueue,第二个是在Reba

Kafka的消息格式

Commit Log Kafka储存消息的文件被它叫做log,按照Kafka文档的说法是: Each partition is an ordered, immutable sequence of messages that is continually appended to—a commit log 这反应出来的Kafka的行为是:消息被不断地append到文件末尾,而且消息是不可变的. 这种行为源于Kafka想要实现的功能:高吞吐量,多副本,消息持久化.这种简单的log形式的文件结构能够更好

Kafka——分布式消息系统

Kafka——分布式消息系统 架构 Apache Kafka是2010年12月份开源的项目,采用scala语言编写,使用了多种效率优化机制,整体架构比较新颖(push/pull),更适合异构集群. 设计目标: (1) 数据在磁盘上的存取代价为O(1)(2) 高吞吐率,在普通的服务器上每秒也能处理几十万条消息(3) 分布式架构,能够对消息分区(4) 支持将数据并行的加载到hadoop Kafka实际上是一个消息发布订阅系统.producer向某个topic发布消息,而consumer订阅某个top

Kafka consumer在项目中的多线程处理方式

对于KafkaConsumer而言,它不像KafkaProducer,不是线程安全的,状态是在consumer中维护的,所以实现时要注意多线程的使用,一般有2种使用方法: 1:每个Consumer有自己的线程,consumer去拉取数据,并对数据处理,这种方式比较简单,易于实现,容易顺序处理消息 2:消费者处理者方式,创建一个线程池,在consumer拉取数据后,由线程池来中的线程来处理数据,把拉取数据与处理数据解耦,但数据处理有可能破坏partition的消息顺序 从Kafka 文档中我们也可

kafka consumer 配置详解

1.Consumer Group 与 topic 订阅 每个Consumer 进程都会划归到一个逻辑的Consumer Group中,逻辑的订阅者是Consumer Group.所以一条message可以被多个订阅message 所在的topic的每一个Consumer Group,也就好像是这条message被广播到每个Consumer Group一样.而每个Consumer Group中,类似于一个Queue(JMS中的Queue)的概念差不多,即一条消息只会被Consumer Group中

RabbitMQ 和 Kafka 的消息可靠性对比

RabbitMQ和Kafka都提供持久的消息保证.两者都提供至少一次和至多一次的保证,另外,Kafka在某些限定情况下可以提供精确的一次(exactly-once)保证. 让我们首先理解一下上述术语的含义: 至多一次投递:消息绝对不会被重复投递,但是消息可能丢失 至少一次投递:消息绝对不会被丢失,但是有可能重复被消费 精确的一次投递:消息系统的圣杯.所有的消息精确的被投递一次. “投递”貌似不是准确的语言描述,“处理”才是.无论怎么描述,我们关心的是,消费者能否处理消息,以及处理的次数.但是使用