我现在使用的是librdkafka 的C/C++ 的客户端来生产消息,用flume来辅助处理异常的数据,,,
但是在前段时间,单独使用flume测试的时候发现,flume不能对分区进行负载均衡!同一个集群中,一个broker的一个分区已经有10亿条数据,另外一台的另一个分区只有8亿条数据;
因此,我对flume参照别人的做法,增加了拦截器;
即在flume配置文件中 增加以下字段;
-----
stage_nginx.sources.tailSource.interceptors = i2
stage_nginx.sources.tailSource.interceptors.i2.type=org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
stage_nginx.sources.tailSource.interceptors.i2.headerName=key
stage_nginx.sources.tailSource.interceptors.i2.preserveExisting=false
----
增加完后,要先进行自己测试,验证flume拦截器的负载均衡功能;
好,下来话不多少,,看测试步骤;
1,创建topic 相关联的分区 (因现场暂时只有2个分区,所以我这边暂时取2个分区做测试)
(我暂时使用的kafka版本是kafka_2.11-0.9.0.1,以下都是在kafka相关版本的bin路径下操作命令)
./kafka-topics.sh --create --zookeeper 192.165.1.91:12181,192.165.1.92:12181,192.165.1.64:12181 --replication-factor 1 --partitions 2 --topic test3
创建topic test3 不要分区 zookeeper 3台 分区2个 zookeeper端口号12181(我本地的broker端口号是19091,这个在kafka conf/ server.properties里边配置)
2,查看topic的创建情况
在broker的每台机器的目录下,分别查看topic的创建情况!
下边是我91机器的情况:
./kafka-topics.sh --describe --zookeeper 192.165.1.91:12181 --topic test3
Topic:test3 PartitionCount:2 ReplicationFactor:1 Configs:
Topic: test3 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: test3 Partition: 1 Leader: 2 Replicas: 2 Isr: 2
-------------意思是 他有俩个分区,,每个分区他的备份分区都是他们自己,即没有分区,,你们可以根据你们自身的现状做不同的操作;
3,要查消费情况,必须的建立消费组,,下来创建消费group
./kafka-console-consumer.sh --bootstrap-server 192.165.1.91:19092,192.165.1.92:19092,192.165.1.64:19092 --topic test3 --from-beginning --new-consumer
4,查看自己创建的 group id号;
./kafka-consumer-groups.sh --bootstrap-server 192.165.1.91:19092,192.165.1.92:19092,192.165.1.64:19092 --list --new-consumer
本地我显示的是:console-consumer-54762
5,查询__consumer_offsets topic所有内容
注意:运行下面命令前先要在consumer.properties中设置exclude.internal.topics=false(同时要配置好你的consumer.properties中有关zookeeper和broker相关的IP和端口信息)
./kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 2 --broker-list 192.165.1.91:19092,192.165.1.92:19092,192.165.1.64:19092 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
额:运行这个居然会报错!!!见鬼了!!
6,后边再运行最后一步,显示如下,,,,但是为啥:消费个数是零???套路不对!!!推测情况:1,可能是我第二步没有消费;2,消费配置文件的时候配置错了;
-------------------------------------------------
6. 计算指定consumer group在__consumer_offsets topic中分区信息
这时候就用到了第4步获取的group.id(本例中是console-consumer-54762)。Kafka会使用下面公式计算该group位移保存在__consumer_offsets的哪个分区上:
Math.abs(groupID.hashCode()) % numPartitions
所以在本例中,对应的分区=Math.abs("console-consumer-46965".hashCode()) % 50 = 11,即__consumer_offsets的分区11保存了这个consumer group的位移信息,下面让我们验证一下。
7. 获取指定consumer group的位移信息
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
--------------------------------------------------------
参考文章:http://www.cnblogs.com/huxi2b/p/6061110.html