Kafka系统的角色
- Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic
- topic: 可以理解为一个MQ消息队列的名字
- Partition:为了实现扩展性,一个非常大的topic可以分布到多个 broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息 都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体 (多个partition间)的顺序。也就是说,一个topic在集群中可以有多个partition,那么分区的策略是什么?(消息发送到哪个分区上,有两种基本的策略,一是采用Key Hash算法,一是采用Round Robin算法)
ookeeper在Kakfa中扮演的角色Kafka将元数据信息保存在Zookeeper中,但是发送给Topic本身的数据是不会发到Zk上的,否则Zk就疯了。
- kafka使用zookeeper来实现动态的集群扩展,不需要更改客户端(producer和consumer)的配置。broker会在zookeeper注册并保持相关的元数据(topic,partition信息等)更新。
- 而客户端会在zookeeper上注册相关的watcher。一旦zookeeper发生变化,客户端能及时感知并作出相应调整。这样就保证了添加或去除broker时,各broker间仍能自动实现负载均衡。这里的客户端指的是Kafka的消息生产端(Producer)和消息消费端(Consumer)
- Broker端使用zookeeper来注册broker信息,以及监测partition leader存活性.
- Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息.
- Zookeer和Producer没有建立关系,只和Brokers、Consumers建立关系以实现负载均衡,即同一个Consumer Group中的Consumers可以实现负载均衡
入门
1、简介
Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
<ignore_js_op>
2、Topics/logs
一个Topic可以认为是一类消息,每个topic将被分成多个partition(区),每个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它是唯一标记一条消息。它唯一的标记一条消息。kafka并没有提供其他额外的索引机制来存储offset,因为在kafka中几乎不允许对消息进行“随机读写”。
<ignore_js_op>
kafka和JMS(Java Message Service)实现(activeMQ)不同的是:即使消息被消费,消息仍然不会被立即删除.日志文件将会根据broker中的配置要求,保留一定的时间之后删除;比如log文件保留2天,那么两天后,文件会被清除,无论其中的消息是否被消费.kafka通过这种简单的手段,来释放磁盘空间,以及减少消息消费之后对文件内容改动的磁盘IO开支.
对于consumer而言,它需要保存消费消息的offset,对于offset的保存和使用,有consumer来控制;当consumer正常消费消息时,offset将会"线性"的向前驱动,即消息将依次顺序被消费.事实上consumer可以使用任意顺序消费消息,它只需要将offset重置为任意值..(offset将会保存在zookeeper中,参见下文)
kafka集群几乎不需要维护任何consumer和producer状态信息,这些信息有zookeeper保存;因此producer和consumer的客户端实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响.
partitions的设计目的有多个.最根本原因是kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存;可以将一个topic切分多任意多个partitions,来消息保存/消费的效率.此外越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力.(具体原理参见下文).
3、Distribution
一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中消息的读写操作;此外kafka还可以配置partitions需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性.
基于replicated方案,那么就意味着需要对多个备份进行调度;每个partition都有一个server为"leader";leader负责所有的读写操作,如果leader失效,那么将会有其他follower来接管(成为新的leader);follower只是单调的和leader跟进,同步消息即可..由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个"leader",kafka会将"leader"均衡的分散在每个实例上,来确保整体的性能稳定.
Producers
Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪个partition;比如基于"round-robin"方式或者通过其他的一些算法等.
Consumers
本质上kafka只支持Topic.每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer.发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费.
如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.
如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者.
在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个"订阅"者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的.事实上,从Topic角度来说,消息仍不是有序的.
kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息.
Guarantees
1) 发送到partitions中的消息将会按照它接收的顺序追加到日志中
2) 对于消费者而言,它们消费消息的顺序和日志中消息顺序一致.
3) 如果Topic的"replicationfactor"为N,那么允许N-1个kafka实例失效.
二、使用场景
1、Messaging
对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的"事务性""消息传输担保(消息确认机制)""消息分组"等企业级特性;kafka只能使用作为"常规"的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)
2、Websit activity tracking
kafka可以作为"网站活性跟踪"的最佳工具;可以将网页/用户操作等信息发送到kafka中.并实时监控,或者离线统计分析等
3、Log Aggregation
kafka的特性决定它非常适合作为"日志收集中心";application可以将操作日志"批量""异步"的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.此时consumer端可以使hadoop等其他系统化的存储和分析系统.
它的架构包括以下组件:
话题(Topic):是特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类名或种子(Feed)名。
生产者(Producer):是能够发布消息到话题的任何对象。
服务代理(Broker):已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群。
消费者(Consumer):可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息。
Kafka存储策略
1)kafka以topic来进行消息管理,每个topic包含多个partition,每个partition对应一个逻辑log,有多个segment组成。
2)每个segment中存储多条消息(见下图),消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。
3)每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。
4)发布者发到某个topic的消息会被均匀的分布到多个partition上(或根据用户指定的路由规则进行分布),broker收到发布消息往对应partition的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。
Kafka删除策略
1)N天前的删除。
2)保留最近的MGB数据。
Kafka broker
与其它消息系统不同,Kafka broker是无状态的。这意味着消费者必须维护已消费的状态信息。这些信息由消费者自己维护,broker完全不管(有offset managerbroker管理)。
- 从代理删除消息变得很棘手,因为代理并不知道消费者是否已经使用了该消息。Kafka创新性地解决了这个问题,它将一个简单的基于时间的SLA应用于保留策略。当消息在代理中超过一定时间后,将会被自动删除。
- 这种创新设计有很大的好处,消费者可以故意倒回到老的偏移量再次消费数据。这违反了队列的常见约定,但被证明是许多消费者的基本特征。
以下摘抄自kafka官方文档:
Kafka Design
目标
1) 高吞吐量来支持高容量的事件流处理
2) 支持从离线系统加载数据
3) 低延迟的消息系统
持久化
1) 依赖文件系统,持久化到本地
2) 数据持久化到log
效率
1) 解决”small IO problem“:
使用”message set“组合消息。
server使用”chunks of messages“写到log。
consumer一次获取大的消息块。
2)解决”byte copying“:
在producer、broker和consumer之间使用统一的binary message format。
使用系统的pagecache。
使用sendfile传输log,避免拷贝。
端到端的批量压缩(End-to-end Batch Compression)
Kafka支持GZIP和Snappy压缩协议。
The Producer
负载均衡
1)producer可以自定义发送到哪个partition的路由规则。默认路由规则:hash(key)%numPartitions,如果key为null则随机选择一个partition。
2)自定义路由:如果key是一个user id,可以把同一个user的消息发送到同一个partition,这时consumer就可以从同一个partition读取同一个user的消息。
异步批量发送
批量发送:配置不多于固定消息数目一起发送并且等待时间小于一个固定延迟的数据。
The Consumer
consumer控制消息的读取。
Push vs Pull
1)producer push data to broker,consumer pull data from broker
2)consumer pull的优点:consumer自己控制消息的读取速度和数量。
3)consumer pull的缺点:如果broker没有数据,则可能要pull多次忙等待,Kafka可以配置consumer long pull一直等到有数据。
Consumer Position
1)大部分消息系统由broker记录哪些消息被消费了,但Kafka不是。
2)Kafka由consumer控制消息的消费,consumer甚至可以回到一个old offset的位置再次消费消息。
Message Delivery Semantics
三种:
At most once—Messages may be lost but are never redelivered.
At least once—Messages are never lost but may be redelivered.
Exactly once—this is what people actually want, each message is delivered once and only once.
Producer:有个”acks“配置可以控制接收的leader的在什么情况下就回应producer消息写入成功。
Consumer:
* 读取消息,写log,处理消息。如果处理消息失败,log已经写入,则无法再次处理失败的消息,对应”At most once“。
* 读取消息,处理消息,写log。如果消息处理成功,写log失败,则消息会被处理两次,对应”At least once“。
* 读取消息,同时处理消息并把result和log同时写入。这样保证result和log同时更新或同时失败,对应”Exactly once“。
Kafka默认保证at-least-once delivery,容许用户实现at-most-once语义,exactly-once的实现取决于目的存储系统,kafka提供了读取offset,实现也没有问题。
复制(Replication)
1)一个partition的复制个数(replication factor)包括这个partition的leader本身。
2)所有对partition的读和写都通过leader。
3)Followers通过pull获取leader上log(message和offset)
4)如果一个follower挂掉、卡住或者同步太慢,leader会把这个follower从”in sync replicas“(ISR)列表中删除。
5)当所有的”in sync replicas“的follower把一个消息写入到自己的log中时,这个消息才被认为是”committed“的。
6)如果针对某个partition的所有复制节点都挂了,Kafka选择最先复活的那个节点作为leader(这个节点不一定在ISR里)。
日志压缩(Log Compaction)
1)针对一个topic的partition,压缩使得Kafka至少知道每个key对应的最后一个值。
2)压缩不会重排序消息。
3)消息的offset是不会变的。
4)消息的offset是顺序的。
Distribution
Consumer Offset Tracking
1)High-level consumer记录每个partition所消费的maximum offset,并定期commit到offset manager(broker)。
2)Simple consumer需要手动管理offset。现在的Simple consumer Java API只支持commit offset到zookeeper。
Consumers and Consumer Groups
1)consumer注册到zookeeper
2)属于同一个group的consumer(group id一样)平均分配partition,每个partition只会被一个consumer消费。
3)当broker或同一个group的其他consumer的状态发生变化的时候,consumer rebalance就会发生。
Zookeeper协调控制
1)管理broker与consumer的动态加入与离开。
2)触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一个consumer group内的多个consumer的订阅负载平衡。
3)维护消费关系及每个partition的消费信息。
生产者代码示例: