Kafka消息重新发送

Kafka消息重新发送

 

1、  使用kafka消息队列做消息的发布、订阅,如果consumer端消费出问题,导致数据并没有消费,此时不需要担心,数据并不会立刻丢失,kafka会把数据在服务器的磁盘上默认存储7天,或者自己指定有两种方式:1)指定时间,log.retention.hours=1682)指定大小,log.segment.bytes=1073741824。此时就可以通过重置某个topicoffset来是消息重新发送,进行消费

 

2、        查看topic的offset的范围

 

1)使用下面的命令可以查看topicuserlogbrokerspark:9092offset的最小值:

#./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list spark:9092 -topic userlog --time -2

 

2)offset的最大值:

#./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list spark:9092 -topic userlog --time -1

 

 

3、        设置consumer group的offset

1)启动zookeeper,如果使用的是kafka内置的zookeeper,直接启动bin目录下的zookeeper-shell.sh,进行登录:

#./zookeeper-shell.sh  localhost:2181

通过如下命令,来重置offset值,比如topic:userlog,group:userlogs,partition:0 ,offset:2181

 

set /consumers/userlogs/offsets/userlog/0 1288

 

如果有多个partition都执行上输的命令,并将0换为相对应的分区号就可以了。

###如果创建分区的时候设置了zk的根目录,如localhost:2181/kafka

则重置的命令为:

set /kafka/consumers/userlogs/offsets/userlog/0 1288

 

2)如果使用单独安装的zookeeper,直接使用bin目录下的ZKCli.sh登录后,执行上述命令即可。

 

4、 设置完成后需要重启相关的服务,就可以从设置offset的地方开始消费。

重启服务:consumer服务。

时间: 2024-10-04 19:07:48

Kafka消息重新发送的相关文章

【转】解决Maxwell发送Kafka消息数据倾斜问题

最近用Maxwell解析MySQL的Binlog,发送到Kafka进行处理,测试的时候发现一个问题,就是Kafka的Offset严重倾斜,三个partition,其中一个的offset已经快200万了,另外两个offset才不到两百.Kafka数据倾斜的问题一般是由于生产者使用的Partition接口实现类对分区处理的问题,一般是对key做hash之后,对分区数取模.当出现数据倾斜时,小量任务耗时远高于其它任务,从而使得整体耗时过大,未能充分发挥分布式系统的并行计算优势(参考Apache Kaf

源码分析 Kafka 消息发送流程(文末附流程图)

温馨提示:本文基于 Kafka 2.2.1 版本.本文主要是以源码的手段一步一步探究消息发送流程,如果对源码不感兴趣,可以直接跳到文末查看消息发送流程图与消息发送本地缓存存储结构. 从上文 初识 Kafka Producer 生产者,可以通过 KafkaProducer 的 send 方法发送消息,send 方法的声明如下: Future<RecordMetadata> send(ProducerRecord<K, V> record) Future<RecordMetada

【Kafka 源码解读】之 【代码没报错但是消息却发送失败!】

聊聊最近,2020年,在2019年的年尾时,大家可谓对这年充满新希望,特别是有20200202这一天.可是澳洲长达几个月的大火,新型冠状病毒nCoV的发现,科比的去世等等事情,让大家感到相当的无奈,生命是如此的脆弱,明天又是如此的未知.但是人应当活在当下,勇敢的面对疫情,和大家和政府一起打赢这场没硝烟的战争! 作为程序员,我必定不能停止工作,不能停止学习,现在在家办公,完全配合现在提倡的隔离战术,对自己负责,对社会负责.下面我会和大家分享一篇我之前写的笔记,和大家一起讨论关于 Kafka 的一个

使用kafka消息队列解决分布式事务

微服务框架Spring Cloud介绍 Part1: 使用事件和消息队列实现分布式事务 本文转自:http://skaka.me/blog/2016/04/21/springcloud1/ 不同于单一架构应用(Monolith), 分布式环境下, 进行事务操作将变得困难, 因为分布式环境通常会有多个数据源, 只用本地数据库事务难以保证多个数据源数据的一致性. 这种情况下, 可以使用两阶段或者三阶段提交协议来完成分布式事务.但是使用这种方式一般来说性能较差, 因为事务管理器需要在多个数据源之间进行

Kafka消息topic分区

kafka是为分布式环境设计的,因此如果日志文件,其实也可以理解成消息数据库,放在同一个地方,那么必然会带来可用性的下降,一挂全挂,如果全量拷贝到所有的机器上,那么数据又存在过多的冗余,而且由于每台机器的磁盘大小是有限的,所以即使有再多的机器,可处理的消息还是被磁盘所限制,无法超越当前磁盘大小.因此有了partition的概念. kafka对消息进行一定的计算,通过hash来进行分区.这样,就把一份log文件分成了多份.如上面的分区读写日志图,分成多份以后,在单台broker上,比如快速上手中,

apache kafka消息服务

apache kafka中国社区QQ群:162272557 apache kafka参考 http://kafka.apache.org/documentation.html 消息队列分类: 点对点: 消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息.这里要注意: 消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息. Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费. 发布/订阅 消息生产者(发布)将消息

一文看懂Kafka消息格式的演变

摘要 对于一个成熟的消息中间件而言,消息格式不仅关系到功能维度的扩展,还牵涉到性能维度的优化.随着Kafka的迅猛发展,其消息格式也在不断的升级改进,从0.8.x版本开始到现在的1.1.x版本,Kafka的消息格式也经历了3个版本.本文这里主要来讲述Kafka的三个版本的消息格式的演变,文章偏长,建议先关注后鉴定. Kafka根据topic(主题)对消息进行分类,发布到Kafka集群的每条消息都需要指定一个topic,每个topic将被分为多个partition(分区).每个partition在

如何在优雅地Spring 中实现消息的发送和消费

本文将对rocktmq-spring-boot的设计实现做一个简单的介绍,读者可以通过本文了解将RocketMQ Client端集成为spring-boot-starter框架的开发细节,然后通过一个简单的示例来一步一步的讲解如何使用这个spring-boot-starter工具包来配置,发送和消费RocketMQ消息. 作者简介:辽天,阿里巴巴技术专家,Apache RocketMQ 内核控,拥有多年分布式系统研发经验,对Microservice.Messaging和Storage等领域有深刻

kafka消息通信原理学习(1)

关于 Topic 和 Partition: Topic: 在 kafka 中,topic 是一个存储消息的逻辑概念,可以认为是一个消息集合.每条消息发送到 kafka 集群的消息都有一个类别.物理上来说,不同的 topic 的消息是分开存储的,每个 topic 可以有多个生产者向它发送消息,也可以有多个消费者去消费其中的消息. Partition: 每个 topic 可以划分多个分区(每个 Topic 至少有一个分区),同一 topic 下的不同分区包含的消息是不同的.每个消息在被添加到分区时,