本文着重介绍几个常用的topic命令行命令,包括listTopic,createTopic,deleteTopic和describeTopic等。由于alterTopic并不是很常用,本文中就不涉及了。另外本文的代码分析是基于kafka_2.10-0.8.2.1的(虽然截图是Kafka 0.8.1的^_^ )
一. list topic 显示所有topic
1. 从zookeeper的/brokers/topics节点下获取所有topic封装成topic集合
2. 遍历该集合,查看每个topic是否是待删除topic——即在/admin/delete_topics下是否存在同名节点。如果是,打印topic已经被标记为删除;否则直接打印topic名称
二、create topic 创建topic
1. 从命令行中获取要创建的topic名称
2. 解析命令行指定的topic配置(如果存在的话),配置都是x=a的格式
3. 若指定了replica-assignment参数表明用户想要自己分配分区副本与broker的映射——通常都不这么做,如果不提供该参数Kafka帮你做这件事情
4. 检查必要的参数是否已指定,包括:zookeeper, replication-factor,partition和topic
5. 获取/brokers/ids下所有broker并按照broker id进行升序排序
6. 在broker上分配各个分区的副本映射 (没有指定replica-assignment参数,这也是默认的情况)
7. 检查topic名字合法性、自定义配置的合法性,并且要保证每个分区都必须有相同的副本数
8. 若zookeeper上已有对应的路径存在,直接抛出异常表示该topic已经存在
9. 确保某个分区的多个副本不会被分配到同一个broker
10. 若提供了自定义的配置,更新zookeeper的/config/topics/[topic]节点的数据
11. 创建/brokers/topics/[topic]节点,并将分区副本分配映射数据写入该节点
三、delete topic 删除topic
1. 获取待删除的topic,如果没有指定--topic就是删除所有的topic
2. 对于每个要删除的topic,在zookeeper上的/admin/delete_topics下创建对应的子节点。kafka目前的删除topic逻辑只是在Zookeeper上标记而已,会有专门的线程负责监听该路径下的变更并负责更新zookeeper上其他节点上的数据,但底层的日志文件目前还是需要手动删除。
四、describe topic 显示topic详细信息
1. 上图可见,如果指定了--topic就是只显示给定topic的信息,否则显示所有topic的详细信息。
2. 如果指定了under-replicated-partitions,那么就显示那些副本数量不足的分区(ISR size < AR.size)
3. 如果指定了unavailable-partitions,那么就显示那些leader副本已不可用的分区
4. 从zookeeper上获取当前所有可用的broker
5. 遍历每个要describe的topic,
6. 获取这个topic的分区副本分配信息,若该信息不存在说明topic不存在
7. 否则将分配信息按照分区号进行排序
10. 如果没有指定步骤2中的参数也没有指定步骤3中的参数,那么显示分区数信息、副本系数信息以及配置信息
11. 默认情况下还会显示各个分区的信息
12. 从zookeeper中获取每个分区的ISR、Leader、AR信息并显示