kafka在zookeeper中的存储结构

参考site:http://kafka.apache.org/documentation.html#impl_zookeeper

1、zookeeper客户端相关命令

在确保zookeeper服务启动状态下,通过 bin/zkCli.sh -server 127.0.0.1:2181 该命令来连接客户端

简单操作如下:

  1. 显示根目录下、文件: ls /  使用 ls 命令来查看当前 ZooKeeper 中所包含的内容

  2. 显示根目录下、文件: ls2 / 查看当前节点数据并能看到更新次数等数据

  3. 创建文件,并设置初始内容: create /zk "test" 创建一个新的 znode节点“ zk ”以及与它关联的字符串

  4. 获取文件内容: get /zk 确认 znode 是否包含我们所创建的字符串

  5. 修改文件内容: set /zk "zkbak" 对 zk 所关联的字符串进行设置

  6. 删除文件: delete /zk 将刚才创建的 znode 删除

  7. 退出客户端: quit

  8. 帮助命令: help

2、topic注册信息

/brokers/topics/[topic] :

存储某个topic的partitions所有分配信息

Schema:
{
    "version": "版本编号目前固定为数字1",
    "partitions": {
        "partitionId编号": [
            同步副本组brokerId列表
        ],
        "partitionId编号": [
            同步副本组brokerId列表
        ],
        .......
    }
}
Example:
{
    "version": 1,
    "partitions": {
	"0":[0,1,2]
     }
}

如下图:

3.partition状态信息

/brokers/topics/[topic]/partitions/[0...N]  其中[0..N]表示partition索引号

/brokers/topics/[topic]/partitions/[partitionId]/state

Schema:
{
    "controller_epoch": 表示kafka集群中的中央控制器选举次数,
    "leader": 表示该partition选举leader的brokerId,
    "version": 版本编号默认为1,
    "leader_epoch": 该partition leader选举次数,
    "isr": [同步副本组brokerId列表]
}

Example:
{
    "controller_epoch":20,
    "leader":0,
    "version":1,
    "leader_epoch":0,
    "isr":[0,1,2]
}

如图:

 

4. broker注册信息

/brokers/ids/[0...N]

每个broker的配置文件中都需要指定一个数字类型的id(全局不可重复),此节点为临时znode(EPHEMERAL)

Schema:
{
    "jmx_port": jmx端口号,
    "timestamp": kafka broker初始启动时的时间戳,
    "host": 主机名或ip地址,
    "version": 版本编号默认为1,
    "port": kafka broker的服务端端口号,由server.properties中参数port确定
}

Example:
{
    "jmx_port":1,
    "timestamp":"1452068227537",
    "host":"h1",
    "version":1,
    "port":9092
}

如图:

5. controller epoch

/controller_epoch -> int (epoch)

此值为一个数字,kafka集群中第一个broker第一次启动时为1,以后只要集群中center controller中央控制器所在broker变更或挂掉,就会重新选举新的center controller,每次center controller变更controller_epoch值就会 + 1;

如图:

6. controller注册信息

/controller -> int (broker id of the controller)  存储center controller中央控制器所在kafka broker的信息

Schema:
{
    "version": 版本编号默认为1,
    "brokerid": kafka集群中broker唯一编号,
    "timestamp": kafka broker中央控制器变更时的时间戳
}

Example:
{
    "version":1,
    "brokerid":0,
    "timestamp":"1452068227409"
}

如图:

7. consumer注册信息

每个consumer都有一个唯一的ID(consumerId可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息.

/consumers/[groupId]/ids/[consumerIdString]

是一个临时的znode,此节点的值为请看consumerIdString产生规则,即表示此consumer目前所消费的topic + partitions列表.

consumerId产生规则: 

   StringconsumerUuid = null;
       if(config.consumerId!=null && config.consumerId){
           consumerUuid = consumerId;
       }else {
           String uuid = UUID.randomUUID()
           consumerUuid = "%s-%d-%s".format(
                InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
                uuid.getMostSignificantBits().toHexString.substring(0,8)); 

       }
  String consumerIdString = config.groupId + "_" + consumerUuid;  

Schema:
{
    "version": 版本编号默认为1,
    "subscription": { //订阅topic列表
        "topic名称": consumer中topic消费者线程数
    },
    "pattern": "static",
    "timestamp": "consumer启动时的时间戳"
}

Example:
{
    "version":1,
    "subscription":{
        "replicatedtopic":1
    },
    "pattern":"white_list",
    "timestamp":"1452134230082"
}

如图:

8. consumer owner

/consumers/[groupId]/owners/[topic]/[partitionId] -> consumerIdString + threadId索引编号

当consumer启动时,所触发的操作:

a) 首先进行"Consumer Id注册";

b) 然后在"Consumer id 注册"节点下注册一个watch用来监听当前group中其他consumer的"退出"和"加入";只要此znode path下节点列表变更,

都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).

c) 在"Broker id 注册"节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.

9. consumer offset

/consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)

用来跟踪每个consumer目前所消费的partition中最大的offset

此znode为持久节点,可以看出offset跟group_id有关,以表明当消费者组(consumer group)中一个消费者失效,

重新触发balance,其他consumer可以继续消费.

10. topic 配置

/config/topics/[topic_name]

时间: 2024-08-26 21:41:51

kafka在zookeeper中的存储结构的相关文章

kafka笔记-Kafka在zookeeper中的存储结构【转】

参考链接:apache kafka系列之在zookeeper中存储结构  http://blog.csdn.net/lizhitao/article/details/23744675 1.topic注册信息 /brokers/topics/[topic] : 存储某个topic的partitions所有分配信息 Schema: {    "version": "版本编号目前固定为数字1",    "partitions": {        &q

Kafka学习之路 (五)Kafka在zookeeper中的存储

当kafka启动的时候,就会向zookeeper里面注册一些信息,这些数据也称为Kafka的元数据信息. 一.Kafka在zookeeper中存储结构图 二.分析 根目录下的结构 服务端开启的情况下,进入客户端的命令:{zookeeper目录}/bin/zkCli.sh # {zookeeper目录}/bin/zkCli.sh [zk: localhost:2181(CONNECTED) 1] ls / [cluster, controller_epoch, controller, broker

Twitter Storm源代码分析之ZooKeeper中的目录结构

徐明明博客:Twitter Storm源代码分析之ZooKeeper中的目录结构 我们知道Twitter Storm的所有的状态信息都是保存在Zookeeper里面,nimbus通过在zookeeper上面写状态信息来分配任务,supervisor,task通过从zookeeper中读状态来领取任务,同时supervisor, task也会定义发送心跳信息到zookeeper, 使得nimbus可以监控整个storm集群的状态, 从而可以重启一些挂掉的task.ZooKeeper 使得整个sto

使用kafka bin目录中的zookeeper-shell.sh来查看kafka在zookeeper中的配置

cd kafka_2.11-0.10.2.1\bin\windowsecho ls /brokers/ids |  zookeeper-shell.bat localhost:2181 使用kafka bin目录中的zookeeper-shell.sh来查看kafka在zookeeper中的配置. 连接zookeeper: bin/zookeeper-shell.sh 127.0.0.1:2181 https://my.oschina.net/tongyufu/blog/1806196 http

kafka在zookeeper中存储结构

1.topic注册信息 /brokers/topics/[topic] : 存储某个topic的partitions所有分配信息 Schema: {    "version": "版本编号目前固定为数字1",    "partitions": {        "partitionId编号": [            同步副本组brokerId列表        ],        "partitionId编号&q

iOS中keychain存储结构的研究

keychain在ios中是保存在sqlite数据库中的. 这个数据库文件的位置: 真机: /private/var/Keychains/keychain-2.db 虚拟机: /Users/USER-HOME/Library/Developer/CoreSimulator/Devices/26DCA62C-B516-4DEA-A601-5C2D0EA07710/data/Library/Keychains/keychain-2-debug.db 在虚拟机中,这个数据库考出来就不能读了,很奇怪.

Kafka 和 ZooKeeper 的分布式消息队列分析

1. Kafka 总体架构 基于 Kafka-ZooKeeper 的分布式消息队列系统总体架构如下: 如上图所示,一个典型的 Kafka 体系架构包括若干 Producer(消息生产者),若干 broker(作为 Kafka 节点的服务器),若干 Consumer(Group),以及一个 ZooKeeper 集群.Kafka通过 ZooKeeper 管理集群配置.选举 Leader 以及在 consumer group 发生变化时进行 Rebalance(即消费者负载均衡,在下一课介绍).Pro

深入解析 ObjC 中方法的结构

因为 ObjC 的 runtime 只能在 Mac OS 下才能编译,所以文章中的代码都是在 Mac OS,也就是 x86_64 架构下运行的,对于在 arm64 中运行的代码会特别说明. 在上一篇分析 isa 的文章从 NSObject 的初始化了解 isa中曾经说到过实例方法被调用时,会通过其持有 isa 指针寻找对应的类,然后在其中的 class_data_bits_t 中查找对应的方法,在这一篇文章中会介绍方法在 ObjC 中是如何存储方法的. 这篇文章的首先会根据 ObjC 源代码来分

Apache kafka系列之在zookeeper中存储结构

1.topic注册信息 /brokers/topics/[topic] :存储某个topic的partitions所有分配信息 Schema: { "version": "版本编号目前固定为数字1", "partitions": { "partitionId编号": [ 同步副本组brokerId列表 ], "partitionId编号": [ 同步副本组brokerId列表 ], ....... } }