Kafka 客户端实现逻辑分析

这里主要分析kafka 客户端实现 (代码分析以perl kafka实现为准)

kafka客户端分为生产者和消费者,生产者发送消息,消费者获取消息.

在kafka协议里客户端通信中用到的最多的四个协议命令是fetch,fetchoffset,send,metadata.这四个分别是获取消息,获取offset,发送消息,获取metadata.剩下的其他协议命令大多都是kafka server内部通信用到的.offsetcommit这个命令在有些语言的client api的实现里给出了接口可以自己提交offset.但是在perl的实现里并没有.

先看看直接producer和consumer的代码

my $request = {
        ApiKey                              => $APIKEY_PRODUCE,
        CorrelationId                       => $self->{CorrelationId},
        ClientId                            => $self->{ClientId},
        RequiredAcks                        => $self->{RequiredAcks},
        Timeout                             => $self->{Timeout} * 1000,
        topics                              => [
            {
                TopicName                   => $topic,
                partitions                  => [
                    {
                        Partition           => $partition,
                        MessageSet          => $MessageSet,
                    },
                ],
            },
        ],
    };

    foreach my $message ( @$messages ) {
        push @$MessageSet, {
            Offset  => $PRODUCER_ANY_OFFSET,
            Key     => $key,
            Value   => $message,
        };
    }

    return $self->{Connection}->receive_response_to_request( $request, $compression_codec );

代码并未完全贴上.核心代码就这一部分.最后一行代码可以看见最终调用connection::receive_response_to_request函数.再上面的部分是设置消息格式.和消息内容的数据结构.

my $request = {
        ApiKey                              => $APIKEY_FETCH,
        CorrelationId                       => $self->{CorrelationId},
        ClientId                            => $self->{ClientId},
        MaxWaitTime                         => $self->{MaxWaitTime},
        MinBytes                            => $self->{MinBytes},
        topics                              => [
            {
                TopicName                   => $topic,
                partitions                  => [
                    {
                        Partition           => $partition,
                        FetchOffset         => $start_offset,
                        MaxBytes            => $max_size // $self->{MaxBytes},
                    },
                ],
            },
        ],
    };

    my $response = $self->{Connection}->receive_response_to_request( $request );

这是consumer的获取消息的核心部分代码.最后同producer一样.代码结构也相似.同样是设置消息数据结构然后发送.只是最后多了代码返回处理的部分.消息返回处理的部分就不再贴上详细说明了.有兴趣自行去cpan上看源代码.

下面看看最核心的函数代码.

sub receive_response_to_request {
    my ( $self, $request, $compression_codec ) = @_;

    local $Data::Dumper::Sortkeys = 1 if $self->debug_level;

    my $api_key = $request->{ApiKey};  //这里获取请求类型,是发送消息,还是获取消息和offset的.

# WARNING: The current version of the module limited to the following:
# supports queries with only one combination of topic + partition (first and only).

    my $topic_data  = $request->{topics}->[0];  //这些消息具体处理就略过不提了.
    my $topic_name  = $topic_data->{TopicName};
    my $partition   = $topic_data->{partitions}->[0]->{Partition};

    if (  //这里是比较关键的.判断是否有完整的metadata信息.没有metadata信息就通过fetch meta命令获取.
           !%{ $self->{_metadata} }         # the first request
        || ( !$self->{AutoCreateTopicsEnable} && defined( $topic_name ) && !exists( $self->{_metadata}->{ $topic_name } ) )
    ) {        //updata_metadata函数就是封装了fetch metadata请求命令发送给kafka 来获取metadata信息.在这个地方处理不同语言里处理逻辑多少有些差别.php-kafka中有两种方式,一种通过这里的这个方法.另一种是通过zookeeper获取meta信息.在使用的时候需要指定zookeeper地址.
        $self->_update_metadata( $topic_name )  # hash metadata could be updated
            # FATAL error
            or $self->_error( $ERROR_CANNOT_GET_METADATA, format_message( "topic = ‘%s‘", $topic_name ) );
    }
    my $encoded_request = $protocol{ $api_key }->{encode}->( $request, $compression_codec );  //这里将消息格式化成网络字节序.

    my $CorrelationId = $request->{CorrelationId} // _get_CorrelationId;

    say STDERR sprintf( ‘[%s] compression_codec = %d, request: %s‘,
            scalar( localtime ),
            $compression_codec // ‘<undef>‘,
            Data::Dumper->Dump( [ $request ], [ ‘request‘ ] )
        ) if $self->debug_level;

    my $attempts = $self->{SEND_MAX_ATTEMPTS};
    my ( $ErrorCode, $partition_data, $server );
    ATTEMPTS:
    while ( $attempts-- ) {  //在while里进行发送尝试.java版客户端的三次尝试即是这里同样的逻辑
        REQUEST:
        {
            $ErrorCode = $ERROR_NO_ERROR;            //这里差早topic分区对应的leader,成功则进行leader连接发送请求
            if ( defined( my $leader = $self->{_metadata}->{ $topic_name }->{ $partition }->{Leader} ) ) {   # hash metadata could be updated
                unless ( $server = $self->{_leaders}->{ $leader } ) {  //没有找到对应leader的server就跳过此次请求尝试,更新metadata并进行下一次尝试
                    $ErrorCode = $ERROR_LEADER_NOT_FOUND;
                    $self->_remember_nonfatal_error( $ErrorCode, $ERROR{ $ErrorCode }, $server, $topic_name, $partition );
                    last REQUEST;       # go to the next attempt  //在这里跳出主逻辑块.进行块之后的动作.
                }

                # Send a request to the leader
                if ( !$self->_connectIO( $server ) ) {  //这里连接此topic分区的leader
                    $ErrorCode = $ERROR_CANNOT_BIND;
                } elsif ( !$self->_sendIO( $server, $encoded_request ) ) {  //这里向这个leader发送请求
                    $ErrorCode = $ERROR_CANNOT_SEND;
                }
                if ( $ErrorCode != $ERROR_NO_ERROR ) {  //判断动作是否成功
                    $self->_remember_nonfatal_error( $ErrorCode, $self->{_IO_cache}->{ $server }->{error}, $server, $topic_name, $partition );
                    last REQUEST;    # go to the next attempt
                }

                my $response; //这里处理返回情况.如果发送的produce请求并且没有任何response返回.则构建一个空的response返回.
                if ( $api_key == $APIKEY_PRODUCE && $request->{RequiredAcks} == $NOT_SEND_ANY_RESPONSE ) {

                    # Do not receive a response, self-forming own response
                    $response = {
                        CorrelationId                           => $CorrelationId,
                        topics                                  => [
                            {
                                TopicName                       => $topic_name,
                                partitions                      => [
                                    {
                                        Partition               => $partition,
                                        ErrorCode               => 0,
                                        Offset                  => $BAD_OFFSET,
                                    },
                                ],
                            },
                        ],
                    };
                } else {  //这里获取response.并从网络字节序转换成字符格式.
                    my $encoded_response_ref;
                    unless ( $encoded_response_ref = $self->_receiveIO( $server ) ) {
                        if ( $api_key == $APIKEY_PRODUCE ) {
# WARNING: Unfortunately, the sent package (one or more messages) does not have a unique identifier
# and there is no way to verify the delivery of data
                            $ErrorCode = $ERROR_SEND_NO_ACK;

                            # Should not be allowed to re-send data on the next attempt
                            # FATAL error
                            $self->_error( $ErrorCode, $self->{_IO_cache}->{ $server }->{error} );
                        } else {
                            $ErrorCode = $ERROR_CANNOT_RECV;
                            $self->_remember_nonfatal_error( $ErrorCode, $self->{_IO_cache}->{ $server }->{error}, $server, $topic_name, $partition );
                            last REQUEST;   # go to the next attempt
                        }
                    }
                    if ( length( $$encoded_response_ref ) > 4 ) {   # MessageSize => int32
                        $response = $protocol{ $api_key }->{decode}->( $encoded_response_ref );
                        say STDERR sprintf( ‘[%s] response: %s‘,
                                scalar( localtime ),
                                Data::Dumper->Dump( [ $response ], [ ‘response‘ ] )
                            ) if $self->debug_level;
                    } else {
                        $self->_error( $ERROR_RESPONSEMESSAGE_NOT_RECEIVED );
                    }
                }

                $response->{CorrelationId} == $CorrelationId
                    # FATAL error
                    or $self->_error( $ERROR_MISMATCH_CORRELATIONID );
                $topic_data     = $response->{topics}->[0];
                $partition_data = $topic_data->{ $api_key == $APIKEY_OFFSET ? ‘PartitionOffsets‘ : ‘partitions‘ }->[0];

                if ( ( $ErrorCode = $partition_data->{ErrorCode} ) == $ERROR_NO_ERROR ) {
                    return $response;
                } elsif ( exists $RETRY_ON_ERRORS{ $ErrorCode } ) {
                    $self->_remember_nonfatal_error( $ErrorCode, $ERROR{ $ErrorCode }, $server, $topic_name, $partition );
                    last REQUEST;   # go to the next attempt
                } else {
                    # FATAL error
                    $self->_error( $ErrorCode, format_message( "topic = ‘%s‘, partition = %s", $topic_name, $partition ) );
                }
            }
        }

        # Expect to possible changes in the situation, such as restoration of connection
        say STDERR sprintf( ‘[%s] sleeping for %d ms before making request attempt #%d (%s)‘,
                scalar( localtime ),
                $self->{RETRY_BACKOFF},
                $self->{SEND_MAX_ATTEMPTS} - $attempts + 1,
                $ErrorCode == $ERROR_NO_ERROR ? ‘refreshing metadata‘ : "ErrorCode ${ErrorCode}",
            ) if $self->debug_level;
        sleep $self->{RETRY_BACKOFF} / 1000;

        $self->_update_metadata( $topic_name )   //最重要的逻辑在这里.可以看见上面失败则跳出REQUEST块,直接到这里执行更新动作.更新完之后再进行下一次尝试.这个逻辑应对着topic 分区的leader动态切换的.现有leader死了,切换到其他的leader上来.客户端能对此作出应对.
            # FATAL error
            or $self->_error( $ErrorCode || $ERROR_CANNOT_GET_METADATA, format_message( "topic = ‘%s‘, partition = %s", $topic_name, $partition ) );
    }

    # FATAL error
    if ( $ErrorCode ) {
        $self->_error( $ErrorCode, format_message( "topic = ‘%s‘%s", $topic_data->{TopicName}, $partition_data ? ", partition = ".$partition_data->{Partition} : q{} ) );
    } else {
        $self->_error( $ERROR_UNKNOWN_TOPIC_OR_PARTITION, format_message( "topic = ‘%s‘, partition = %s", $topic_name, $partition ) );
    }

    return;
}

上面主要分析核心逻辑实现.可以看见的是.consumer在消费的时候并没有手动提交过offset.所以这里的逻辑实现即是offset自动由kafka server去更新提交.producer和consumer的request里均需要指定topic分区.所以实际上在真正的api底层是没有对topic分区做负载的.一些具有负载功能的其他语言的api均由客户端内部实现.并非kafka server提供的功能.

时间: 2024-11-09 02:04:09

Kafka 客户端实现逻辑分析的相关文章

kafka 客户端封装

kafka客户端封装源码. 1.为什么进行封装? kafka官方自带的客户端,需要对参数进行设置,如下代码,很多参数的key都是字符串,这样对于编程人员来说非常不友好.参数很多的时候,有两种处理方式:(1)传一个config类进去解析:(2)使用建造者模式,笔者在此就使用建造者模式,对官方客户端进行简单封装,使之易用. 官方的例子如下: 1 Properties props = new Properties(); 2 props.put("bootstrap.servers", &qu

Erlang 编写 Kafka 客户端之最简单入门

Erlang 编写 Kafka 客户端之最简单入门 费劲周折,终于测通了 erlang 向kafka 发送消息,使用了ekaf 库,参考: An advanced but simple to use, Kafka producer written in Erlang https://github.com/helpshift/ekaf 1 准备kafka客户端 准备2台机器,一台是ekaf运行的kafka客户端(192.168.191.2),一台是kafka服务端(zookeeper+kafka)

Apache kafka客户端开发-java

apache kafka中国社区QQ群:162272557 1.依赖包 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.1</version> </dependency> 2.producer程序开发例子 2.1 producer参数说明 #指定kafka节点列表,

Kafka客户端二次封装扩展总体设计

前言背景 消息系统经过多年使用和运维管理平台开发迭代,能较好支持支撑业务发展,公司主流语言为java,但缺乏一个基于Kafka二次封装简单好用的java客户端.遇到问题如下所示: 使用好kafka客户端对业务要求高,非专业技术方向很难有精力全面掌握 异常情况会catch不全 客户端生产消息及双活机房容灾缺失 集群升级难度增加,因为无法全面及时掌握客户端信息(kafka版本.groupid) 不支持动态配置更新,业务使用错误及引发的潜在故障无法及时修正,例如Producer写入倾斜导致磁盘报警,参

kafka客户端代码解析

转载:http://backend.blog.163.com/blog/static/202294126201431724652597/ 可以使用服务器端下载的kafka二进制包及依赖,也可以通过mavne获取(注意实测发现该方式拿到的包是用jdk7打的): <dependency> <groupId>com.sksamuel.kafka</groupId> <artifactId>kafka_2.10</artifactId> <vers

spring集成mqtt客户端相关逻辑分析

概述 mqtt客户端有很多java实现,官网上列出了下面这些: Eclipse Paho Java Xenqtt Includes a client library, mock broker for unit/integration testing, and applications to support enterprise needs like using a cluster of servers as a single client, an HTTP gateway, etc. MeQan

c++ kafka 客户端rdkafka报Receive failed: Disconnected问题原因以及解决方法

%3|1538976114.812|FAIL|rdkafka#producer-1| [thrd:kafka-server:9092/bootstrap]: kafka-server:9092/0: Receive failed: Disconnected%3|1538976114.812|ERROR|rdkafka#producer-1| [thrd:kafka-server:9092/bootstrap]: kafka-server:9092/0: Receive failed: Disco

消息队列 Kafka 的基本知识及 .NET Core 客户端

前言 最新项目中要用到消息队列来做消息的传输,之所以选着 Kafka 是因为要配合其他 java 项目中,所以就对 Kafka 了解了一下,也算是做个笔记吧. 本篇不谈论 Kafka 和其他的一些消息队列的区别,包括性能及其使用方式. 简介 Kafka 是一个实现了分布式的.具有分区.以及复制的日志的一个服务.它通过一套独特的设计提供了消息系统中间件的功能.它是一种发布订阅功能的消息系统. 一些名词 如果要使用 Kafka ,那么在 Kafka 中有一些名词需要知道,文本不讨论这些名词是否在其他

Kafka及 .NET Core 客户端

消息队列 Kafka 的基本知识及 .NET Core 客户端 消息队列 Kafka 的基本知识及 .NET Core 客户端 前言 最新项目中要用到消息队列来做消息的传输,之所以选着 Kafka 是因为要配合其他 java 项目中,所以就对 Kafka 了解了一下,也算是做个笔记吧. 本篇不谈论 Kafka 和其他的一些消息队列的区别,包括性能及其使用方式. 简介 Kafka 是一个实现了分布式的.具有分区.以及复制的日志的一个服务.它通过一套独特的设计提供了消息系统中间件的功能.它是一种发布