kafka消息通信原理学习(1)

关于 Topic 和 Partition:

  Topic:

在 kafka 中,topic 是一个存储消息的逻辑概念,可以认为是一个消息集合。每条消息发送到 kafka 集群的消息都有一个类别。物理上来说,不同的 topic 的消息是分开存储的,每个 topic 可以有多个生产者向它发送消息,也可以有多个消费者去消费其中的消息。

  Partition:

  每个 topic 可以划分多个分区(每个 Topic 至少有一个分区),同一 topic 下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个 offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka 通过 offset保证消息在分区内的顺序,offset 的顺序不跨分区,即 kafka只保证在同一个分区内的消息是有序的。下图中,对于名字为 test 的 topic,做了 3 个分区,分别是p0、p1、p2.

? 每一条消息发送到 broker 时,会根据 partition 的规则选择存储到哪一个 partition。如果 partition 规则设置合理,那么所有的消息会均匀的分布在不同的partition中,这样就有点类似数据库的分库分表的概念,把数据做了分片处理。

  Topic&Partition 的存储:

  Partition 是以文件的形式存储在文件系统中,比如创建一个名为 firstTopic 的 topic,其中有 3 个 partition,那么在kafka 的数据目录(/tmp/kafka-log)中就有 3 个目录,firstTopic-0~3,命名规则是<topic_name>-<partition_id>,创建3个分区的topic:

sh kafka-topics.sh --create --zookeeper 192.168.254.135:2181 --replication-factor 1 --partitions 3 --topic firstTopic

kafka 消息分发策略:

  消息是 kafka 中最基本的数据单元,在 kafka 中,一条消息由 key、value 两部分构成,在发送一条消息时,我们可以指定这个 key,那么 producer 会根据 key 和 partition 机制来判断当前这条消息应该发送并存储到哪个 partition 中。我们可以根据需要进行扩展 producer 的 partition 机制。

  我们可以通过如下代码来实现自己的分片策略:

public class MyPartition implements Partitioner {//实现Partitioner接口

    private Random random=new Random();
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //获得分区列表
        List<PartitionInfo> partitionInfos=cluster.partitionsForTopic(topic);
        int partitionNum=0;
        if(key==null){
            partitionNum=random.nextInt(partitionInfos.size()); //随机分区
        }else{
            partitionNum=Math.abs((key.hashCode())%partitionInfos.size());
        }
        System.out.println("key ->"+key+"->value->"+value+"->"+partitionNum);
        return partitionNum;  //指定发送的分区值
    }
    @Override
    public void close() {

    }
    @Override
    public void configure(Map<String, ?> configs) {

    }
}

  然后基于之前的代码在producer上需要在消息发送端增加配置:指定自己的partiton策略

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.gupaoedu.kafka.MyPartition");

消息默认的分发机制:

  默认情况下,kafka 采用的是 hash 取模的分区算法。如果Key 为 null,则会随机分配一个分区。这个随机是在这个参数”metadata.max.age.ms”的时间范围内随机选择一个。对于这个时间段内,如果 key 为 null,则只会发送到唯一的分区。这个值值哦默认情况下是 10 分钟更新一次。关 于 Metadata ,简单理解就是Topic/Partition 和 broker 的映射关系,每一个 topic 的每一个 partition,需要知道对应的 broker 列表是什么,leader是谁、follower 是谁。这些信息都是存储在 Metadata 这个类里面。

消费端如何消费指定的分区:

  通过下面的代码,就可以消费指定该 topic 下的 0 号分区。其他分区的数据就无法接收。

//消费指定分区的时候,不需要再订阅
//kafkaConsumer.subscribe(Collections.singletonList(topic));
//消费指定的分区
TopicPartition topicPartition=new TopicPartition(topic,0);
kafkaConsumer.assign(Arrays.asList(topicPartition));

消息的消费原理:

  在实际生产过程中,每个 topic 都会有多个 partitions,多个 partitions 的好处在于,一方面能够对 broker 上的数据进行分片有效减少了消息的容量从而提升 io 性能。另外一方面,为了提高消费端的消费能力,一般会通过多个consumer 去消费同一个 topic ,也就是消费端的负载均衡机制,也就是我们接下来要了解的,在多个 partition 以及多个 consumer 的情况下,消费者是如何消费消息的?kafka 存在 consumer group的 概 念 , 也 就是 group.id 一 样 的 consumer ,这些consumer 属于一个 consumer group,组内的所有消费者协调在一起来消费订阅主题的所有分区。当然每一个分区只能由同一个消费组内的 consumer 来消费,那么同一个consumer group 里面的 consumer 是怎么去分配该消费哪个分区里的数据的呢?举个简单的例子就是如果存在的分区输,即partiton的数量于comsumer数量一致的时候,每个comsumer对应一个分区,如果comsumer数量多于分区,那么多出来的数量的comsumer将不工作,相反则是其中将会有comsumer消费多个分区。

  分区分配策略:

  在 kafka 中,存在两种分区分配策略,一种是 Range(默认)、另 一 种 另 一 种 还 是 RoundRobin ( 轮 询 )。 通 过comsumer的配置partition.assignment.strategy 这个参数来设置。

  Range strategy(范围分区): 

  Range 策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。假设我们有 10 个分区,3 个消费者,排完序的分区将会是 0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C3-0。然后将 partitions 的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。比如我们有 10 个分区,3 个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者线程 C1-0 将会多消费一个分区,所以最后分区分配的结果看起来是这样的:

  C1-0 将消费 0, 1, 2, 3 分区

  C2-0 将消费 4, 5, 6 分区

  C3-0 将消费 7, 8, 9 分区

假如我们有 11 个分区,那么最后分区分配的结果看起来是这样的:

  C1-0 将消费 0, 1, 2, 3 分区

  C2-0 将消费 4, 5, 6, 7 分区

  C3-0 将消费 8, 9, 10 分区

假如我们有 2 个主题(T1 和 T2),分别有 10 个分区,那么最后分区分配的结果看起来是这样的:

  C1-0 将消费 T1 主题的 0, 1, 2, 3 分区以及 T2 主题的 0, 1, 2, 3 分区

  C2-0 将消费 T1 主题的 4, 5, 6 分区以及 T2 主题的 4, 5, 6 分区

  C3-0 将消费 T1 主题的 7, 8, 9 分区以及 T2 主题的 7, 8, 9 分区

可以看出,C1-0 消费者线程比其他消费者线程多消费了 2 个分区,这就是 Range strategy 的一个很明显的弊端.

  RoundRobin strategy(轮询分区):

  轮询分区策略是把所有 partition 和所有 consumer 线程都列出来,然后按照 hashcode 进行排序。最后通过轮询算法分配 partition 给消费线程。如果所有 consumer 实例的订阅是相同的,那么 partition 会均匀分布。假如按照 hashCode 排序完的 topic&partitions 组依次为 T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为 C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:

  C1-0 将消费 T1-5, T1-2, T1-6 分区;

  C1-1 将消费 T1-3, T1-1, T1-9 分区;

  C2-0 将消费 T1-0, T1-4 分区;

  C2-1 将消费 T1-8, T1-7 分区;

  使用轮询分区策略必须满足两个条件

    1. 每个主题的消费者实例具有相同数量的流

    2. 每个消费者订阅的主题必须是相同的

  什么时候会触发这个策略呢?当出现以下几种情况时,kafka 会进行一次分区分配操作,也就是 kafka consumer 的 rebalance。

  1. 同一个 consumer group 内新增了消费者。

  2. 消费者离开当前所属的 consumer group,比如主动停机或者宕机。

  3. topic 新增了分区(也就是分区数量发生了变化)。

  4.消费者主动取消订阅topic

  kafka consuemr 的 rebalance 机制规定了一个 consumer group 下的所有 consumer 如何达成一致来分配订阅 topic的每个分区。而具体如何执行分区策略,就是前面提到过的两种内置的分区策略。而 kafka 对于分配策略这块,提供了可插拔的实现方式, 也就是说,除了这两种之外,我们还可以创建自己的分配机制。可以通过继承 AbstractPartitionAssignor 抽象类实现 assign 来做到。

  谁来执行 Rebalance 以及管理 consumer 的 group 呢?

  Kafka 提供了一个角色:coordinator(协调员) 来执行对于 consumer group 的管理,当 consumer group 的第一个 consumer 启动的时候,它会去和 kafka server(broker) 确定谁是它们组的 coordinator。之后该 group 内的所有成员都会和该 coordinator 进行协调通信。consumer group 如何确定自己的 coordinator 是谁呢? 消费 者 向 kafka 集 群 中 的 任 意 一 个 broker 发 送 一 个GroupCoordinatorRequest 请求,服务端会返回一个负载最 小 的 broker 节 点 的 id , 并 将 该 broker 设 置 为coordinator。在 rebalance 之前,需要保证 coordinator 是已经确定好了的,整个 rebalance 的过程分为两个步骤 ,一个是JoinGroup 的过程,在这个过程之后会进入一个Synchronizing Group State 阶段。那么这两个阶段都做了什么呢?

  JoinGroup 的过程:

  表示加入到 consumer group 中,在这一步中,所有的成员都会向 coordinator 发送 joinGroup 的请求。一旦所有成员都发送了 joinGroup 请求,那么 coordinator 会选择一个 consumer 担任 leader 角色,并把组成员信息和订阅信息发送消费者。下图就是描述了这么一个过程,并且请求与响应中携带的一些重要的信息。

  protocol_metadata: 序列化后的消费者的订阅信息

  leader_id: 消费组中的消费者,coordinator 会选择一个座位 leader,对应的就是 member_id

  member_metadata 对应消费者的订阅信息

  members:consumer group 中全部的消费者的订阅信息

  generation_id:年代信息,类似于 zookeeper 的时候的 epoch 是一样的,对于每一轮 rebalance ,generation_id 都会递增。主要用来保护 consumer group。隔离无效的 offset 提交。也就是上一轮的      consumer 成员无法提交 offset 到新的 consumer group 中。

  Synchronizing Group State 阶段:

  进入了 Synchronizing Group State阶段,主要逻辑是向 GroupCoordinator 发 送SyncGroupRequest 请求,并且处理 SyncGroupResponse响应,简单来说,就是 leader 将消费者对应的 partition 分配方案同步给 consumer group 中的所有 consumer,每个消费者都会向 coordinator 发送 syncgroup 请求,不过只有 leader 节点会发送分配方案,其他消费者只是打打酱油而已。当 leader 把方案发给 coordinator 以后,coordinator 会把结果设置到 SyncGroupResponse 中。这样所有成员都知道自己应该消费哪个分区。

  member_assignment :在syncGroup发送请求的时候,只有leader角色的comsumer才会去发送这个信息,而其他消费端是空的。然后会通过coordinator去分发给各个comsumer。

  ? consumer group 的分区分配方案是在客户端执行的!Kafka 将这个权利下放给客户端主要是因为这样做可以有更好的灵活性

offset :

  每个 topic可以划分多个分区(每个 Topic 至少有一个分区),同一topic 下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个 offset(称之为偏移量),它是消息在此分区中的唯一编号,kafka 通过 offset 保证消息在分区内的顺序,offset 的顺序不跨分区,即 kafka 只保证在同一个分区内的消息是有序的; 对于应用层的消费来说,每次消费一个消息并且提交以后,会保存当前消费到的最近的一个 offset。那么 offset 保存在哪里?

  offset 在哪里维护?

  在 kafka 中,提供了一个__consumer_offsets_* 的一个topic , 把 offset 信 息 写 入 到 这 个 topic 中 。__consumer_offsets——按保存了每个 consumer group某一时刻提交的 offset 信息。__consumer_offsets 默认有50 个分区。可以在 /tmp/kafka-logs/ 下查看。那么如何查看对应的 consumer_group 保存在哪个分区中呢?

  通过Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ; 由 于 默 认 情 况 下groupMetadataTopicPartitionCount 有 50 个分区,计算得到的结果为:4, 意味着当前的 consumer_group 的位移信息保存在__consumer_offsets 的第 4个分区,执行如下命令,可以查看当前 consumer_goup 中的offset 位移信息

sh kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 4 --broker-list 192.168.254.135:9092,192.168.254.136:9092,192.168.254.137:9092 --formatter
"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

  执行语句可以看到如下结果:

  这个意思就是 KafkaConsumerDemo1 消费组在 testTopic 中现在的 offsets 现在是 111.

消息的存储:

  首先我们需要了解的是,kafka 是使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个 offset 值来表示它在分区中的偏移量。Kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,Log 并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的明明规则是<topic_name>_<partition_id>比如创建一个名为 firstTopic 的 topic,其中有 3 个 partition,那么在 kafka 的数据目录(/tmp/kafka-log,这里可以通过server.properties中的log.dirs=/tmp/kafka-logs去修改)中就有 3 个目录,firstTopic-0~3多个分区在集群中的分配 如果我们对于一个 topic,在集群中创建多个 partition,那么 partition 是如何分布的呢?

1.将所有 N Broker 和待分配的 i 个 Partition 排序
2.将第 i 个 Partition 分配到第(i mod n)个 Broker 上

  结合前面讲的消息分发策略,就应该能明白消息发送到 broker 上,消息会保存到哪个分区中,并且消费端应该消费哪些分区的数据了。

原文地址:https://www.cnblogs.com/wuzhenzhao/p/10137490.html

时间: 2024-10-08 11:49:47

kafka消息通信原理学习(1)的相关文章

Spark消息通信原理(一)——Spark消息通信架构

在Spark中定义了通信框架的接口,这些接口中调用了Netty的具体方法(在spark2.x前,使用的是Akka).各接口和实现类的关系如下图所示. 将终端(EndPoint)注册到Rpc环境中: 在各个模块中,如DriverEndPoint.ClientEndPoint.Master.Worker等,会先使用RpcEnv的静态方法创建RpcEnv实例,然后实例化终端,由于终端都是继承与ThreadSafeEpcEndPoint,即创建的终端实例属于线程安全的,接着调用RpcEnv的启动终端方法

Linux学习之路-http通信原理

http通信原理 应用通讯的基本模型分析 基本通讯流程: 客户端http应用使用本机IP+随机注册生成的TCP端口,形成套接字socket,调用系统socket api 再经过网络层.数据链路层.物理层层层封装,把数据送达请求的服务器,经过层层解封,送达对应的http服务监听的套接字socket监听的相应的端口上. 期间涉及到的N种技术: (1)TCP和UDP协议 TCP特点: a.面向连接:收发数居前必须和对方建立可靠连接,一个连接必须经过3次握手 简单过程: 主机A向主机B发出连接请求数据包

SSL 通信原理及Tomcat SSL 配置

SSL 通信原理及Tomcat SSL 双向配置 目录1 参考资料 .................................................................................................................................. 12 SSL(Server Socket Layer)简介 .......................................................

Android Binder跨进程通信原理分析

出发前预备子弹 我们知道进程之间,虚拟地址不同,是不能直接通信的,这是一种保护机制.打开任务管理器,查看一下N多的进程,试想一下如果这些进程直接通信会带来什么后果? 而用户空间可以通过System calls(系统回调)与内核空间通信的,如果在内核空间中有一个模块,能够完成数据的转发,那么是不是两个进程就可以通信了呢?如下图: 上面提到一些用户空间.内核空间的概念,用户空间也能大概猜到是什么东西,而内核空间,就知道它是很底层的东西好了.而模块呢,可以简单的理解为实现一个功能的程序或一个硬件电路等

通信原理之IP协议,ARP协议 (三)

把这三个协议放到一起学习是因为这三个协议处于同一层,ARP协议用来找到目标主机的Ethernet网卡Mac地址,IP则承载要发送的消息.数据链路层可以从ARP得到数据的传送信息,而从IP得到要传输的数据信息. 1.IP协议 IP协议是TCP/IP协议的核心,所有的TCP,UDP,IMCP,IGCP的数据都以IP数据格式传输.要注意的是,IP不是可靠的协议,这是说,IP协议没有提供一种数据未传达以后的处理机制--这被认为是上层协议--TCP或UDP要做的事情.所以这也就出现了TCP是一个可靠的协议

使用内存映射开发高性能进程间消息通信组件

一.背景 项目开发中免不了各模块或系统之间进行消息通信,目前热门的消息中间件有Redis.RabbitMQ.Kafka.RocketMQ等等. 以上几种组件中Redis在消息队列方面表现还可以,但是如果涉及发布订阅功能,就不行了,最近项目就使用了redis的发布订阅, 每秒只能发出几千条,虽然目前绰绰有余,但是瓶颈可以预期. 其余的几种都是比较重量级的消息中间件,什么跨平台.分布式.集群.支持N种协议等等,很高大全, 我们可能就只使用了其中1.2个功能.严格来说,项目中集成这几种MQ的工作量是不

详细说说Binder通信原理与机制

先上一张Binder 的工作流程图.(如果不清晰,可以 复制图片链接到浏览器 或 保存到本地 查看,我经常都是这样看图的哈) 更多Android高级面试合集放在github上面了(更多面试文档,项目下载,源码)https://github.com/xiangjiana/androids需要更多项目下载,源码的小伙伴可以点击关于我 联系我获取 一开始上手,陌生的东西比较多,But,其实并不复杂.喔,流程图是用 ProcessOn 画的.很棒的在线画图工具. 出发前预备子弹 我们知道进程之间,虚拟地

ZeroMQ——一个轻量级的消息通信组件

ZeroMQ是一个轻量级的消息通信组件,尽管名字中包含了"MQ",严格上来讲ZeroMQ并不是"消息队列/消息中间件".ZeroMQ是一个传输层API库, 更关注消息的传输.与消息队列相比,ZeroMQ有以下一些特点: 点对点无中间节点 传统的消息队列都需要一个消息服务器来存储转发消息.而ZeroMQ则放弃了这个模式,把侧重点放在了点对点的消息传输上,并且(试图)做到极致.以为消息服务器最终还是转化为服务器对其他节点的点对点消息传输上.ZeroMQ能缓存消息,但是是

消息通信机制NSNotificationCenter -备

消息通信机制NSNotificationCenter的学习.最近写程序需要用到这类,研究了下,现把成果和 NSNotificationCenter是专门供程序中不同类间的消息通信而设置的,使用起来极为方便, 长话短说. 设置通知,就是说要在什么地方(哪个类)接受通知,一般在初始化中做. [[NSNotificationCenter defaultCenter] addObserver:self selector:@selector(test:) name:@"test" object: