kafka学习笔记:知识点整理(一)

一、kafka 架构

1.1 拓扑结构

如下图:

图.1

1.2 相关概念

如图.1中,kafka 相关名词解释如下:

1.producer:   消息生产者,发布消息到 kafka 集群的终端或服务。 2.broker:   kafka 集群中包含的服务器。 3.topic:   每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。 4.partition:   partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。 5.consumer:   从 kafka 集群中消费消息的终端或服务。 6.Consumer group:   high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。 7.replica:   partition 的副本,保障 partition 的高可用。 8.leader:   replica 中的一个角色, producer 和 consumer 只跟 leader 交互。 9.follower:   replica 中的一个角色,从 leader 中复制数据。 10.controller:   kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。 12.zookeeper:   kafka 通过 zookeeper 来存储集群的 meta 信息。

1.3 zookeeper 节点

kafka 在 zookeeper 中的存储结构如下图所示:

图.2

二、producer 发布消息

2.1 写入方式

producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。

2.2 消息路由

producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:

1. 指定了 patition,则直接使用; 2. 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition 3. patition 和 key 都未指定,使用轮询选出一个 patition。

附上 Java 客户端分区源码,一目了然:

//创建消息实例 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) { if (topic == null) throw new IllegalArgumentException("Topic cannot be null"); if (timestamp != null && timestamp < 0) throw new IllegalArgumentException("Invalid timestamp " + timestamp); this.topic = topic; this.partition = partition; this.key = key; this.value = value; this.timestamp = timestamp;
} //计算 patition,如果指定了 patition 则直接使用,否则使用 key 计算 private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {
     Integer partition = record.partition(); if (partition != null) {
          List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic()); int lastPartition = partitions.size() - 1; if (partition < 0 || partition > lastPartition) { throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));
          } return partition;
     } return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
} // 使用 key 选取 patition public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = counter.getAndIncrement();
          List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition();
          } else { return DefaultPartitioner.toPositive(nextValue) % numPartitions;
          }
     } else { //对 keyBytes 进行 hash 选出一个 patition return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
     }
}

2.3 写入流程

producer 写入消息序列图如下所示:

图.3

流程说明:

1. producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader 2. producer 将消息发送给该 leader 3. leader 将消息写入本地 log 4. followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK 5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK

愿意了解更多技术分享的可关注:mingli.com

朋友需要请加球球:二零四二八四九二三七

2.4 producer delivery guarantee

一般情况下存在三种情况:

1. At most once 消息可能会丢,但绝不会重复传输 2. At least one 消息绝不会丢,但可能会重复传输 3. Exactly once 每条消息肯定会被传输一次且仅传输一次

当 producer 向 broker 发送消息时,一旦这条消息被 commit,由于 replication 的存在,它就不会丢。但是如果 producer 发送数据给 broker 后,遇到网络问题而造成通信中断,那 Producer 就无法判断该条消息是否已经 commit。虽然 Kafka 无法确定网络故障期间发生了什么,但是 producer 可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了 Exactly once,但目前还并未实现。所以目前默认情况下一条消息从 producer 到 broker 是确保了 At least once,可通过设置 producer 异步发送实现At most once。

时间: 2024-10-22 06:32:20

kafka学习笔记:知识点整理(一)的相关文章

iOS学习笔记-精华整理

iOS学习笔记总结整理 一.内存管理情况 1- autorelease,当用户的代码在持续运行时,自动释放池是不会被销毁的,这段时间内用户可以安全地使用自动释放的对象.当用户的代码运行告一段 落,开始等待用户的操作,自动释放池就会被释放掉(调用dealloc),池中的对象都会收到一个release,有可能会因此被销毁. 2-成员属性:     readonly:不指定readonly,默认合成getter和setter方法.外界毫不关心的成员,则不要设置任何属性,这样封装能增加代码的独立性和安全

js学习笔记知识点

AJAX用法安全限制JSONPCORS面向对象编程创建对象构造函数原型继承class继承 AJAX 用法 AJAX不是JavaScript的规范,它只是一个哥们"发明"的缩写:Asynchronous JavaScript and XML,意思就是用JavaScript执行异步网络请求 在现代浏览器上写AJAX主要依靠XMLHttpRequest对象: 'use strict'; function success(text) { var textarea = document.getE

bootstrap 新手学习笔记 代码整理

<!DOCTYPE html> <html> <head> <meta charset="utf-8"> <title>Bootstrap 实例 - 滚动监听(Scrollspy)插件方法</title> <link href="css/bootstrap.min.css" rel="stylesheet"> <script src="js/jqu

kafka学习笔记:知识点整理

一.为什么需要消息系统 1.解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余: 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕. 3.扩展性: 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可. 4.

[Big Data - Kafka] kafka学习笔记:知识点整理

一.为什么需要消息系统 1.解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 2.冗余: 消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险.许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕. 3.扩展性: 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可. 4.

kafka学习笔记:知识点整理(二)

三.kafka HA 3.1 replication 如图.1所示,同一个 partition 可能会有多个 replica(对应 server.properties 配置中的 default.replication.factor=N).没有 replica 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition.引入replication 之后,同一个 partition 可能会有多个 replic

springMVC学习笔记--知识点总结1

以下是学习springmvc框架时的笔记整理: 结果跳转方式 1.设置ModelAndView,根据view的名称,和视图渲染器跳转到指定的页面. 比如jsp的视图渲染器是如下配置的: <!-- 配置渲染器 --> <bean id="jspViewResolver" class="org.springframework.web.servlet.view.InternalResourceViewResolver"> <property

Kafka 学习笔记之 Producer/Consumer (Scala)

既然Kafka使用Scala写的,最近也在慢慢学习Scala的语法,虽然还比较生疏,但是还是想尝试下用Scala实现Producer和Consumer,并且用HashPartitioner实现消息根据key路由到指定的partition. Producer: import java.util.Properties import kafka.producer.ProducerConfig import kafka.producer.Producer import kafka.producer.Ke

基于CentOS6.5的LNMP下memcached学习笔记的整理

1.概述 Memcached是一个C语言编写的高性能分布式的内存对象缓存系统,用于减少Web应用直接跟数据库交互,从而减轻数据库负载压力,将一些经常访问的数据对象以K/V(键/值)对的形式存放在内存中作为缓存数据.缓存是一种加速应用向后端服务器读取数据的优化手段,其思想是用对内存的读取换取直接对磁盘数据库中数据的操作,以提高web应用程序的访问速度,从而实现web动态页面高性能.高并发的访问. Memcached是通过C语言Danga Interactive开发编写的,使用libevent框架实