Java API获取非compacted topic总消息数

目前Kafka并没有提供直接的工具来帮助我们获取某个topic的当前总消息数,需要我们自行写程序来实现。下列代码可以实现这一功能,特此记录一下:

/**
     * 获取某个topic的当前消息数
     * Java 8+ only
     *
     * @param topic
     * @param brokerList
     * @return
     */
    public static long totalMessageCount(String topic, String brokerList) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> tps = Optional.ofNullable(consumer.partitionsFor(topic))
                    .orElse(Collections.emptyList())
                    .stream()
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());
            Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(tps);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);

            return tps.stream().mapToLong(tp -> endOffsets.get(tp) - beginOffsets.get(tp)).sum();
        }
    }

  

原文地址:https://www.cnblogs.com/huxi2b/p/9530072.html

时间: 2025-01-12 14:35:23

Java API获取非compacted topic总消息数的相关文章

使用JAVA API获取hadoop集群的FileSystem

所需要配置的参数:  Configuration conf = new Configuration();   conf.set("fs.defaultFS", "hdfs://hadoop2cluster");   conf.set("dfs.nameservices", "hadoop2cluster");   conf.set("dfs.ha.namenodes.hadoop2cluster", &qu

Java 数组 获取二维数组的行数和列数

对于Object[][] array,array.length返回行数,array[0].length返回列数,元素个数为array.length*array[0].length. 参考资料 在JAVA中怎样求二维数组的行数和列数? 原文地址:https://www.cnblogs.com/WJQ2017/p/8412615.html

如何实现桌面App图标可以动态显示消息数(类似手机上的QQ图标)?

原文:如何实现桌面App图标可以动态显示消息数(类似手机上的QQ图标)? 手机上的APP , 像QQ和微信等都可以在图标上动态显示消息数(最大99) , 那么你有没有想过这些效果是如何实现的?桌面上开发的传统应用程序能否也实现类似的功能? 1 思路 桌面快捷方式的图标本质上就是基于一个图片产生的 , 第一种是动态生成图标(不过感觉比较费事且也消耗资源) , 建议方式是预先定义从0到99这100个图标(0就是不显示消息数 , >=99的就用99代替); 获取用户的未处理消息数(根据业务情况产生 ,

Kafka Java API操作topic

Kafka官方提供了两个脚本来管理topic,包括topic的增删改查.其中kafka-topics.sh负责topic的创建与删除:kafka-configs.sh脚本负责topic的修改和查询,但很多用户都更加倾向于使用程序API的方式对topic进行操作. 上一篇文章中提到了如何使用客户端协议(client protocol)来创建topic,本文则使用服务器端的Java API对topic进行增删改查.开始之前,需要明确的是,下面的代码需要引入kafka-core的依赖,以kafka 0

JMS开发步骤和持久化/非持久化Topic消息

------------------------------------------------ 开发一个JMS的基本步骤如下: 1.创建一个JMS connection factory 2.通过connection factory来创建JMS connection 3.启动JMS connection 4.通过connection创建JMS session 5.创建JMS destination 6.创建JMS producer 或者创建JMS message,并设置destination 7

java 正则表达式获取匹配和非获取匹配

1 package test1; 2 3 import java.util.regex.Matcher; 4 import java.util.regex.Pattern; 5 6 public class TestExp { 7 /** 8 * 9 * 在使用正则表达式的时候,我们经常会使用()把某个部分括起来,称为一个子模式. 子模式有Capturing和Non-Capturing两种情况. 10 * Capturing指获取匹配 : 11 * 是指系统会在幕后将所有的子模式匹配结果保存起来

使用Google Elevation API获取海拔高度(java版)

Google Elevation API提供了根据某地经纬度获取该地点海拔高度的接口.开发者可以调用该API获取地点的海拔信息.使用Google Elevation API之前,用户首先需要注册为Google Developer,获取API key. 开发者可以查看API的调用记录,目前Elevation API的限制次数为2500次/每天,每次API访问的字节数小于2000字符. 相应的Java代码如下所示: package elevation; import java.io.*; impor

使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)

使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题将使用默认值,先改变需要使用命令bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000显示的修改,我们也希望将此过程在Producer调用之前通

Java API 各个包的内容解释

java.applet 提供创建 applet 所必需的类和 applet 用来与其 applet 上下文通信的类. java.awt 包含用于创建用户界面和绘制图形图像的所有类. java.awt.color 提供用于颜色空间的类. java.awt.datatransfer 提供在应用程序之间和在应用程序内部传输数据的接口和类. java.awt.dnd Drag 和 Drop 是一种直接操作动作,在许多图形用户界面系统中都会遇到它,它提供了一种机制,能够在两个与 GUI 中显示元素逻辑相关