kafka查看消费数据

一、如何查看

在老版本中,使用kafka-run-class.sh 脚本进行查看。但是对于最新版本,kafka-run-class.sh 已经不能使用,必须使用另外一个脚本才行,它就是kafka-consumer-groups.sh

普通版

查看所有组

要想查询消费数据,必须要指定组。那么线上运行的kafka有哪些组呢?使用以下命令:

bin/kafka-consumer-groups.sh --bootstrap-server kafka-1.default.svc.cluster.local:9092 --list

注意:根据实际情况修改kafka的连接地址

执行输出:

...
usercenter
...

这些组在是程序代码里面定义的,比如usercenter就是一个业务模块。

查看消费情况

bin/kafka-consumer-groups.sh --describe --bootstrap-server kafka-1.default.svc.cluster.local:9092 --group usercenter

参数解释:

--describe  显示详细信息

--bootstrap-server 指定kafka连接地址

--group 指定组。

注意:--group指定的组必须存在才行!可以用上面的--list命令来查看

执行输出:

TOPIC                         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST             CLIENT-ID
xx-pending-business-c0a9dc75 0          -               0               -               consumer-1-20b51e57-70de-417b-ba4a-4e7ffbb6df7a /192.169.220.117 consumer-1
xx-pending-business-c0a9dc71 0          1082            1082            0               -                                               -                -

指定自己的分组 自己消费的topic会显示kafka总共有多少数据,以及已经被消费了多少条

标记解释:

GROUP TOPIC PID OFFSET LOGSIZE  LAG
消费者组 主题id   分区id  当前已消费的条数 总条数 未消费的条数

从上面的信息可以看出,topic为xx-pending-business-c0a9dc71 总共消费了1082条信息, 未消费的条数为0。也就是说,消费数据没有积压的情况!

注意:以kafkaspout类作为消费者去读kafka数据,相当于直接从kafka server上取文件,没有消费者组的概念

每次读的数据存在自己zk的offet中,所以不能通过上述命令查看

ACL版查看

如果kafka启用了acl权限验证,则不能直接使用上面的命令查看,需要增加参数--command-config参数才行

此参数必须要指定一个配置文件才行

修改默认的配置文件

cd /kafka_2.12-2.1.0/
vim config/config/consumer.properties

最后一行增加2行,表示指定SASL协议连接

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

查看所有组

bin/kafka-consumer-groups.sh --command-config config/consumer.properties  --bootstrap-server kafka-1.default.svc.cluster.local:9092 --list

注意:--command-config 后面的配置文件,我用的是相对路径。请以实际情况为准!

查看消费情况

bin/kafka-consumer-groups.sh --command-config config/consumer.properties  --describe --bootstrap-server kafka-1.default.svc.cluster.local:9092 --group usercenter

如果需要使用shell脚本,来检测kafka的消费数据,有没有积压。

可以先使用--list命令,然后对结果使用for循环遍历,将组名赋予到--group参数上即可!

本文参考链接:

https://blog.csdn.net/sweetgirl520/article/details/80323584

原文地址:https://www.cnblogs.com/xiao987334176/p/10199994.html

时间: 2024-09-27 16:06:02

kafka查看消费数据的相关文章

Flume简介与使用(三)——Kafka Sink消费数据之Kafka安装

前面已经介绍了如何利用Thrift Source生产数据,今天介绍如何用Kafka Sink消费数据. 其实之前已经在Flume配置文件里设置了用Kafka Sink消费数据 agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink agent1.sinks.kafkaSink.topic = TRAFFIC_LOG agent1.sinks.kafkaSink.brokerList = 10.208.129.3:90

关于kafka重新消费数据问题

我们在使用consumer消费数据时,有些情况下我们需要对已经消费过的数据进行重新消费,这里介绍kafka中两种重新消费数据的方法. 1. 修改offset 我们在使用consumer消费的时候,每个topic会产生一个偏移量,这个偏移量保证我们消费的消息顺序且不重复.Offest是在zookeeper中存储的,我们可以设置consumer实时或定时的注册offset到zookeeper中.我们修改这个offest到我们想重新消费的位置,就可以做到重新消费了.具体修改offest的方法这里就不详

kafka从头消费数据

从头消费数据需满足两个条件: 1. groupid为新的 2. auto_offset_reset_config的值为earliest Properties p = new Properties(); p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

相同数据源情况下,使用Kafka实时消费数据 vs 离线环境下全部落表后处理数据,结果存在差异

原因分析: 当某个consumer宕机时,消费位点(例如2s提交一次)尚未提交到zookeeper,此时Kafka集群自动rebalance后另一consumer来接替该宕机consumer继续消费,因为先前宕机consumer最近的消费位点尚未提交,导致数据重复消费 突发流量.跨机房(网络请求延时高).网络不稳定,出现丢包现象 业务逻辑有偏差 常见丢包现象如突然掉线.页面卡住.视频卡住.图片加载卡主等,使用Ping测量丢包的最佳方法是向一个IP地址发送大量的Ping命令,然后检查没有应答的那些

Spark Streaming和Kafka整合保证数据零丢失

当我们正确地部署好Spark Streaming,我们就可以使用Spark Streaming提供的零数据丢失机制.为了体验这个关键的特性,你需要满足以下几个先决条件: 1.输入的数据来自可靠的数据源和可靠的接收器: 2.应用程序的metadata被application的driver持久化了(checkpointed ); 3.启用了WAL特性(Write ahead log). 下面我将简单地介绍这些先决条件. 可靠的数据源和可靠的接收器 对于一些输入数据源(比如Kafka),Spark S

Kafka重复消费和丢失数据研究

Kafka重复消费原因 底层根本原因:已经消费了数据,但是offset没提交. 原因1:强行kill线程,导致消费后的数据,offset没有提交. 原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费.例如: try { consumer.unsubscribe(); } catch (Exception e) { } try { consumer.close(); }

分享一些 Kafka 消费数据的小经验

前言 之前写过一篇<从源码分析如何优雅的使用 Kafka 生产者> ,有生产者自然也就有消费者. 建议对 Kakfa 还比较陌生的朋友可以先看看. 就我的使用经验来说,大部分情况都是处于数据下游的消费者角色.也用 Kafka 消费过日均过亿的消息(不得不佩服 Kakfa 的设计),本文将借助我使用 Kakfa 消费数据的经验来聊聊如何高效的消费数据. 单线程消费 以之前生产者中的代码为例,事先准备好了一个 Topic:data-push,3个分区. 先往里边发送 100 条消息,没有自定义路由

[转帖]kafka 如何保证数据不丢失

https://www.cnblogs.com/MrRightZhao/p/11498952.html 一般我们在用到这种消息中件的时候,肯定会考虑要怎样才能保证数据不丢失,在面试中也会问到相关的问题.但凡遇到这种问题,是指3个方面的数据不丢失,即:producer consumer 端数据不丢失  broker端数据不丢失下面我们分别从这三个方面来学习,kafka是如何保证数据不丢失的 一.producer 生产端是如何保证数据不丢失的 1.ack的配置策略 acks = 0 生产者发送消息之

Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数

1.创建Maven项目 创建的过程参考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2.启动Kafka A:安装kafka集群:http://blog.csdn.net/tototuzuoquan/article/details/73430874 B:创建topic等:http://blog.csdn.net/tototuzuoquan/article/details/73430874 3.编写Pom文件 <?xml v