Kafka 如何读取指定topic中的offset -------------用来验证分区是不是均衡!!!(__consumer_offsets)(注,本文尚在测试验证阶段,,,后续一俩天会追加修正)

我现在使用的是librdkafka 的C/C++ 的客户端来生产消息,用flume来辅助处理异常的数据,,,

但是在前段时间,单独使用flume测试的时候发现,flume不能对分区进行负载均衡!同一个集群中,一个broker的一个分区已经有10亿条数据,另外一台的另一个分区只有8亿条数据;

因此,我对flume参照别人的做法,增加了拦截器;

即在flume配置文件中 增加以下字段;

-----

stage_nginx.sources.tailSource.interceptors = i2
stage_nginx.sources.tailSource.interceptors.i2.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
stage_nginx.sources.tailSource.interceptors.i2.headerName=key
stage_nginx.sources.tailSource.interceptors.i2.preserveExisting=false

----

增加完后,要先进行自己测试,验证flume拦截器的负载均衡功能;

好,下来话不多少,,看测试步骤;

1,创建topic 相关联的分区 (因现场暂时只有2个分区,所以我这边暂时取2个分区做测试)

  (我暂时使用的kafka版本是kafka_2.11-0.9.0.1,以下都是在kafka相关版本的bin路径下操作命令

  ./kafka-topics.sh --create --zookeeper 192.165.1.91:12181,192.165.1.92:12181,192.165.1.64:12181 --replication-factor 1 --partitions 2 --topic test3

   创建topic test3  不要分区  zookeeper 3台   分区2个  zookeeper端口号12181(我本地的broker端口号是19091,这个在kafka  conf/ server.properties里边配置)

2,查看topic的创建情况

  在broker的每台机器的目录下,分别查看topic的创建情况!  

  下边是我91机器的情况:

  ./kafka-topics.sh --describe --zookeeper 192.165.1.91:12181 --topic test3

  

    Topic:test3 PartitionCount:2 ReplicationFactor:1 Configs:

    Topic: test3 Partition: 0 Leader: 1 Replicas: 1 Isr: 1

    Topic: test3 Partition: 1 Leader: 2 Replicas: 2 Isr: 2

-------------意思是  他有俩个分区,,每个分区他的备份分区都是他们自己,即没有分区,,你们可以根据你们自身的现状做不同的操作;

3,要查消费情况,必须的建立消费组,,下来创建消费group

  ./kafka-console-consumer.sh --bootstrap-server 192.165.1.91:19092,192.165.1.92:19092,192.165.1.64:19092 --topic test3 --from-beginning --new-consumer

4,查看自己创建的  group id号;

  ./kafka-consumer-groups.sh --bootstrap-server 192.165.1.91:19092,192.165.1.92:19092,192.165.1.64:19092 --list --new-consumer

  本地我显示的是:console-consumer-54762

5,查询__consumer_offsets topic所有内容

  注意:运行下面命令前先要在consumer.properties中设置exclude.internal.topics=false(同时要配置好你的consumer.properties中有关zookeeper和broker相关的IP和端口信息

  ./kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 2 --broker-list 192.165.1.91:19092,192.165.1.92:19092,192.165.1.64:19092 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

  额:运行这个居然会报错!!!见鬼了!!

6,后边再运行最后一步,显示如下,,,,但是为啥:消费个数是零???套路不对!!!推测情况:1,可能是我第二步没有消费;2,消费配置文件的时候配置错了;

-------------------------------------------------

6. 计算指定consumer group在__consumer_offsets topic中分区信息

这时候就用到了第4步获取的group.id(本例中是console-consumer-54762)。Kafka会使用下面公式计算该group位移保存在__consumer_offsets的哪个分区上:

Math.abs(groupID.hashCode()) % numPartitions

所以在本例中,对应的分区=Math.abs("console-consumer-46965".hashCode()) % 50 = 11,即__consumer_offsets的分区11保存了这个consumer group的位移信息,下面让我们验证一下。

7. 获取指定consumer group的位移信息 

bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

--------------------------------------------------------

参考文章:http://www.cnblogs.com/huxi2b/p/6061110.html

时间: 2024-10-11 06:19:03

Kafka 如何读取指定topic中的offset -------------用来验证分区是不是均衡!!!(__consumer_offsets)(注,本文尚在测试验证阶段,,,后续一俩天会追加修正)的相关文章

php读取指定目录中的信息

$path = "ceshi"; //打开指定目录 $dirHandle = opendir($path); while (($item = readdir($dirHandle)) !== false) {        //.表示当前目录 ..表示上级目录        if ($item != "." && $item != "..") {             if (is_file($path."/"

读取指定路径的Excel内容到DataTable中

1 /// <summary> 2 /// 读取指定路径的Excel内容到DataTable中 3 /// </summary> 4 /// <param name="path"></param> 5 /// <returns></returns> 6 public DataTable ImportToDataSet(string path) 7 { 8 string strConn = "Provide

Java读取excel指定sheet中的各行数据,存入二维数组,包括首行,并打印

1. 读取 //读取excel指定sheet中的各行数据,存入二维数组,包括首行 public static String[][] getSheetData(XSSFSheet sheet) throws IOException { String[][] testArray = new String[sheet.getPhysicalNumberOfRows()][]; for(int rowId =0;rowId<sheet.getPhysicalNumberOfRows();rowId++)

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十五)Structured Streaming:同一个topic中包含一组数据的多个部分,按照key它们拼接为一条记录(以及遇到的问题)。

需求: 目前kafka的topic上有一批数据,这些数据被分配到9个不同的partition中(就是发布时key:{m1,m2,m3,m4...m9},value:{records items}),mx(m1,m2...m9)这些数据的唯一键值:int_id+start_time,其中int_id和start_time是topic record中的记录.这9组数据按照唯一键值可以拼接(m1.primarykey1,m2.primarykey1,m3.primarykey1.....m9.prim

Kafka创建&amp;查看topic,生产&amp;消费指定topic消息

启动zookeeper和Kafka之后,进入kafka目录(安装/启动kafka参考前面一章:https://www.cnblogs.com/cici20166/p/9425613.html) 1.创建Topic 1)运行命令: ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1 2181 是zookeeper 端口 图示为创建成

架构设计:系统间通信(28)——Kafka及场景应用(中1)

(接上文<架构设计:系统间通信(27)--其他消息中间件及场景应用(上)>) 在本月初的写作计划中,我本来只打算粗略介绍一下Kafka(同样是因为进度原因).但是,最近有很多朋友要求我详细讲讲Kafka的设计和使用,另外两年前我在研究Kafka准备将其应用到生产环境时,由于没有仔细理解Kafka的设计结构所导致的问题最后也还没有进行交代.所以我决定即使耽误一些时间,也要将Kafka的原理和使用场景给读者详细讨论讨论.这样,也算是对两年来自己学习和使用Kafka的一个总结. 4.Kafka及特性

架构设计:系统间通信(29)——Kafka及场景应用(中2)

接上文:<架构设计:系统间通信(28)--Kafka及场景应用(中1)> 4-3.复制功能 我们在上文中已经讨论了Kafka使用分区的概念存储消息,一个topic可以有多个分区它们分布在整个Kafka集群的多个Broker服务节点中,并且一条消息只会按照消息生产者的要求进入topic的某一个分区.那么问题来了:如果某个分区中的消息在被消费端Pull之前,承载该分区的Broker服务节点就因为各种异常原因崩溃了,那么在这个Broker重新启动前,消费者就无法收到消息了. 为了解决这个问题,Apa

关于kafka更改消费者对应分组下的offset值

kafka的offset保存位置分为两种情况 0.9.0.0版本之前默认保存在zookeeper当中 0.9.0.0版本之后保存在broker对应的topic当中 1.如何辨别你启用的consumer的offset保存位置进入zookeeper的命令行当中 zkCli.sh localhost:2181 用 ls / 查看目录如果你在代码中定义的group id 没有在 /consumers 这个文件夹中,代表offset保存在broker的topic中前提是consumer确实已经创建并启动如

spark streaming读取kakfka数据手动维护offset

在spark streaming读取kafka的数据中,spark streaming提供了两个接口读取kafka中的数据,分别是KafkaUtils.createDstream,KafkaUtils.createDirectStream,前者会自动把offset更新到zk中,默认会丢数据,效率低,后者不会经过zk,效率更高,需要自己手动维护offse,通过维护护offset写到zk中,保障数据零丢失,只处理一次,下面来看看KafkaUtils.createDirectStream的使用,我把z