监控Kafka消费进度

使用Kafka作为消息中间件消费数据时,监控Kafka消费的进度很重要。其中,在监控消费进度的过程中,主要关注消费Lag。

常用监控Kafka消费进度的方法有三种,分别是使用Kafka自带的命令行工具、使用Kafka Consumer API和Kafka自带的JMX监控指标,这里介绍前两种方法。
注: 内网IP:10.12.100.126 10.12.100.127 10.12.100.128 外网IP:47.90.133.76 47.90.133.77 47.90.133.78 用户名:server1 server2 server3

1 使用kafka自带的命令行工具

针对Kafka高级消费API,使用kafka自带的命令行工具kafka-consumer-groups.sh脚本直接查看Kafka消费进度

1.1 列出存在的所有消费者组

(base) [email protected]:/opt/kafka/kafka_2.11-0.10.2.2/bin#  kafka-consumer-groups.sh new-consumer --bootstrap-server 10.12.100.126:9092,10.12.100.127:9092,10.12.100.128:9092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hadoop/hadoop-2.7.6/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/kafka/kafka_2.11-0.10.2.2/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
consumer
consumers

1.2 使用kafka-consumer-groups.sh查看消费进度

(base) [email protected]:/opt/kafka/kafka_2.11-0.10.2.2/bin# kafka-consumer-groups.sh --bootstrap-server 10.12.100.126:9092,10.12.100.127:9092,10.12.100.128:9092 --describe --group consumers
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/hadoop/hadoop-2.7.6/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/kafka/kafka_2.11-0.10.2.2/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
test                        0          17734           17734           0          consumer-1-dd342b6b-176a-4127-9a0b-81a3b46bd388   /47.90.133.76                  consumer-1
test                        1          17736           17736           0          consumer-1-dd342b6b-176a-4127-9a0b-81a3b46bd388   /47.90.133.76                   consumer-1
test                        2          17735           17735           0          consumer-1-dd342b6b-176a-4127-9a0b-81a3b46bd388   /47.90.133.76                  consumer-1

GROUP TOPIC PID OFFSET LOGSIZE LAG
消费者组 话题id 分区id 当前已消费的条数 总条数 未消费的条数

注意:LAG的单位时消息条数,LAG为0,表示消费者实时消费生产者产生的消息,无滞后;LAG越大,表示消费者不能及时消费生产者生产的消息,有滞后。

2 使用kafka Consumer API

from kafka import SimpleClient, KafkaConsumer
from kafka.common import OffsetRequestPayload, TopicPartition
def get_topic_offset(brokers, topic):
    client = SimpleClient(brokers)
    partitions = client.topic_partitions[topic]
    offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in  partitions.keys()]
    offsets_responses = client.send_offset_request(offset_requests)
    return sum([r.offsets[0] for r in offsets_responses])
def get_group_offset(brokers, group_id, topic):
    consumer = KafkaConsumer(bootstrap_servers=brokers,
                             group_id=group_id,
                             )
    pts = [TopicPartition(topic=topic, partition=i) for i in
           consumer.partitions_for_topic(topic)]
    result = consumer._coordinator.fetch_committed_offsets(pts)
    return sum([r.offset for r in result.values()])
if __name__ == '__main__':
    topic_offset =  get_topic_offset("47.90.133.76:9092,47.90.133.77:9092,47.90.133.78:9092", "test")
    group_offset =  get_group_offset("47.90.133.76:9092,47.90.133.77:9092,47.90.133.78:9092", "consumers", "test")
    lag = topic_offset - group_offset
    print(topic_offset) # topic的offset总和
    print(group_offset) # topic特定group已消费的offset的总和
    print(lag) # 未消费的条数

(base) [email protected]:~# python  getKafkaLag.py
17735
17735
0

代码参考:https://www.jianshu.com/p/e48af92e199d

原文地址:https://www.cnblogs.com/eugene0/p/12233132.html

时间: 2024-11-08 04:29:01

监控Kafka消费进度的相关文章

zabbix监控kafka消费

一.Kafka监控的几个指标 1.lag:多少消息没有消费 lag=logsize-offset 2.logsize:Kafka存的消息总数 3.offset:已经消费的消息 Kafka管理工具 介绍: https://www.iteblog.com/archives/1605.html 二.查看zookeeper配置 cat /home/app/zookeeper/zookeeper/conf/zoo.cfg | egrep -v "^$|^#" clientPort=2181 三.

kafka将消费进度重置到最新的位置

/** *重置kafka消费进度 *参数中需要指定kafka集群中一台broker地址,要重置的topic名称,消费组,以及partition个数 */public static void seekLatest(String broker, String topic, String group, int partitionCnt){ Map<String, Object> configProps = new HashMap<>(); configProps.put(Consumer

从外部重置一个运行中consumer group的消费进度

对于0.10.1以上版本的kafka, 如何从外部重置一个运行中的consumer group的进度呢?比如有一个控制台,可以主动重置任意消费组的消费进度重置到12小时之前. 需要这么几个步骤: 1. 加入这个group 2. 踢掉所有其它group memeber 3. try assign all TopicPartition to this client 4. commit offsets 5. leave group 其中第二步是为了让自己当上leader,当然有可能不需要踢掉其它所有成

Kafka 消息监控 - Kafka Eagle

1.概述 在开发工作当中,消费 Kafka 集群中的消息时,数据的变动是我们所关心的,当业务并不复杂的前提下,我们可以使用 Kafka 提供的命令工具,配合 Zookeeper 客户端工具,可以很方便的完成我们的工作.随着业务的复杂化,Group 和 Topic 的增加,此时我们使用 Kafka 提供的命令工具,已预感到力不从心,这时候 Kafka 的监控系统此刻便尤为显得重要,我们需要观察消费应用的详情. 监控系统业界有很多杰出的开源监控系统.我们在早期,有使用 KafkaMonitor 和

jmxtrans监控kafka

我们知道jmx可以将程序内部的信息暴露出来,但是要想监控这些信息的话,就还需要自己写java程序调用jmx接口去获取数据,并按照某种格式发送到其他地方(如监控程序zabbix,ganglia).这时jmxtrans就派上用场了,jmxtrans的作用是自动去jvm中获取所有jmx格式数据,并按照某种格式(json文件配置格式)输出到其他应用程序(常用的有ganglia) 安装 主页:https://github.com/jmxtrans/jmxtrans(这里面也有一个下载地址,貌似版本更高)

kafka 消费?

前置资料  kafka kafka消费中的问题及解决方法: 情况1: 问题:脚本读取kafka 数据,写入到数据库,有时候出现MySQL server has gone away,导致脚本死掉.再次启动,这过程中的kafka数据丢失. 原因:MySQL server has gone away 出现可能是连接超时,可能超过每秒请求上限-这些异常是小概率事件,难以避免.git kafka 的demo脚本是实时监听的脚本, 简单明了,没有再去针对kafka偏移量研究:但是一旦断掉, 过程中的kafk

分享一些 Kafka 消费数据的小经验

前言 之前写过一篇<从源码分析如何优雅的使用 Kafka 生产者> ,有生产者自然也就有消费者. 建议对 Kakfa 还比较陌生的朋友可以先看看. 就我的使用经验来说,大部分情况都是处于数据下游的消费者角色.也用 Kafka 消费过日均过亿的消息(不得不佩服 Kakfa 的设计),本文将借助我使用 Kakfa 消费数据的经验来聊聊如何高效的消费数据. 单线程消费 以之前生产者中的代码为例,事先准备好了一个 Topic:data-push,3个分区. 先往里边发送 100 条消息,没有自定义路由

SSM(十六) 曲线救国-Kafka消费异常

最近线上遇到一个问题:在消费kafka消息的时候如果长时间(大概半天到一天的时间)队列里没有消息就可能再也消费不了.针对这个问题我们反复调试多次.线下模拟,调整代码,但貌似还是没有找到原因.但是只要重启消费进程就又可以继续消费. 解决方案 由于线上业务非常依赖kafka的消费,但一时半会也没有找到原因,所以最后只能想一个临时的替换方案: 基于重启就可以消费这个特点,我们在每次消费的时候都记下当前的时间点,当这个时间点在十分钟之内都没有更新我们就认为当前队列中没有消息了,就需要重启下消费进程. 既

Kafka消费组(consumer group)

一直以来都想写一点关于kafka consumer的东西,特别是关于新版consumer的中文资料很少.最近Kafka社区邮件组已经在讨论是否应该正式使用新版本consumer替换老版本,笔者也觉得时机成熟了,于是写下这篇文章讨论并总结一下新版本consumer的些许设计理念,希望能把consumer这点事说清楚,从而对广大使用者有所帮助. 在开始之前,我想花一点时间先来明确一些概念和术语,这会极大地方便我们下面的讨论.另外请原谅这文章有点长,毕竟要讨论的东西很多,虽然已然删除了很多太过细节的东