The Kafka 0.8 Protocol


Introduction

This document covers the protocol implemented in Kafka 0.8 and later. It is meant to be a readable guide to the protocol that covers the available requests, their binary format, and how to use the protocol to implement a client. This document assumes you understand the basic design principles and terminology described here.

The protocol used in 0.7 and earlier versions was similar to this one, but we chose to make a one-time break in compatibility in order to remove some accumulated cruft and make room for new things.

Terminology

Key terms used throughout this document: metadata, broker, message, offset, topic, (consumer) group, and partition.

Overview

The Kafka protocol is fairly simple; there are only six client request APIs:

  1. Metadata – Describes the currently available brokers, their host and port information, and gives information about which broker hosts which partitions.

  2. Send – Sends messages to a broker.

  3. Fetch – Fetches messages from a broker; this covers fetching data, fetching cluster metadata, and fetching offset information.

  4. Offsets – Gets the available offset information for the partitions of a given topic.

  5. Offset Commit – Commits a set of offsets for a consumer group.

  6. Offset Fetch – Fetches a set of offsets for a consumer group.

Each of these is described in detail below.

Preliminaries

Network

Kafka uses a binary protocol over TCP. The protocol defines all APIs as request/response message pairs. All messages are size-delimited and are made up of the primitive types described below.

The client initiates a socket connection and then writes a sequence of request messages and reads back the corresponding response messages. No handshake is required on connection or disconnection. TCP is happier if you maintain persistent connections used for many requests, amortizing the cost of the TCP handshake, but beyond that penalty connecting is fairly cheap.

Because data is partitioned, a client will need to maintain connections to multiple brokers in order to talk to the servers that hold its data. However, it should generally not be necessary for a single client instance to maintain multiple connections to a single broker (i.e. connection pooling).

The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will be returned in that order as well. The broker's request processing allows only a single in-flight request per connection in order to guarantee this ordering. Note that clients can (and ideally should) use non-blocking IO to implement request pipelining and achieve higher throughput; for example, a client can keep sending requests while awaiting responses to earlier requests, since the outstanding requests will be buffered in the underlying OS socket buffer. Unless otherwise noted, all requests are initiated by the client and result in a corresponding response message from the server.

The server has a configurable maximum limit on request size; any request exceeding this limit results in the socket being disconnected.

Partitioning and Bootstrapping

Kafka is a partitioned system, so not all servers hold the complete data set. Topics are split into a pre-defined number of partitions, P, and each partition is replicated as N copies. Topic partitions themselves are just ordered "commit logs" numbered 0, 1, ..., P-1.

All systems with this kind of partitioning face the question of how data is assigned to particular partitions. Kafka clients directly control that assignment; the brokers themselves enforce no particular semantics over which messages should be published to which partition. Rather, a client publishes messages directly to a chosen partition and fetches messages directly from a chosen partition. If two clients want to use the same partitioning scheme, they must use the same method of computing the mapping from key to partition.

These requests to publish or fetch data must be sent to the broker that is currently acting as the leader for the given partition. This condition is enforced by the broker, so a request for a particular partition sent to the wrong broker will result in a NotLeaderForPartition error code (described in more detail below).

How, then, does a client find out which topics exist, which partitions they have, and which brokers host those partitions, so that it can send its requests to the right hosts? This information is dynamic, so it cannot be configured in the client as a static mapping file. All Kafka brokers can answer a metadata request describing the current state of the cluster.

In other words, the client needs to find one broker, and that broker will tell it about the other brokers that exist and the partitions they host. This first broker may itself be down, so a client implementation should take a list of two or three "bootstrap" URLs. The user can choose to use a load balancer or simply configure two or three Kafka hosts statically in the client.

The client does not need to keep polling to check whether the cluster has changed; it can fetch metadata once when it is initialized and cache it until it receives an error indicating that the metadata is out of date. This error can come in two forms:

1) a socket error, indicating that the client cannot communicate with a particular broker;

2) an error in a response, indicating that the broker no longer holds the data that was requested.

The general lifecycle of a client is then:

  1. Cycle through the "bootstrap" Kafka URLs until one can be connected to, and fetch the cluster metadata.

  2. Process fetch and produce requests, sending them to the broker appropriate for the topic/partitions they address.

  3. If a relevant error occurs, refresh the metadata and retry. (A minimal sketch of this loop follows the list.)
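
The sketch below illustrates only the bootstrap step under the assumption that the host names are placeholders; it is not the official client, and the metadata and produce calls that would follow are described only in comments.

```python
# Hypothetical bootstrap sketch: cycle through configured brokers until one
# accepts a TCP connection. Host names are placeholders, not real brokers.
import socket

BOOTSTRAP_BROKERS = [("kafka1.example.com", 9092), ("kafka2.example.com", 9092)]

def connect_to_any_bootstrap(brokers=BOOTSTRAP_BROKERS, timeout=5.0):
    """Return a socket to the first reachable bootstrap broker."""
    last_error = None
    for host, port in brokers:
        try:
            return socket.create_connection((host, port), timeout=timeout)
        except OSError as exc:   # broker down or unreachable; try the next one
            last_error = exc
    raise RuntimeError("no bootstrap broker reachable") from last_error

# A client would then issue a metadata request on this connection, cache the
# result, and only reconnect/refresh after a socket error or an error code
# such as NotLeaderForPartition signals that the cached metadata is stale.
```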

Partitioning Strategies

As mentioned above, the assignment of messages to partitions is controlled by the producing client. That raises the question of how this capability should be exposed to the end user.

Partitioning really serves two purposes in Kafka:

  1. It balances data and request load over brokers.

  2. It allows local state and ordering to be maintained within a partition, so that processing can be divided among consumer processes. We call this semantic partitioning.

For a given use case you may care about only one of these, or both.

To accomplish simple load balancing, a simple client implementation can just round-robin requests over all brokers. Another option, in an environment where there are many more producers than brokers, is to have each client pick a single partition at random and publish to it. This latter strategy results in far fewer TCP connections.

Semantic partitioning means using some key in the message to assign messages to partitions. For example, if you are processing a click-message stream you might want to partition it by user ID, so that all data for a particular user goes to a single consumer. To accomplish this, the client can take a key associated with the message and apply a hash of this key to choose the partition.
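
A minimal sketch of such a key-hash partitioner is shown below; the use of CRC32 is an arbitrary choice for illustration — any stable hash works, as long as every producer sharing the partitioning scheme uses the same one.

```python
# Illustrative key-based partitioner (not a prescribed algorithm): producers
# that agree on this function route equal keys to the same partition.
import zlib

def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key (e.g. a user id) to a partition number."""
    return zlib.crc32(key) % num_partitions

partition = choose_partition(b"user-42", num_partitions=8)   # e.g. a click stream keyed by user
```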

Batching

Our APIs encourage batching small things together for efficiency; we have found this to be a very significant performance win. Both the API for sending messages and the API for fetching messages work with batches of messages rather than single messages. A clever client can take advantage of this and support an "asynchronous" mode in which it groups individually sent messages into batches and sends them in larger chunks. We go further and allow batching across multiple topics and partitions, so a produce request may contain data for many partitions, and a fetch request may pull data from many partitions at once.

A client implementation can of course ignore this and send everything one message at a time if it chooses.

Versioning and Compatibility

The protocol is designed to enable incremental evolution in a backward-compatible fashion. Versioning is done on a per-API basis, each version consisting of a request and response pair. Each request contains an API key identifying the API being invoked and a version number indicating the format of the request and the expected format of the response.

The intention is that a client implements a particular version of the protocol and indicates that version in its requests. Our goal is to allow APIs to evolve in an environment where downtime is not allowed and clients and servers cannot all be changed at once.

The server will reject requests with a version it does not support and will respond to the client with the protocol format expected for the version given in the request. The intended upgrade path is that new features are first rolled out on the server (with older clients not making use of them), and then as newer clients are deployed these new features are gradually taken advantage of.

Currently all APIs are at base version 0; as we evolve these APIs we will indicate the format of each version individually.

The Protocol

Protocol Primitive Types

The protocol is built out of the following primitive types.

Fixed Width Primitives

int8, int16, int32, int64 - Signed integers with
the given precision (in bits) stored in big endian order.

Variable Length Primitives

bytes, string - These types consist of a signed integer giving a
length N followed by N bytes of content. A length of -1 indicates null. string
uses an int16 for its size, and bytes uses an int32.

Arrays

This is a notation for handling repeated structures. These will
always be encoded as an int32 size containing the length N followed by N
repetitions of the structure which can itself be made up of other primitive
types. In the BNF grammars below we will show an array of a structure foo as
[foo].

Optional entries

Certain fields may be present under certain conditions (say if
there is no error). These are represented as |foo|.

Notes on reading the request format grammars

The BNFs below give an exact context free grammar for the request and response binary format. For each API I will give the request and response together followed by all the sub-definitions. The BNF is intentionally not compact in order to give human-readable names (for example I define a production for ErrorCode even though it is just an int16 in order to give it a symbolic name). As always in a BNF a sequence of productions indicates concatenation, so the MetadataRequest given below would be a sequence of bytes containing first a VersionId, then a ClientId, and then an array of TopicNames (each of which has its own definition). Productions are always given in camel case and primitive types in lower case. When there are multiple possible productions these are separated with '|' and may be enclosed in parentheses for grouping. The top-level definition is always given first and subsequent sub-parts are indented.

Common Request and Response Structure

All requests and responses originate from the following grammar, which will be incrementally described through the rest of this document:

RequestOrResponse => Size (RequestMessage | ResponseMessage)
  Size => int32

Field descriptions:

MessageSize: The MessageSize field gives the size of the subsequent request or response message in bytes. The client can read requests by first reading this 4 byte size as an integer N, and then reading and parsing the subsequent N bytes of the request.
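
As a rough illustration of this size-delimited framing (a sketch, not part of the specification), the helpers below prefix a payload with its 4-byte big-endian length and read one complete frame back from a socket:

```python
# Illustrative framing helpers: every Kafka request/response is an int32 size
# N (big endian) followed by N bytes of message payload.
import struct

def frame(payload: bytes) -> bytes:
    """Prefix a request or response body with its int32 size."""
    return struct.pack(">i", len(payload)) + payload

def read_exact(sock, n: int) -> bytes:
    chunks = []
    while n > 0:
        chunk = sock.recv(n)
        if not chunk:
            raise EOFError("connection closed mid-message")
        chunks.append(chunk)
        n -= len(chunk)
    return b"".join(chunks)

def read_frame(sock) -> bytes:
    """Read one size-delimited message from a connected socket."""
    (size,) = struct.unpack(">i", read_exact(sock, 4))
    return read_exact(sock, size)
```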

Requests

Requests all have the following format:

RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage
  ApiKey => int16
  ApiVersion => int16
  CorrelationId => int32
  ClientId => string
  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest

Field descriptions:

ApiKey: This is a numeric id for the API being invoked (i.e. is it a metadata request, a produce request, a fetch request, etc).

ApiVersion: This is a numeric version number for this api. We version each API and this version number allows the server to properly interpret the request as the protocol evolves. Responses will always be in the format corresponding to the request version. Currently the supported version for all APIs is 0.

CorrelationId: This is a user-supplied integer. It will be passed back in the response by the server, unmodified. It is useful for matching request and response between the client and server.

ClientId: This is a user supplied identifier for the client application. The user can use any identifier they like and it will be used when logging errors, monitoring aggregates, etc. For example, one might want to monitor not just the requests per second overall, but the number coming from each client application (each of which could reside on multiple servers). This id acts as a logical grouping across all requests from a particular client.

The various request and response messages will be described
below.
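
A possible encoding of this common header (and of reading back the CorrelationId from a response) is sketched below; the ApiKey value 3 used in the usage line is the MetadataRequest key from the Api Keys table later in this document.

```python
# Illustrative header encoding: ApiKey, ApiVersion, CorrelationId, ClientId.
import struct

def encode_string(s) -> bytes:
    """Protocol 'string' type: int16 length followed by the bytes (-1 = null)."""
    if s is None:
        return struct.pack(">h", -1)
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data

def request_header(api_key: int, api_version: int,
                   correlation_id: int, client_id: str) -> bytes:
    return (struct.pack(">hhi", api_key, api_version, correlation_id)
            + encode_string(client_id))

def response_correlation_id(response: bytes) -> int:
    """Every response begins with the int32 CorrelationId echoed by the server."""
    return struct.unpack(">i", response[:4])[0]

header = request_header(api_key=3, api_version=0, correlation_id=1,
                        client_id="example-client")
```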

Responses

Response => CorrelationId ResponseMessage
  CorrelationId => int32
  ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse

Field descriptions:

CorrelationId: The server passes back whatever integer the client supplied as the correlation in the request.

The response will always match the paired request (e.g. we will
send a MetadataResponse in return to a MetadataRequest).

Message sets

One structure common to both the produce and fetch requests is
the message set format. A message in kafka is a key-value pair with a small
amount of associated metadata. A message set is just a sequence of messages with
offset and size information. This format happens to be used both for the on-disk
storage on the broker and the on-the-wire format.

A message set is also the unit of compression in Kafka, and we
allow messages to recursively contain compressed message sets to allow batch
compression.

N.B., MessageSets are not preceded by an int32 like other array elements in the protocol.

MessageSet => [Offset MessageSize Message]
  Offset => int64
  MessageSize => int32

Message format

Message => Crc MagicByte Attributes Key Value
  Crc => int32
  MagicByte => int8
  Attributes => int8
  Key => bytes
  Value => bytes

Field descriptions:

Offset: This is the offset used in kafka as the log sequence number. When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes.

Crc: The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer.

MagicByte: This is a version id used to allow backwards compatible evolution of the message binary format.

Attributes: This byte holds metadata attributes about the message. The lowest 2 bits contain the compression codec used for the message. The other bits should be set to 0.

Key: The key is an optional message key that was used for partition assignment. The key can be null.

Value: The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in which case this may itself contain a message set. The message can be null.
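
As an illustration of the two grammars above (a sketch, relying on the note that producers may fill in any offset), the helpers below encode a single message with its CRC and wrap it in a message set:

```python
# Illustrative encoding of a message and a message set; not a full client.
import struct
import zlib

def encode_bytes(b) -> bytes:
    """Protocol 'bytes' type: int32 length followed by the content (-1 = null)."""
    if b is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(b)) + b

def encode_message(key, value, attributes: int = 0) -> bytes:
    magic_byte = 0
    body = struct.pack(">bb", magic_byte, attributes) + encode_bytes(key) + encode_bytes(value)
    crc = zlib.crc32(body) & 0xFFFFFFFF        # same 4 bytes as the signed int32 on the wire
    return struct.pack(">I", crc) + body

def encode_message_set(messages) -> bytes:
    """MessageSet => [Offset MessageSize Message]; producers may put any offset."""
    out = b""
    for msg in messages:
        out += struct.pack(">qi", 0, len(msg)) + msg
    return out

message_set = encode_message_set([encode_message(b"user-42", b"clicked")])
```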

Compression

Kafka supports compressing messages for additional efficiency,
however this is more complex than just compressing a raw message. Because
individual messages may not have sufficient redundancy to enable good
compression ratios, compressed messages must be sent in special batches
(although you may use a batch of one if you truly wish to compress a message on
its own). The messages to be sent are wrapped (uncompressed) in a MessageSet
structure, which is then compressed and stored in the Value field of a single
"Message" with the appropriate compression codec set. The receiving system
parses the actual MessageSet from the decompressed value.

Kafka currently supports two compression codecs with the following codec numbers:

Compression   Codec
None          0
GZIP          1
Snappy        2
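
A sketch of this batch-compression wrapping is shown below; it assumes the encode_message and encode_message_set helpers from the message-format example above, and uses GZIP (codec 1) because Snappy is not in the Python standard library.

```python
# Illustrative batch compression: gzip the uncompressed MessageSet and store
# it as the Value of a single wrapper message whose Attributes select codec 1.
import gzip

GZIP_CODEC = 1

def compress_message_set(message_set: bytes) -> bytes:
    compressed_value = gzip.compress(message_set)
    wrapper = encode_message(key=None, value=compressed_value, attributes=GZIP_CODEC)
    # The receiver decompresses the wrapper's Value and parses the inner set.
    return encode_message_set([wrapper])
```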

The APIs

This section gives details on each of the individual APIs, their
usage, their binary format, and the meaning of their fields.

Metadata API

This API answers the following questions:

  • What topics exist?

  • How many partitions does each topic have?

  • Which broker is currently the leader for each partition?

  • What is the host and port for each of these brokers?

This is the only request that can be addressed to any broker in
the cluster.

Since there may be many topics the client can give an optional
list of topic names in order to only return metadata for a subset of topics.

The metadata returned is at the partition level, but grouped together by topic for convenience and to avoid redundancy. For each partition the metadata contains the information for the leader as well as for all the replicas and the list of replicas that are currently in-sync.

Metadata Request

MetadataRequest => [TopicName]
  TopicName => string

Field descriptions:

TopicName: The topics to produce metadata for. If empty the request will yield metadata for all topics.
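
Putting the earlier sketches together, a complete metadata request for two topics could be assembled as below; this reuses the hypothetical frame, request_header and encode_string helpers defined above, and the topic names are of course placeholders.

```python
# Illustrative wire-level MetadataRequest: common header + [TopicName] array.
import struct

def encode_metadata_request(topics, correlation_id: int,
                            client_id: str = "example-client") -> bytes:
    body = struct.pack(">i", len(topics))        # array size
    for topic in topics:
        body += encode_string(topic)
    return frame(request_header(3, 0, correlation_id, client_id) + body)

wire_bytes = encode_metadata_request(["clicks", "impressions"], correlation_id=7)
```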

Metadata Response

The response contains metadata for each partition, with partitions grouped together by topic. This metadata refers to brokers by their broker id. The brokers each have a host and port.

MetadataResponse => [Broker][TopicMetadata]
  Broker => NodeId Host Port
  NodeId => int32
  Host => string
  Port => int32
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
  TopicErrorCode => int16
  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
  PartitionErrorCode => int16
  PartitionId => int32
  Leader => int32
  Replicas => [int32]
  Isr => [int32]

Field descriptions:

Leader: The node id for the kafka broker currently acting as leader for this partition. If no leader exists because we are in the middle of a leader election this id will be -1.

Replicas: The set of alive nodes that currently act as slaves for the leader for this partition.

Isr: The subset of the replicas that are "caught up" to the leader.

Broker: The node id, hostname, and port information for a kafka broker.

Produce API

The produce API is used to send message sets to the server. For
efficiency it allows sending message sets intended for many topic partitions in
a single request.

The produce API uses the generic message set format, but since
no offset has been assigned to the messages at the time of the send the producer
is free to fill in that field in any way it likes.

Produce Request

ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
  RequiredAcks => int16
  Timeout => int32
  Partition => int32
  MessageSetSize => int32

Field descriptions:

RequiredAcks: This field indicates how many acknowledgements the servers should receive before responding to the request. If it is 0 the server will not send any response (this is the only case where the server will not reply to a request). If it is 1, the server will wait until the data is written to the local log before sending a response. If it is -1 the server will block until the message is committed by all in sync replicas before sending a response. For any number > 1 the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas).

Timeout: This provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in RequiredAcks. The timeout is not an exact limit on the request time for a few reasons: (1) it does not include network latency, (2) the timer begins at the beginning of the processing of this request so if many requests are queued due to server overload that wait time will not be included, (3) we will not terminate a local write so if the local write time exceeds this timeout it will not be respected. To get a hard timeout of this type the client should use the socket timeout.

TopicName: The topic that data is being published to.

Partition: The partition that data is being published to.

MessageSetSize: The size, in bytes, of the message set that follows.

MessageSet: A set of messages in the standard format described above.
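
A sketch of assembling a produce request body for a single topic-partition is shown below; it reuses the hypothetical encode_string helper and an already-encoded message set from the earlier examples, and the choice of RequiredAcks=1 (wait for the leader's local write) is only an illustrative default.

```python
# Illustrative ProduceRequest body for one topic and one partition.
import struct

def encode_produce_request(topic: str, partition: int, message_set: bytes,
                           required_acks: int = 1, timeout_ms: int = 1000) -> bytes:
    body = struct.pack(">hi", required_acks, timeout_ms)
    body += struct.pack(">i", 1) + encode_string(topic)      # one topic in the array
    body += struct.pack(">i", 1)                             # one partition entry
    body += struct.pack(">ii", partition, len(message_set))  # Partition, MessageSetSize
    body += message_set
    return body
```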

Produce Response

ProduceResponse => [TopicName [Partition ErrorCode Offset]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  Offset => int64

Field descriptions:

Topic: The topic this response entry corresponds to.

Partition: The partition this response entry corresponds to.

ErrorCode: The error from this partition, if any. Errors are given on a per-partition basis because a given partition may be unavailable or maintained on a different host, while others may have successfully accepted the produce request.

Offset: The offset assigned to the first message in the message set appended to this partition.

Fetch API

The fetch API is used to fetch a chunk of one or more logs for
some topic-partitions. Logically one specifies the topics, partitions, and
starting offset at which to begin the fetch and gets back a chunk of
messages.

Fetch requests follow a long poll model so they can be made to
block for a period of time if sufficient data is not immediately available.

As an optimization the server is allowed to return a partial
message at the end of the message set. Clients should handle this case.

One thing to note is that the fetch API requires specifying the
partition to consume from. The question is how should a consumer know what
partitions to consume from? In particular how can you balance the partitions
over a set of consumers acting as a group so that each consumer gets a subset of
partitions. We have done this assignment dynamically using zookeeper for the
scala and java client. The downside of this approach is that it requires a
fairly fat client and a zookeeper connection. We haven't yet created a Kafka API
to allow this functionality to be moved to the server side and accessed more
conveniently. A simple consumer client can be implemented by simply requiring
that the partitions be specified in config, though this will not allow dynamic
reassignment of partitions should that consumer fail. We hope to address this
gap in the next major release.

Fetch Request

FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
  ReplicaId => int32
  MaxWaitTime => int32
  MinBytes => int32
  TopicName => string
  Partition => int32
  FetchOffset => int64
  MaxBytes => int32

Field descriptions:

ReplicaId: The replica id indicates the node id of the replica initiating this request. Normal client consumers should always specify this as -1 as they have no node id. Other brokers set this to be their own node id. The value -2 is accepted to allow a non-broker to issue fetch requests as if it were a replica broker for debugging purposes.

MaxWaitTime: The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued.

MinBytes: This is the minimum number of bytes of messages that must be available to give a response. If the client sets this to 0 the server will always respond immediately, however if there is no new data since their last request they will just get back empty message sets. If this is set to 1, the server will respond as soon as at least one partition has at least 1 byte of data or the specified timeout occurs. By setting higher values in combination with the timeout the consumer can tune for throughput and trade a little additional latency for reading only large chunks of data (e.g. setting MaxWaitTime to 100 ms and setting MinBytes to 64k would allow the server to wait up to 100ms to try to accumulate 64k of data before responding).

TopicName: The name of the topic.

Partition: The id of the partition the fetch is for.

FetchOffset: The offset to begin this fetch from.

MaxBytes: The maximum bytes to include in the message set for this partition. This helps bound the size of the response.
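
The sketch below assembles a fetch request body for one topic-partition using the long-poll parameters discussed above (wait up to 100 ms for at least 64 KB); it reuses the hypothetical encode_string helper from the request-header example.

```python
# Illustrative FetchRequest body for a single topic-partition.
import struct

def encode_fetch_request(topic: str, partition: int, offset: int,
                         max_wait_ms: int = 100, min_bytes: int = 64 * 1024,
                         max_bytes: int = 1024 * 1024) -> bytes:
    body = struct.pack(">iii", -1, max_wait_ms, min_bytes)   # ReplicaId is -1 for normal clients
    body += struct.pack(">i", 1) + encode_string(topic)      # one topic in the array
    body += struct.pack(">i", 1)                             # one partition entry
    body += struct.pack(">iqi", partition, offset, max_bytes)
    return body
```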

Fetch Response

FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  HighwaterMarkOffset => int64
  MessageSetSize => int32

Field descriptions:

TopicName: The name of the topic this response entry is for.

Partition: The id of the partition this response is for.

HighwaterMarkOffset: The offset at the end of the log for this partition. This can be used by the client to determine how many messages behind the end of the log they are.

MessageSetSize: The size in bytes of the message set for this partition.

MessageSet: The message data fetched from this partition, in the format described above.

Offset API

This API describes the valid offset range available for a set of
topic-partitions. As with the produce and fetch APIs requests must be directed
to the broker that is currently the leader for the partitions in question. This
can be determined using the metadata API.

The response contains the starting offset of each segment for
the requested partition as well as the "log end offset" i.e. the offset of the
next message that would be appended to the given partition.

We agree that this API is slightly funky.

Offset Request

OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
  ReplicaId => int32
  TopicName => string
  Partition => int32
  Time => int64
  MaxNumberOfOffsets => int32

Field descriptions:

Time: Used to ask for all messages before a certain time (ms). There are two special values. Specify -1 to receive the latest offsets and -2 to receive the earliest available offset. Note that because offsets are pulled in descending order, asking for the earliest offset will always return you a single element.
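
A sketch of an offset request body using the special Time values is given below; it again reuses the hypothetical encode_string helper and is not a complete client.

```python
# Illustrative OffsetRequest body for one topic-partition.
import struct

LATEST_OFFSET = -1     # special Time value: latest offset
EARLIEST_OFFSET = -2   # special Time value: earliest available offset

def encode_offset_request(topic: str, partition: int,
                          time: int = LATEST_OFFSET, max_offsets: int = 1) -> bytes:
    body = struct.pack(">i", -1)                          # ReplicaId for normal clients
    body += struct.pack(">i", 1) + encode_string(topic)   # one topic in the array
    body += struct.pack(">i", 1)                          # one partition entry
    body += struct.pack(">iqi", partition, time, max_offsets)
    return body
```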

Offset Response

OffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode [Offset]
  Partition => int32
  ErrorCode => int16
  Offset => int64

Offset Commit/Fetch API

These APIs allow for centralized management of offsets. Read more in Offset Management. As per the comments on KAFKA-993, these API calls are not fully functional in releases until Kafka 0.8.1.1; they will be available in the 0.8.2 release.

Consumer Metadata Request

The offsets for a given consumer group are maintained by a
specific broker called the offset coordinator. i.e., a consumer needs to issue
its offset commit and fetch requests to this specific broker. It can discover
the current offset coordinator by issuing a consumer metadata request.





ConsumerMetadataRequest => ConsumerGroup
  ConsumerGroup => string

Consumer Metadata Response

On a successful (ErrorCode == 0) response, the coordinator
fields provide the id/host/port details of the offset coordinator.





ConsumerMetadataResponse => ErrorCode |CoordinatorId CoordinatorHost CoordinatorPort|
  ErrorCode => int16
  CoordinatorId => int32
  CoordinatorHost => string
  CoordinatorPort => int32

Offset Commit Request




OffsetCommitRequest => ConsumerGroup [TopicName [Partition Offset TimeStamp Metadata]]
  ConsumerGroup => string
  TopicName => string
  Partition => int32
  Offset => int64
  TimeStamp => int64
  Metadata => string

(If the time stamp field is set to -1, then the broker sets the
time stamp to the receive time before committing the offset.)

Offset Commit Response




OffsetCommitResponse => [TopicName [Partition ErrorCode]]
  TopicName => string
  Partition => int32
  ErrorCode => int16

Offset Fetch Request




OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
  ConsumerGroup => string
  TopicName => string
  Partition => int32

Offset Fetch Response




OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]]
  TopicName => string
  Partition => int32
  Offset => int64
  Metadata => string
  ErrorCode => int16

Note that if there is no offset associated with a
topic-partition under that consumer group the broker does not set an error code
(since it is not really an error), but returns empty metadata and sets the
offset field to -1.
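
The sketch below ties the commit pieces together; the encode_string helper comes from the earlier header example, and a real client would wrap each body with the common request header (ApiKeys 10 and 8 respectively, per the table below) and send the commit to the coordinator returned by the discovery call.

```python
# Illustrative encodings for coordinator discovery and an offset commit entry.
import struct

def encode_consumer_metadata_request(group: str) -> bytes:
    return encode_string(group)            # ConsumerMetadataRequest => ConsumerGroup

def encode_offset_commit_entry(partition: int, offset: int,
                               timestamp: int = -1, metadata: str = "") -> bytes:
    # TimeStamp = -1 lets the broker substitute its own receive time (see note above).
    return struct.pack(">iqq", partition, offset, timestamp) + encode_string(metadata)
```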

Constants

Api Keys

The following are the numeric codes that the ApiKey in the request can take for each of the above request types.

API name                       ApiKey Value
ProduceRequest                 0
FetchRequest                   1
OffsetRequest                  2
MetadataRequest                3
Non-user facing control APIs   4-7
OffsetCommitRequest            8
OffsetFetchRequest             9
ConsumerMetadataRequest        10

Error Codes

We use numeric codes to indicate what problem occurred on the server. These can be translated by the client into exceptions or whatever the appropriate error handling mechanism is in the client language. Here is a table of the error codes currently in use:

NoError (0): No error--it worked!

Unknown (-1): An unexpected server error.

OffsetOutOfRange (1): The requested offset is outside the range of offsets maintained by the server for the given topic/partition.

InvalidMessage (2): This indicates that a message's contents do not match its CRC.

UnknownTopicOrPartition (3): This request is for a topic or partition that does not exist on this broker.

InvalidMessageSize (4): The message has a negative size.

LeaderNotAvailable (5): This error is thrown if we are in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes.

NotLeaderForPartition (6): This error is thrown if the client attempts to send messages to a replica that is not the leader for some partition. It indicates that the client's metadata is out of date.

RequestTimedOut (7): This error is thrown if the request exceeds the user-specified time limit in the request.

BrokerNotAvailable (8): This is not a client facing error and is used only internally by intra-cluster broker communication.

Unused (9): Unused.

MessageSizeTooLarge (10): The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempts to produce a message larger than this maximum.

StaleControllerEpochCode (11): Internal error code for broker-to-broker communication.

OffsetMetadataTooLargeCode (12): Returned if you specify a string larger than the configured maximum for offset metadata.

OffsetsLoadInProgressCode (14): The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change for that offsets topic partition).

ConsumerCoordinatorNotAvailableCode (15): The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has not yet been created.

NotCoordinatorForConsumerCode (16): The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for.
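
A small sketch of turning these codes into client-side errors follows; which codes a client treats as retriable is a design choice, and the grouping below is only an illustration.

```python
# Illustrative error-code handling; only a subset of the codes above is mapped.
ERROR_NAMES = {
    0: "NoError", -1: "Unknown", 1: "OffsetOutOfRange", 2: "InvalidMessage",
    3: "UnknownTopicOrPartition", 5: "LeaderNotAvailable",
    6: "NotLeaderForPartition", 7: "RequestTimedOut",
}

RETRIABLE = {5, 6, 7}   # typically retried after refreshing metadata

def check_error(code: int) -> None:
    if code != 0:
        name = ERROR_NAMES.get(code, "UnknownErrorCode")
        raise RuntimeError(f"Kafka error {code} ({name})")
```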

Some Common Philosophical Questions

Some people have asked why we don't use HTTP. There are a number of reasons, the best is that client implementors can make use of some of the more advanced TCP features--the ability to multiplex requests, the ability to simultaneously poll many connections, etc. We have also found HTTP libraries in many languages to be surprisingly shabby.

Others have asked if maybe we shouldn't support many different protocols. Prior experience with this was that it makes it very hard to add and test new features if they have to be ported across many protocol implementations. Our feeling is that most users don't really see multiple protocols as a feature, they just want a good reliable client in the language of their choice.

Another question is why we don't adopt XMPP, STOMP, AMQP or an existing protocol. The answer to this varies by protocol, but in general the problem is that the protocol does determine large parts of the implementation and we couldn't do what we are doing if we didn't have control over the protocol. Our belief is that it is possible to do better than existing messaging systems have in providing a truly distributed messaging system, and to do this we need to build something that works differently.

A final question is why we don't use a system like Protocol Buffers or Thrift to define our request messages. These packages excel at helping you to manage lots and lots of serialized messages. However we have only a few messages. Support across languages is somewhat spotty (depending on the package). Finally, the mapping between binary log format and wire protocol is something we manage somewhat carefully and this would not be possible with these systems. We also prefer the style of versioning APIs explicitly and checking this to inferring new values as nulls, as it allows more nuanced control of compatibility.
