org.apache.kafka.clients.KafkaClient

(依据于0.10.0.0版本)

这个接口的唯一实现类就是NetworkClient,它被用于实现Kafka的consumer和producer. 这个接口实际上抽象出来了Kafka client与网络交互的方式。

为了对它的API有清楚的认识,先要了解下Kafka protocol所要求的client和broker对于网络请求的处理规则。

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will return in that order as well. The broker‘s request processing allows only a single in-flight request per connection in order to guarantee this ordering. Note that clients can (and ideally should) use non-blocking IO to implement request pipelining and achieve higher throughput. i.e., clients can send requests even while awaiting responses for preceding requests since the outstanding requests will be buffered in the underlying OS socket buffer. All requests are initiated by the client, and result in a corresponding response message from the server except where noted.

这一段的信息量挺大的。

顺序性

首先,broker按照请求被发送的顺序处理请求,并且按照同样的顺序发送响应。因为Kafka对消息的顺序性有如下的保证:

  • Messages sent by a producer to a particular topic partition will be appended in the order they are sent. That is, if a message M1 is sent by the same producer as a message M2, and M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the log.
  • A consumer instance sees messages in the order they are stored in the log.

为了实现这种顺序性保证,最简单可靠的行为就是"The broker‘s request processing allows only a single in-flight request per connection in order to guarantee this ordering. ", 也就是说对于一个TCP连接,broker的请求处理链条中只会有一个正在处理的(in-flight)消息.

那么,Kafka在broker端需不需要缓存待处理的消息呢?

首先,如果缓存请求的话,可能会占用大量内存.其次,如果缓存请求的话,在请求处理出错时,会使得Kafka client难以控制消息的顺序,因为本质上,这种缓存使得client的请求是异步处理的.而如果不进行缓存,那么broker的行为对于client而言更容易理解.

所以,broker是不会在本地缓存请求的.当它从某个连接读取一个请求之后,就会停止从这个连接继续读取请求.也就是说对于每个TCP连接,broker的处理流程是:接收一个请求 -> 处理请求 -> 发送响应 -> 接收下一个请求 -> ...

具体的做法,可以在kafka.network.Processor(也就是reactive模型里的subRactor) 找到,在其run方法中,对于已经完整读取的request和发送完毕的response, 有以下的处理

        selector.completedReceives.asScala.foreach { receive =>
          try {
            val channel = selector.channel(receive.source)
            val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
              channel.socketAddress)
            val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
            requestChannel.sendRequest(req) //把请求送入requestChannel,以后request handler会从中取出request来处理
            selector.mute(receive.source) //停止从这个request的来源(并不只用host来区分)读取消息
          } catch {
            case e @ (_: InvalidRequestException | _: SchemaException) =>
              // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
              error("Closing socket for " + receive.source + " because of error", e)
              close(selector, receive.source)
          }
        }
        selector.completedSends.asScala.foreach { send =>
          val resp = inflightResponses.remove(send.destination).getOrElse {
            throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
          }
          resp.request.updateRequestMetrics()
          selector.unmute(send.destination) //将已发送完毕的response的源设为可读的
        }

可见,对于正在处理的请求,broker不会从它的来源再读取新的消息,直至请求被处理完毕,并且其响应被发送完毕。

预抓取

另一方面,对于client,如果它接收到上一个请求的响应之后,才开始生成新的请求,然后再发送新请求,那么在等待响应的过程中,client就处理等待状态,这样挺没效率.因此,"clients can send requests even while awaiting responses for preceding requests since the outstanding requests will be buffered in the underlying OS socket buffer",也就是说client可以在等待响应的过程中继续发送请求,因为即使broker不去通过网络读这些请求,这些请求也会被缓存在OS的socket buffer中,因此,当broker处理完之前的请求,就可以立即读出来新的请求.不过,如果client这么做的话,会使得它的行为更复杂(因为涉及到出错时的顺序性).

对于consumer,在接收到响应之前难以确定下一次fetch开始的offset,因此在收到前一个fetch respones之后才发送下一次fetch request是比较稳妥的做法.不过如果可以比较准确判断fetch响应包含消息的数目,比而提前发出fetch request,的确有可能会提交consumer的性能.

而且,"收到fetch respone"和"用户处理完fetch到的消息"这两个时间点还是有所不同的,在收到fetch response之后,把抓取到的消息交给用户处理之前,发出下一个fetch request,这样可以提高consumer抓取的效率.新的consumer-KafkaConsumer的确是这么做的.这是KafkaConsumer的poll方法里的一段代码(用户通过执行这个poll方法来获取消息)

 do {
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
                if (!records.isEmpty()) {
                    // before returning the fetched records, we can send off the next round of fetches
                    // and avoid block waiting for their responses to enable pipelining while the user
                    // is handling the fetched records.
                    //
                    // NOTE that we use quickPoll() in this case which disables wakeups and delayed
                    // task execution since the consumed positions has already been updated and we
                    // must return these records to users to process before being interrupted or
                    // auto-committing offsets
                    fetcher.sendFetches(metadata.fetch());
                    client.quickPoll();
                    return this.interceptors == null
                        ? new ConsumerRecords<>(records) : this.interceptors.onConsume(new ConsumerRecords<>(records));
                }

                long elapsed = time.milliseconds() - start;
                remaining = timeout - elapsed;
            } while (remaining > 0);

中间的那一大段就是在说这个事情,但是它考虑的情况比刚才提到的要复杂一些.

首先,如果pollOnce得到的records不为空,就要把这些records返回给用户,所以在此之前要先发送一批fetch rquest(利用Fetcher#sendFetches).如果为空的话,在do-while循环里的pollOnce会发送新的fetch request. 

其次,由于Fetcher的sendFetches并不会执行网络IO操作,而只是生成并且缓存fetch request,所以还需要利用ConsumerNetworkClient的quickPoll方法来执行一次IO操作把这些fetch request发出去.但是由于此时用户还没有得到这次pollOnce返回的records, 因此不能进行auto-commit操作,否则就会把还没返回给用户的records给commit了,并且也不能使得处理的过程被别的线程中断,因为这样用户也拿不到这些records了.所以,这里调用quickPoll,quickPoll会禁止wakeUp,并且不执行DelayedTasks(因为AutoCommitTask就是通过DelayedTask机制执行的).


API

KafkaClient,是producer和consumer与broker通信的接口,它的设计就建立在上边的协议的基础上。这个类包括了与连接状态和请求-响应状态有关的方法。producer和consumer实际使用的它的实现类是NetworkClient。以下方法的作用结合了KafkaClient和NetworkClient的注释,但以NetworkClient的实现为标准。

public boolean isReady(Node node, long now) 查看某个结点是否准备好发送新请求了。由于是给client用的,因此这里的“node"就是broker

public boolean ready(Node node, long now)是到指定node的连接已经被创建好并且可以发送请求。如果连接没有创建,就创建到这个node的连接。

public long connectionDelay(Node, long now) 基于连接状态,返回需要等待的时间。连接的状态有三种:disconnected, connecting, connected.  如果是disconnected状态,就返回reconnect的backoff time。当connecting或者connected,就返回Long.MAX_VALUE,因为此时需要等待别的事件发生(比如连接成功,或者收到响应)

public long connectionFailed(Node node)  查看到这个node的连接是否失败。

public void send(ClientRequest request, long now) 把这个request放入发送队列。如果request是要发给还没有连接好的node的,那么就会抛出IllegalStateException异常, 这是一个运行时异常。

public List<ClientResponse> poll(long timeout, long now) 对于socket进行读写操作。

public void close(String nodeId) 关闭到指定node的连接

public Node leastLoadedNode(long now) 选择有最少的未发送请求的node,要求这些node至少是可以连接的。这个方法会优先选择有可用的连接的节点,但是如果所有的已连接的节点都在使用,它就会选择还没有建立连接的节点。这个方法绝对不会选择忆经断开连接的节点或者正在reconnect backoff阶段的连接。

public int inFlightRequestCount() 所有已发送但还没收到响应的请求的总数

public int inFlightRequestCount(String nodeId) 对于某个特定node的in-flight request总数

public RequestHandler nextRequestHanlder(ApiKeys key) 为某种请求构造它的请求头。按照Kafka Protoocl, request包括以下部分:

RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage

  ApiKey => int16

  ApiVersion => int16

  CorrelationId => int32

  ClientId => string

  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest

而这个方法构造了ApiKey, ApiVersion, CoorelationId和ClientId,作为请求的头部,request handler在源码里有对应类org.apache.kafka.common.requests.RequestHandler。

ApiKey表示请求的种类, 如produce request, fetch request, metadata request等。

puclic RequestHandler nextRequestHandler(ApiKey key, short version)  构造请求的头部,使用特定版本号。

public void wakeup() 如果这个client正在IO阻塞状态,就唤醒它。


总结

Kafka protocol的一些细节,在Kafka client的接口设计中得到了体现.并且,有一些小细节是挺有意思的.

下面会看一下NetworkClient,它是KafkaClient接口的实现.

时间: 2024-10-02 01:02:33

org.apache.kafka.clients.KafkaClient的相关文章

Apache Kafka系列(三) Java API使用

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 摘要: Apache Kafka Java Client API 一.基本概念 Kafka集成了Producer/Consumer连接Broker的客户端工具,但是在消息处理方面,这两者主要用于服务端(Broker)的简单操作,如: 1.创建Topic 2.罗列出已存在的Topic 3.对已有Topic的Produce/Consume测试

Apache Kafka系列(四) 多线程Consumer方案

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 Apache Kafka系列(四) 多线程Consumer方案 本文的图片是通过PPT截图出的,读者如果修改意见请联系我 一.Consumer为何需要实现多线程 假设我们正在开发一个消息通知模块,该模块允许用户订阅其他用户发送的通知/消息.该消息通知模块采用Apache Kafka,那么整个架构应该是消息的发布者通过Producer调用AP

初识Apache Kafka+JAVA程序实例

本文是从英文的官网摘了翻译的,用作自己的整理和记录.水平有限,欢迎指正.版本是: kafka_2.10-0.10.0.0 一.基础概念 主题:Kafka maintains feeds of messages in categories called topics. 生产者:We'll call processes that publish messages to a Kafka topic producers. 消费者:We'll call processes that subscribe t

Apache Kafka 0.11版本新功能简介

Apache Kafka近日推出0.11版本.这是一个里程碑式的大版本,特别是Kafka从这个版本开始支持"exactly-once"语义(下称EOS, exactly-once semantics).本文简要介绍一下0.11版本主要的功能变更,下面中的每一项都值得专门写篇文章好好聊聊. 一.修改unclean.leader.election.enabled默认值 Kafka社区终于下定决心要把这个参数的默认值改成false,即不再允许出现unclean leader选举的情况,在正确

Apache Kafka Producer For Beginners

在我们上一篇Kafka教程中,我们讨论了Kafka Cluster.今天,我们将通过示例讨论Kafka Producer.此外,我们将看到KafkaProducer API和Producer API.此外,我们将学习Kafka Producer中的配置设置.最后,我们将在Kafka Producer教程中讨论简单的生产者应用程序.为了将消息发布到Apache Kafka主题,我们使用Kafka Producer. 那么,让我们详细探讨Apache Kafka Producer. 卡夫卡初学者制片

【Apache Kafka】Kafka安装及简单示例

(一)Apache Kafka安装 1.安装环境与前提条件 ??安装环境:Ubuntu16.04 ??前提条件: ubuntu系统下安装好jdk 1.8以上版本,正确配置环境变量 ubuntu系统下安装好scala 2.11版本 安装ZooKeeper(注:kafka自带一个Zookeeper服务,如果不单独安装,也可以使用自带的ZK) 2.安装步骤 ??Apache基金会开源的这些软件基本上安装都比较方便,只需要下载.解压.配置环境变量三步即可完成,kafka也一样,官网选择对应版本下载后直接

Apache Kafka(三)- Kakfa CLI 使用

1. Topics CLI 1.1  首先启动 zookeeper 与 kafka > zookeeper-server-start.sh config/zookeeper.properties … INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory) INFO Expiring session 0x100ab41939d0000, timeout of 6000m

Error starting the context, marking it as stopped org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

ERROR [main] [org.apache.spark.streaming.StreamingContext] - Error starting the context, marking it as stopped org.apache.kafka.common.KafkaException: Failed to construct kafka consumer at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(Kafka

Apache Kafka: Next Generation Distributed Messaging System---reference

Introduction Apache Kafka is a distributed publish-subscribe messaging system. It was originally developed at LinkedIn Corporation and later on became a part of Apache project. Kafka is a fast, scalable, distributed in nature by its design, partition