重置kafka topic consumer的offset

如果你在使用Kafka来分发消息,在数据处理的过程中可能会出现处理程序出异常或者是其它的错误,会造成数据丢失或不一致。这个时候你也许会想要通过kafka把数据从新处理一遍,我们知道kafka默认会在磁盘上保存到7天的数据,你只需要把kafka的某个topic的consumer的offset设置为某个值或者是最小值,就可以使该consumer从你设置的那个点开始消费。

查询topic的offset的范围

用下面命令可以查询到topic:DynamicRange broker:SparkMaster:9092的offset的最小值:

$ /opt/cloudera/parcels/KAFKA/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list SparkMaster:9092 —topic DynamicRange --time -2

输出

DynamicRange:0:1288

查询offset的最大值:

$ /opt/cloudera/parcels/KAFKA/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list SparkMaster:9092 —topic DynamicRange --time -1

输出

DynamicRange:0:7885

从上面的输出可以看出topic:DynamicRange只有一个partition:0 offset范围为:[1288,7885]

设置consumer group的offset

启动zookeeper client

$ /opt/cloudera/parcels/CDH/lib/zookeeper/bin/zkCli.sh

通过下面命令设置consumer group:DynamicRangeGroup topic:DynamicRange partition:0的offset为1288:

set /consumers/DynamicRangeGroup/offsets/DynamicRange/0 1288

参考:https://metabroadcast.com/blog/resetting-kafka-offsets

转载请注明出处:http://www.cnblogs.com/keitsi/p/5685576.html

时间: 2024-10-15 21:34:24

重置kafka topic consumer的offset的相关文章

kafka producer consumer

package demo; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class producer { private final Producer<String, String> producer; public final static

【原创】Kafka console consumer源代码分析

上一篇中分析了Scala版的console producer代码,这篇文章来分析一下console consumer的工作原理.其实不论是哪个consumer,大部分的工作原理都是类似的.我们用console consumer作为切入点,既容易理解又不失一般性. 首先需要说明的,我使用的Kafka环境是Kafka0.8.2.1版本,这也是最新的版本.另外我们主要分析consumer的原理,没有过分纠结于console consumer的使用方法——所以我在这里选用了最简单的一条命令作为开始:bi

5.kafka API consumer

1.kafka consumer流程1.1.在启动时或者协调节点故障转移时,消费者发送ConsumerMetadataRequest给bootstrap brokers列表中的任意一个brokers.在ConsumerMetadataResponse中,它接收消费者对应的消费组所属的协调节点的位置信息. 1.2.消费者连接协调节点,并发送HeartbeatRequest.如果返回的HeartbeatResponse中返回IllegalGeneration错误码,说明协调节点已经在初始化.消费者就

kafka topic制定规则

kafka topic的制定,我们要考虑的问题有很多,比如生产环境中用几备份.partition数目多少合适.用几台机器支撑数据量,这些方面如何去考量?笔者根据实际的维护经验,写一些思考,希望大家指正. 1.replicas数目 可以从上图看到,备份越多,性能越低,因为kafka的写入只写入主分区,备份相当于消费者从主分区pull数据,这样势必会造成性能的损耗,故建议在生产环境中使用一主一备即可. 2. partition数量 (1)设置partition数量的时候我们需要注意:kafka的pa

关于Flink slot 和kafka topic 分区关系的说明

今天又有小伙伴在群里问 slot 和 kafka topic 分区(以下topic,默认为 kafka 的 topic )的关系,大概回答了一下,这里整理一份 首先必须明确的是,Flink Task Manager 的 slot 数 和 topic 的分区数是没有直接关系的,而这个问题其实是问的是: 任务的并发数与 slot 数的关系 最大并发数 = slot 数 这里有两个原因:每个算子的不同并行不能在同一slot,不同的算子可以共享 slot ,所以最大并行度 就等于 slot 数. 这样就

Kafka Topic Partition Replica Assignment实现原理及资源隔离方案

本文共分为三个部分: Kafka Topic创建方式 Kafka Topic Partitions Assignment实现原理 Kafka资源隔离方案 1. Kafka Topic创建方式 Kafka Topic创建方式有以下两种表现形式: (1)创建Topic时直接指定Topic Partition Replica与Kafka Broker之间的存储映射关系 /usr/lib/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper ZooKeep

手动删除Kafka Topic

一.删除Kafka topic 运行./bin/kafka-topics  --delete --zookeeper [zookeeper server]  --topic [topic name]:如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion 可以通过命令:./bin/kafka-topics --zookeeper

用canal同步binlog到kafka,spark streaming消费kafka topic乱码问题

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有kafka和RocketMQ. 在投递的时候我们使用的是非压平的消息模式(canal.mq.flatMessage =false //是否为flat json格式对象),然后消费topic的时候就一直无法正常显示和序列化,通过kafka-console-consumer.sh命令收到的消息如下图 在github上也能找到相关问题 canal-kafka 数据同步到

Kafka Java consumer动态修改topic订阅

前段时间在Kafka QQ群中有人问及此事--关于Java consumer如何动态修改topic订阅的问题.仔细一想才发现这的确是个好问题,因为如果简单地在另一个线程中直接持有consumer实例然后调用subscribe进行修改,consumer端必然会抛出异常ConcurrentModificationException:KafkaConsumer is not safe for multi-threaded access 和KafkaProducer不同的是,KafkaConsumer不