Kafka获取订阅某topic的所有consumer group【客户端版】

之前写过如何用服务器端的API代码来获取订阅某topic的所有consumer group,参见这里。使用服务器端的API需要用到kafka.admin.AdminClient类,但是这个类在0.11.0.0版本已经被标记为不推荐使用了,故目前最合适的方式还是通过客户端API:org.apache.kafka.clients.admin.AdminClient。今天碰到有人问这个问题,我就尝试写了一个。使用之前你需要引入kafka client包依赖(以2.2.0版本为例)

Maven:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>

Gradle:

compile group: ‘org.apache.kafka‘, name: ‘kafka-clients‘, version: ‘2.2.0‘

下面是代码:

 1 private static List<String> getGroupsForTopic(String brokerServers, String topic)
 2             throws ExecutionException, InterruptedException, TimeoutException {
 3         Properties props = new Properties();
 4         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServers);
 5
 6         try (AdminClient client = AdminClient.create(props)) {
 7             List<String> allGroups = client.listConsumerGroups()
 8                     .valid()
 9                     .get(10, TimeUnit.SECONDS)
10                     .stream()
11                     .map(ConsumerGroupListing::groupId)
12                     .collect(Collectors.toList());
13
14             Map<String, ConsumerGroupDescription> allGroupDetails =
15                     client.describeConsumerGroups(allGroups).all().get(10, TimeUnit.SECONDS);
16
17             final List<String> filteredGroups = new ArrayList<>();
18             allGroupDetails.entrySet().forEach(entry -> {
19                 String groupId = entry.getKey();
20                 ConsumerGroupDescription description = entry.getValue();
21                 boolean topicSubscribed = description.members().stream().map(MemberDescription::assignment)
22                         .map(MemberAssignment::topicPartitions)
23                         .map(tps -> tps.stream().map(TopicPartition::topic).collect(Collectors.toSet()))
24                         .anyMatch(tps -> tps.contains(topic));
25                 if (topicSubscribed)
26                     filteredGroups.add(groupId);
27             });
28             return filteredGroups;
29         }
30     }

我会假设你的集群中没有配置安全认证和授权机制或者发起此AdminClient的用户是合法用户且有CLUSTER以及GROUP的DESCRIBE权限。

另外值得注意的是,上面这个函数无法获取非运行中的consumer group,即虽然一个group订阅了某topic,但是若它所有的consumer成员都关闭的话这个函数是不会返回该group的。

原文地址:https://www.cnblogs.com/huxi2b/p/10638008.html

时间: 2024-12-08 23:22:58

Kafka获取订阅某topic的所有consumer group【客户端版】的相关文章

.net Kafka.Client多个Consumer Group对Topic消费不能完全覆盖研究总结(二)

依据Partition和Consumer的Rebalance策略,找到Kafka.Client Rebalance代码块,还原本地环境,跟踪调试,发现自定义Consumer Group 的Consumer并没有分配到PartionID,如下图. 据此,基本就可以定位到不同组Consumer无法覆盖Partition的问题根源了. 仔细阅读Rebalance代码,发现Kafka.Client 在获取consumer时,并没有根据Group做筛选,获取到的是所有组的Consumer,如下图 (此处只

Kafka 如何读取指定topic中的offset -------------用来验证分区是不是均衡!!!(__consumer_offsets)(注,本文尚在测试验证阶段,,,后续一俩天会追加修正)

我现在使用的是librdkafka 的C/C++ 的客户端来生产消息,用flume来辅助处理异常的数据,,, 但是在前段时间,单独使用flume测试的时候发现,flume不能对分区进行负载均衡!同一个集群中,一个broker的一个分区已经有10亿条数据,另外一台的另一个分区只有8亿条数据: 因此,我对flume参照别人的做法,增加了拦截器: 即在flume配置文件中 增加以下字段: ----- stage_nginx.sources.tailSource.interceptors = i2sta

Kafka消费组(consumer group)

一直以来都想写一点关于kafka consumer的东西,特别是关于新版consumer的中文资料很少.最近Kafka社区邮件组已经在讨论是否应该正式使用新版本consumer替换老版本,笔者也觉得时机成熟了,于是写下这篇文章讨论并总结一下新版本consumer的些许设计理念,希望能把consumer这点事说清楚,从而对广大使用者有所帮助. 在开始之前,我想花一点时间先来明确一些概念和术语,这会极大地方便我们下面的讨论.另外请原谅这文章有点长,毕竟要讨论的东西很多,虽然已然删除了很多太过细节的东

kafka.common.KafkaException: fetching topic metadata for topics

2016-10-26 11:05:29,716  WARN [flume_dx2.zdp.ol-1477451127056-e2015d55-leader-finder-thread] kafka.consumer.ConsumerFetcherManager$LeaderFinderThread (line 89) [flume_dx2.zdp.ol-1477451127056-e2015d55-leader-finder-thread], Failed to find leader for 

Azure ARM模式下获取订阅下VM信息

Azure ARM模式下获取VM信息 马上就要双十一了,对于一些大客户而言,使用的VM机器超过几百台,无论是促销活动还是每个季度的机器梳理,都需要对这些VM进行梳理总结,是否有公网IP,IP动态静态,对于动态IP而言,因为机器重启有可能造成IP改变,有可能对业务造成很大影响.所以快速获取订阅下所有VM信息就很重要. 如下介绍通过直接运行PowerShell脚本就可以很快获取到全部信息 脚本如下: 可以通过powershell收集统计VM的公网IP,内网IP,机器型号,位置,操作系统,端口 # L

从外部重置一个运行中consumer group的消费进度

对于0.10.1以上版本的kafka, 如何从外部重置一个运行中的consumer group的进度呢?比如有一个控制台,可以主动重置任意消费组的消费进度重置到12小时之前. 需要这么几个步骤: 1. 加入这个group 2. 踢掉所有其它group memeber 3. try assign all TopicPartition to this client 4. commit offsets 5. leave group 其中第二步是为了让自己当上leader,当然有可能不需要踢掉其它所有成

Consumer group理解深入

每一个consumer实例都属于一个consumer group,每一条消息只会被同一个consumer group里的一个consumer实例消费.(不同consumer group可以同时消费同一条消息) 很多传统的message queue都会在消息被消费完后将消息删除,一方面避免重复消费,另一方面可以保证queue的长度比较少,提高效率.而如上文所将,Kafka并不删除 已消费的消息,为了实现传统message queue消息只被消费一次的语义,Kafka保证保证同一个consumer

Java API获取非compacted topic总消息数

目前Kafka并没有提供直接的工具来帮助我们获取某个topic的当前总消息数,需要我们自行写程序来实现.下列代码可以实现这一功能,特此记录一下: /** * 获取某个topic的当前消息数 * Java 8+ only * * @param topic * @param brokerList * @return */ public static long totalMessageCount(String topic, String brokerList) { Properties props =

Kafka Java API操作topic

Kafka官方提供了两个脚本来管理topic,包括topic的增删改查.其中kafka-topics.sh负责topic的创建与删除:kafka-configs.sh脚本负责topic的修改和查询,但很多用户都更加倾向于使用程序API的方式对topic进行操作. 上一篇文章中提到了如何使用客户端协议(client protocol)来创建topic,本文则使用服务器端的Java API对topic进行增删改查.开始之前,需要明确的是,下面的代码需要引入kafka-core的依赖,以kafka 0