Kafka(七)消费者偏移量

在Kafka0.9版本之前消费者保存的偏移量是在zookeeper中/consumers/GROUP.ID/offsets/TOPIC.NAME/PARTITION.ID。新版消费者不在保存偏移量到zookeeper中,而是保存在Kafka的一个内部主题中“__consumer_offsets”,该主题默认有50个分区,每个分区3个副本,分区数量有参数offset.topic.num.partition设置。通过消费者组ID的哈希值和该参数取模的方式来确定某个消费者组已消费的偏移量保存到__consumer_offsets主题的哪个分区中。

Kafka消费者API提供两种方法用来查询偏移量。一个是committed(TopicPartition partition)方法,这个方法返回一个OffsetAndMetadata对象,通过这个对象可以获取指定分区已提交的偏移量;另外一个方法position(TopicPartition partition)返回的是下一次拉取位置。

同时Kafka消费者还提供了重置消费偏移量的方法,seek(TopicPartition partition, long offset),该方法用于指定消费起始位置,另外还有seekToBeginning()和seekToEnd(),从名字就能看出来是干嘛的。

偏移量提交有自动和手动,默认是自动(enable.auto.commit = true)。自动提交的话每隔多久自动提交一次呢?这个由消费者协调器参数auto.commit.interval.ms 毫秒执行一次提交。有些场景我们需要手动提交偏移量,尤其是在一个长事务中并且保证消息不被重复消费以及消息不丢失,比如生产者一个订单提交消息,消费者拿到后要扣减库存,扣减成功后该消息才能提交,所以在这种场景下需要手动提交,因为库存扣减失败这个消息就不能消费,同时客户这个订单状态也不能是成功。手动提交也有两种一个是同步提交一个是异步提交,其区别就是消费者线程是否阻塞。如果使用手动提交就要关闭自动提交,因为自动提交默认是开启的。

原文地址:http://blog.51cto.com/littledevil/2148207

时间: 2024-10-10 17:39:40

Kafka(七)消费者偏移量的相关文章

java实现Kafka的消费者示例

使用java实现Kafka的消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 8

kafka集群搭建和使用Java写kafka生产者消费者

 kafka集群搭建 Java代码   1.zookeeper集群  搭建在110, 111,112 2.kafka使用3个节点110, 111,112 修改配置文件config/server.properties broker.id=110 host.name=192.168.1.110 log.dirs=/usr/local/kafka_2.10-0.8.2.0/logs 复制到其他两个节点,然后修改对应节点上的config/server.pro 3.启动,在三个节点分别执行 bin/kaf

kafka 生产者消费者 api接口

生产者 import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import kafka.serializer.StringEncoder; public class KafkaProducer  extends Thread{ private String topic

spring boot 集成kafka (多线程,消费者使用kafka的原生api实现,因为@KakfkaListener修改groupId无效)

application-test.properties 1 #kafka 2 kafka.consumer.zookeeper.connect=*:2181 3 kafka.consumer.servers=*:9092 4 kafka.consumer.enable.auto.commit=true 5 kafka.consumer.session.timeout=6000 6 kafka.consumer.auto.commit.interval=1000 7 #保证每个组一个消费者消费同一

关于kafka更改消费者对应分组下的offset值

kafka的offset保存位置分为两种情况 0.9.0.0版本之前默认保存在zookeeper当中 0.9.0.0版本之后保存在broker对应的topic当中 1.如何辨别你启用的consumer的offset保存位置进入zookeeper的命令行当中 zkCli.sh localhost:2181 用 ls / 查看目录如果你在代码中定义的group id 没有在 /consumers 这个文件夹中,代表offset保存在broker的topic中前提是consumer确实已经创建并启动如

kafka 创建消费者报错

kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181 --topic test --from-beginning # zookeeper is not a recognized option 在创建消费者的时候出现  zookeeper is not a recognized option,经过在浏览器搜索发现是版本问题,消费者读取信息配置不在是 zookeeper 的节点信息,而直接改成 kafka

Kafka消费者没有收到通知的分析

今天遇到两位三方人员跟我反馈,某微服务的异步接口功能不正常了,由于该异步接口采用Kafka异步消息的方案,对方说没有收到Kafka给消费者的通知,根据此问题,联系了相关人员进行了分析: (一)明确环境是否一致 1.生产者和消费者链接Kafka的地址是否一致,初步发现A方消费者链接Kafka的地址不正确,没有与生产者链接Kafka的地址保持一致. 2.topic和key是否都一致,初步可以确定A方消费的topic和key都是跟生产者的topic和key是一一对应的. (二)生产者和Kafka之间的

Kafka实战解惑

一.Kafka简介 Kafka是LinkedIn使用Scala开发的一个分布式消息中间件,它以水平扩展能力和高吞吐率著称,被广泛用于日志处理.ETL等应用场景.Kafka具有以下主要特点: 消息的发布.订阅均具有高吞吐量:据统计数字表明,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB). 消息可持久化:消息可持久化到磁盘并且通过Replication机制防止数据丢失. 分布式系统,可水平扩展:所有的生产者(Producer).消费者(Consumer).消息中

Kafka、RabbitMQ、RocketMQ、ActiveMQ 17 个方面综合对比

本文将从,Kafka.RabbitMQ.ZeroMQ.RocketMQ.ActiveMQ 17 个方面综合对比作为消息队列使用时的差异.(欢迎加入Java程序员群:630441304,一起学习交流会) 一.资料文档 Kafka:中.有kafka作者自己写的书,网上资料也有一些. rabbitmq:多.有一些不错的书,网上资料多. zeromq:少.没有专门写zeromq的书,网上的资料多是一些代码的实现和简单介绍. rocketmq:少.没有专门写rocketmq的书,网上的资料良莠不齐,官方文