漫游Kafka设计篇之Producer和Consumer

Kafka Producer

消息发送

producer直接将数据发送到broker的leader(主节点),不需要在多个节点进行分发。为了帮助producer做到这点,所有的Kafka节点都可以及时的告知:哪些节点是活动的,目标topic目标分区的leader在哪。这样producer就可以直接将消息发送到目的地了。

客户端控制消息将被分发到哪个分区。可以通过负载均衡随机的选择,或者使用分区函数。Kafka允许用户实现分区函数,指定分区的key,将消息hash到不同的分区上(当然有需要的话,也可以覆盖这个分区函数自己实现逻辑).比如如果你指定的key是user id,那么同一个用户发送的消息都被发送到同一个分区上。经过分区之后,consumer就可以有目的的消费某个分区的消息。

异步发送

批量发送可以很有效的提高发送效率。Kafka producer的异步发送模式允许进行批量发送,先将消息缓存在内存中,然后一次请求批量发送出去。这个策略可以配置的,比如可以指定缓存的消息达到某个量的时候就发出去,或者缓存了固定的时间后就发送出去(比如100条消息就发送,或者每5秒发送一次)。这种策略将大大减少服务端的I/O次数。

既然缓存是在producer端进行的,那么当producer崩溃时,这些消息就会丢失。Kafka0.8.1的异步发送模式还不支持回调,就不能在发送出错时进行处理。Kafka 0.9可能会增加这样的回调函数。见Proposed Producer API.

Kafka Consumer

Kafa consumer消费消息时,向broker发出"fetch"请求去消费特定分区的消息。consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息。customer拥有了offset的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的。

推还是拉?

Kafka最初考虑的问题是,customer应该从brokes拉取消息还是brokers将消息推送到consumer,也就是pull还push。在这方面,Kafka遵循了一种大部分消息系统共同的传统的设计:producer将消息推送到broker,consumer从broker拉取消息。
一些消息系统比如Scribe和Apache Flume采用了push模式,将消息推送到下游的consumer。这样做有好处也有坏处:由broker决定消息推送的速率,对于不同消费速率的consumer就不太好处理了。消息系统都致力于让consumer以最大的速率最快速的消费消息,但不幸的是,push模式下,当broker推送的速率远大于consumer消费的速率时,consumer恐怕就要崩溃了。最终Kafka还是选取了传统的pull模式。
Pull模式的另外一个好处是consumer可以自主决定是否批量的从broker拉取数据。Push模式必须在不知道下游consumer消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。如果为了避免consumer崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。Pull模式下,consumer就可以根据自己的消费能力去决定这些策略。
Pull有个缺点是,如果broker没有可供消费的消息,将导致consumer不断在循环中轮询,直到新消息到t达。为了避免这点,Kafka有个参数可以让consumer阻塞知道新消息到达(当然也可以阻塞知道消息的数量达到某个特定的量这样就可以批量发送)。

消费状态跟踪

对消费消息状态的记录也是很重要的。
大部分消息系统在broker端的维护消息被消费的记录:一个消息被分发到consumer后broker就马上进行标记或者等待customer的通知后进行标记。这样也可以在消息在消费后立马就删除以减少空间占用。
但是这样会不会有什么问题呢?如果一条消息发送出去之后就立即被标记为消费过的,一旦consumer处理消息时失败了(比如程序崩溃)消息就丢失了。为了解决这个问题,很多消息系统提供了另外一个个功能:当消息被发送出去之后仅仅被标记为已发送状态,当接到consumer已经消费成功的通知后才标记为已被消费的状态。这虽然解决了消息丢失的问题,但产生了新问题,首先如果consumer处理消息成功了但是向broker发送响应时失败了,这条消息将被消费两次。第二个问题时,broker必须维护每条消息的状态,并且每次都要先锁住消息然后更改状态然后释放锁。这样麻烦又来了,且不说要维护大量的状态数据,比如如果消息发送出去但没有收到消费成功的通知,这条消息将一直处于被锁定的状态,
Kafka采用了不同的策略。Topic被分成了若干分区,每个分区在同一时间只被一个consumer消费。这意味着每个分区被消费的消息在日志中的位置仅仅是一个简单的整数:offset。这样就很容易标记每个分区消费状态就很容易了,仅仅需要一个整数而已。这样消费状态的跟踪就很简单了。
这带来了另外一个好处:consumer可以把offset调成一个较老的值,去重新消费老的消息。这对传统的消息系统来说看起来有些不可思议,但确实是非常有用的,谁规定了一条消息只能被消费一次呢?consumer发现解析数据的程序有bug,在修改bug后再来解析一次消息,看起来是很合理的额呀!

离线处理消息

高级的数据持久化允许consumer每个隔一段时间批量的将数据加载到线下系统中比如Hadoop或者数据仓库。这种情况下,Hadoop可以将加载任务分拆,拆成每个broker或每个topic或每个分区一个加载任务。Hadoop具有任务管理功能,当一个任务失败了就可以重启而不用担心数据被重新加载,只要从上次加载的位置继续加载消息就可以了。

时间: 2024-08-05 19:14:06

漫游Kafka设计篇之Producer和Consumer的相关文章

漫游Kafka设计篇之效率优化

原文地址:http://blog.csdn.net/honglei915/article/details/37564757 Kafka在提高效率方面做了很大努力.Kafka的一个主要使用场景是处理网站活动日志,吞吐量是非常大的,每个页面都会产生好多次写操作.读方面,假设每个消息只被消费一次,读的量的也是很大的,Kafka也尽量使读的操作更轻量化. 我们之前讨论了磁盘的性能问题,线性读写的情况下影响磁盘性能问题大约有两个方面:太多的琐碎的I/O操作和太多的字节拷贝.I/O问题发生在客户端和服务端之

漫游Kafka设计篇之性能优化

Kafka在提高效率方面做了很大努力.Kafka的一个主要使用场景是处理网站活动日志,吞吐量是非常大的,每个页面都会产生好多次写操作.读方面,假设每个消息只被消费一次,读的量的也是很大的,Kafka也尽量使读的操作更轻量化. 我们之前讨论了磁盘的性能问题,线性读写的情况下影响磁盘性能问题大约有两个方面:太多的琐碎的I/O操作和太多的字节拷贝.I/O问题发生在客户端和服务端之间,也发生在服务端内部的持久化的操作中.消息集(message set)为了避免这些问题,Kafka建立了“消息集(mess

漫游Kafka设计篇之数据持久化

不要畏惧文件系统! Kafka大量依赖文件系统去存储和缓存消息.对于硬盘有个传统的观念是硬盘总是很慢,这使很多人怀疑基于文件系统的架构能否提供优异的性能.实际上硬盘的快慢完全取决于使用它的方式.设计良好的硬盘架构可以和内存一样快.在6块7200转的SATA RAID-5磁盘阵列的线性写速度差不多是600MB/s,但是随即写的速度却是100k/s,差了差不多6000倍.现代的操作系统都对次做了大量的优化,使用了 read-ahead 和 write-behind的技巧,读取的时候成块的预读取数据,

漫游Kafka设计篇之消息传输的事务定义

之前讨论了consumer和producer是怎么工作的,现在来讨论一下数据传输方面.数据传输的事务定义通常有以下三种级别: 最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输. 最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输. 精确的一次(Exactly once):  不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的. 大多数消息系统声称可以做到“精确的一次”,但是仔细阅读它们的的文档可以看到里面存在误导,比如没有说明当

漫游Kafka设计篇之主从同步

Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topci配置副本的数量.Kafka会自动在每个个副本上备份数据,所以当一个节点down掉时数据依然是可用的. Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据. 创建副本的单位是topic的分区,每个分区都有一个leader和零或多个followers.所有的读写操作都由leader处理,一般分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中.

漫游Kafka入门篇之简单介绍

原文地址:http://blog.csdn.net/honglei915/article/details/37564521 介绍 Kafka是一个分布式的.可分区的.可复制的消息系统.它提供了普通消息系统的功能,但具有自己独特的设计.这个独特的设计是什么样的呢? 首先让我们看几个基本的消息系统术语: Kafka将消息以topic为单位进行归纳. 将向Kafka topic发布消息的程序成为producers. 将预订topics并消费消息的程序成为consumer. Kafka以集群的方式运行,

漫游kafka实战篇之搭建Kafka开发环境

转载注明出处:http://blog.csdn.net/honglei915/article/details/37563647 上篇文章中我们搭建了kafka的服务器,并可以使用Kafka的命令行工具创建topic,发送和接收消息.下面我们来搭建kafka的开发环境. 添加依赖 搭建开发环境需要引入kafka的jar包,一种方式是将Kafka安装包中lib下的jar包加入到项目的classpath中,这种比较简单了.不过我们使用另一种更加流行的方式:使用maven管理jar包依赖. 创建好mav

漫游Kafka实现篇之消息和日志

原文地址:http://blog.csdn.net/honglei915/article/details/37760631 消息格式 消息由一个固定长度的头部和可变长度的字节数组组成.头部包含了一个版本号和CRC32校验码. /** * 具有N个字节的消息的格式如下 * * 如果版本号是0 * * 1. 1个字节的 "magic" 标记 * * 2. 4个字节的CRC32校验码 * * 3. N - 5个字节的具体信息 * * 如果版本号是1 * * 1. 1个字节的 "ma

漫游Kafka实战篇之搭建Kafka运行环境

接下来一步一步搭建Kafka运行环境. Step 1: 下载Kafka 点击下载最新的版本并解压. > tar -xzf kafka_2.9.2-0.8.1.1.tgz > cd kafka_2.9.2-0.8.1.1 Step 2: 启动服务 Kafka用到了Zookeeper,所有首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务.可以在命令的结尾加个&符号,这样就可以启动后离开控制台. > bin/zookeeper-server-start.sh