Kafka 命令
Kafka启动的时候使用 –daemon 后台运行,其命令如下:
./kafka-server-start.sh –daemon ../config/server.properties
1) 查看当前某个group 消费的
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group 1 --topic 000B_7F14
得到的结果为:
其中各参数解释如下:
Group:消费的group id
Topic:主题
Pid:分区
Offset:消费的数目
logSize:总数
lag:未消费的条数
在执行这条命令的时候得到下面的提示:
WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
也就是说在0.9版本之后,该命令已经被废弃了,所以,使用ConsumerGroupCommand命令取而代之。
2) 列出zk下的所有消费组
/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 –list
得到结果为:
3)
创建topic
./kafka-topics.sh
--create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 2
--topic 00137F1A
其中指定该topic分为两个区,只有一份数据。Replication有点类似mongodb中的副本集。
4)
新增partition
新增之前,查看topic的详细信息如下:
Topic:2008_0020 PartitionCount:1 ReplicationFactor:2 Configs:
Topic: 2008_0020 Partition: 0 Leader: 2 Replicas: 2,1 Isr:
2,1
/kafka-topics.sh
--zookeeper 127.0.0.1:2181 --alter --partitions 3 --topic 2008_0020
原来只有1个parttions,现在增加到3个。
再次查看topic的信息:
Topic:2008_0020 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: 2008_0020 Partition: 0 Leader: 2 Replicas: 2,1 Isr:
2,1
Topic: 2008_0020 Partition: 1 Leader: 0 Replicas: 0,1 Isr:
0,1
Topic: 2008_0020 Partition: 2 Leader: 1 Replicas: 1,2 Isr:
1,2
我们看到其topic的数目已经发生了改变。
5)
查看topic的详细信息
/kafka-topics.sh
-zookeeper 127.0.0.1:2181 -describe -topic 000B_7F14
得到的结果如下:
Topic: 000B_7F14 PartitionCount:2 ReplicationFactor:1 Configs:
Topic: 000B_7F14 Partition: 0 Leader: 0 Replicas: 0 Isr:
0
Topic: 000B_7F14 Partition: 1 Leader: 1 Replicas: 1 Isr:
1
其中topic:主题名称
PartitionCount:分区数量
ReplicationFactor:表示该topic在不同的broker中需要保存几份,值为1,说明在一个broker中保存,当前使用的是0.10版本,即,kafka 已经能够支持在partition级别进行备份。
ISR:
in-sync replicas,副本集所在的服务器。表示的是当前有效的broker.
6)
使用控制台生产数据
./kafka-console-producer.sh --broker-list 127.0.0.1:19092
--topic S96
然后在下面写入数据并换行,消费者将拿到数据
7)
使用控制台消费数据
./kafka-console-consumer.sh
--zookeeper localhost:2181 --topic S96
--from-beginning
如果后面加上from-beginning则会从最开始的位置进行消费
在使用这条命令的时候,在shell中会提示 using the
new consumer by passing [bootstrap-server] instead of [zookeeper]
这个bootstrap-server具体与zookeeper之间的差别是?
Zookeeper 是直接连接的zk,而bootstrap-server则是连接的broker的ip.因为在0.9之后,kafka使用了新的consumer
API进行消费。旧的API会逐步淘汰。
8)
查看所有的topic
./kafka-topics.sh –zookeeper 127.0.0.1:2181
–list
得到结果为:
2008_0020
000B_7F14
__consumer_offsets
也可以从zk的节点topics中查看,具体的命令为:
ls /config/topics ,得到的结果如下:
[ 2008_0020, __consumer_offsets, 000B_7F14]
同样的,在/brokers/topics中能够得到当前的所有topic
9)
查看某个分区的topic的偏移量最大最小值
./kafka-run-class.sh
kafka.tools.GetOffsetShell –topic 000B_7F14 –broker-list 127.0.0.1:19092 –partitions
0
上面的命令旨在查看000B_7F14的分区1中的最大最小偏移量,得到的结果为:
000B_7F14:0:5544
查看groupid下对该topic消费的情况,可以得到:
./kafka-run-class.sh
kafka.tools.ConsumerOffsetChecker –zookeeper 127.0.0.1:2181 –group 1 –topic
000B_7F14
Group
Topic Pid Offset logSize Lag Owner
1
000B_7F14 0 2094 5544 3450 none
可以看到当前的topic的最大值的确为5544