kafka 消费者拉取消息

本文只跟踪消费者拉取消息的流程。对于 java 客户端, kafka 的生产者和消费者复用同一个网络 io 类 NetworkClient。

入口在 KafkaConsumer#pollOnce 中,抽出主要步骤:

// 构造 FetchRequest 请求,将请求对象放入 unsent 集合,等待发送
fetcher.sendFetches();

// 取出 unsent 中的请求,调用 NetworkClient#send,NetworkClinet#poll
client.poll(pollTimeout, nowMs, new PollCondition() {
    @Override
    public boolean shouldBlock() {
        // since a fetch might be completed by the background thread, we need this poll condition
        // to ensure that we do not block unnecessarily in poll()
        return !fetcher.hasCompletedFetches();
    }
});

// 返回数据给用户
return fetcher.fetchedRecords();

Fetcher#sendFetches

public synchronized int sendFetches() {
    // 构造拉取消息请求。从哪个节点,哪个分区,什么位置拉取消息
    Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
    for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
        final Node fetchTarget = entry.getKey();
        final FetchSessionHandler.FetchRequestData data = entry.getValue();
        //1. 借助 Builder 构造 FetchRequest 对象
        final FetchRequest.Builder request = FetchRequest.Builder
                .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
                .isolationLevel(isolationLevel)
                .setMaxBytes(this.maxBytes)
                .metadata(data.metadata())
                .toForget(data.toForget());
        if (log.isDebugEnabled()) {
            log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget);
        }

        client.send(fetchTarget, request)
                //4. 给 RequestFutureCompletionHandler.future 添加 RequestFutureListener
                .addListener(new RequestFutureListener<ClientResponse>() {
                    @Override
                    public void onSuccess(ClientResponse resp) {
                        synchronized (Fetcher.this) {
                            FetchResponse response = (FetchResponse) resp.responseBody();
                            FetchSessionHandler handler = sessionHandler(fetchTarget.id());
                            if (handler == null) {
                                log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
                                    fetchTarget.id());
                                return;
                            }
                            if (!handler.handleResponse(response)) {
                                return;
                            }

                            Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                            FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);

                            for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                                TopicPartition partition = entry.getKey();
                                long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
                                FetchResponse.PartitionData fetchData = entry.getValue();

                                log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
                                        isolationLevel, fetchOffset, partition, fetchData);
                                // 10. 把数据放入 completedFetches,最终返回给用户
                                completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                        resp.requestHeader().apiVersion()));
                            }

                            sensors.fetchLatency.record(resp.requestLatencyMs());
                        }
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        synchronized (Fetcher.this) {
                            FetchSessionHandler handler = sessionHandler(fetchTarget.id());
                            if (handler != null) {
                                handler.handleError(e);
                            }
                        }
                    }
                });
    }
    return fetchRequestMap.size();
}

ConsumerNetworkClient#send

public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder) {
    long now = time.milliseconds();
    //2. 使用 RequestFutureCompletionHandler 作为回调函数
    RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
    ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
            completionHandler);
    //3. 请求放入 unsent 集合
    unsent.put(node, clientRequest);

    // wakeup the client in case it is blocking in poll so that we can send the queued request
    client.wakeup();
    return completionHandler.future;
}

ConsumerNetworkClient#poll

// 5. 发送 unsent 中的请求,并没有产生网络 io
trySend(now);

// 真实的网络数据写和读
// 6. 发送请求
// 7. 接收响应
// 8. 触发 RequestFutureCompletionHandler 回调
client.poll(0, now);

// 9. 触发 RequestFutureListener 中的回调
firePendingCompletedRequests();

NetworkClient#handleCompletedReceives

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        InFlightRequest req = inFlightRequests.completeNext(source);
        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
            throttleTimeSensor, now);
        if (log.isTraceEnabled()) {
            log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
                req.header.apiKey(), req.header.correlationId(), responseStruct);
        }
        AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
        if (req.isInternalRequest && body instanceof MetadataResponse)
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            // 此处给 responses 添加元素
            // return new ClientResponse(header, callback, destination, createdTimeMs, timeMs, false, null, response);
            // 直接把请求的 callback 赋值给响应
            // 生产者发送消息的 callback,是用户通过参数传入的
            // 消费者拉取消息的 callback,是在 ConsumerNetworkClient#send 指定的,是 RequestFutureCompletionHandler
            responses.add(req.completed(body, now));
    }
}

NetworkClient#completeResponses

private void completeResponses(List<ClientResponse> responses) {
    for (ClientResponse response : responses) {
        try {
            // callback.onComplete(this);
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
}

RequestFutureCompletionHandler#onComplete

public void onComplete(ClientResponse response) {
    this.response = response;
    pendingCompletion.add(this);
}

ConsumerNetworkClient#firePendingCompletedRequests

private void firePendingCompletedRequests() {
    boolean completedRequestsFired = false;
    for (;;) {
        RequestFutureCompletionHandler completionHandler = pendingCompletion.poll();
        if (completionHandler == null)
            break;

        completionHandler.fireCompletion();
        completedRequestsFired = true;
    }

    // wakeup the client in case it is blocking in poll for this future‘s completion
    if (completedRequestsFired)
        client.wakeup();
}

ConsumerNetworkClient.RequestFutureCompletionHandler#fireCompletion

public void fireCompletion() {
    if (e != null) {
        future.raise(e);
    } else if (response.wasDisconnected()) {
        RequestHeader requestHeader = response.requestHeader();
        int correlation = requestHeader.correlationId();
        log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
                requestHeader.apiKey(), requestHeader, correlation, response.destination());
        future.raise(DisconnectException.INSTANCE);
    } else if (response.versionMismatch() != null) {
        future.raise(response.versionMismatch());
    } else {
        future.complete(response);
    }
}

RequestFuture#complete

public void complete(T value) {
    try {
        if (value instanceof RuntimeException)
            throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException");

        if (!result.compareAndSet(INCOMPLETE_SENTINEL, value))
            throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
        fireSuccess();
    } finally {
        completedLatch.countDown();
    }
}

private void fireSuccess() {
    T value = value();
    while (true) {
        RequestFutureListener<T> listener = listeners.poll();
        if (listener == null)
            break;
        // 终于调到 RequestFutureListener
        listener.onSuccess(value);
    }
}

如果不考虑心跳线程,consumer 第一次 poll 是不会有数据的,此时请求才发出去,响应还没回来,必须在第二次 poll 时,才能同时处理网络读写事件。

跟完之后,个人觉得调用链还是挺长的。一点感觉,全程只有一个线程,但是每次走的分支都不一样,给人的启发就是,单线程只要不等待,速度也很快。

原文地址:https://www.cnblogs.com/allenwas3/p/11617514.html

时间: 2024-07-30 08:41:36

kafka 消费者拉取消息的相关文章

Kafka消费者之提交消息的偏移量

原文链接:https://cloud.tencent.com/developer/article/1462432 一.概述 在新消费者客户端中,消费位移是存储在Kafka内部的主题 __consumer_offsets 中.把消费位移存储起来(持久化)的动作称为 “提交” ,消费者在消费完消息之后需要执行消费位移的提交. 参考下图的消费位移,x 表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么我们就可以说消费者的消费位移为 x ,图中也用了 lastCo

RocketMQ 拉取消息-文件获取

看完了上一篇的<RocketMQ 拉取消息-通信模块>,请求进入PullMessageProcessor中,接着 PullMessageProcessor.processRequest(final ChannelHandlerContext ctx, RemotingCommand request)方法中调用了: final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessag

Kafka消费者手动提交消息偏移

生产者每次调用poll()方法时,它总是返回由生产者写入Kafka但还没有消费的消息,如果消费者一致处于运行状态,那么分区消息偏移量就没什么用处,但是如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费可能分配到新的分区,而不是之前处理的那个,为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量制定的地方开始工作.消费者会往一个__consumer_offser的主题发送消息,消息里包含每个分区的偏移量. 1.同步提交 import o

.net MVC 微信公众号 点击菜单拉取消息时的事件推送

官方文档:https://mp.weixin.qq.com/wiki?t=resource/res_main&id=mp1421141016&token=&lang=zh_CN 业务描述:点击菜单,推送消息,消息内容为自定义的读取数据库信息之后,为用户推送的动态消息. 首先需要用代码实现自定义菜单,为对应的菜单指定click事件. 关于自定义菜单的创建及事件指定,请看上一篇文章,本篇主要介绍事件响应的实现. MVC controller 中的代码如下: public void Me

Kafka consumer消息的拉取及偏移的管理

消费者拉取消息并处理主要有4个步骤: 获取消费者所拉取分区的偏移位置OffsetFetchRequest(新的消息是从偏移位置开始的) 创建FetchReqeust,生成Map<Node, FetchRequest>,以消费者所拉取消息的节点为key来分组,所消费的TopicPartition的数据为value,并放入到unsent队列 调用poll方法实际发送请求给相应的node,如果返回成功,在onSecuss方法中,消息被保存在completedFetches中 从completedFe

Kafka消费者——API开发

目录 消费者客户端 订阅主题 订阅分区 取消订阅 订阅总结 消息消费 poll ConsumerRecord 位移提交 自动提交 手动提交 控制和关闭消费 指定位移消费 再均衡 消费者拦截器 消费者客户端 消费步骤: 1.配置消费者客户端参数并创建相应的消费者实例. 2.订阅主题. 3.拉取消息并消费 4.提交消费位移 5.关闭消费者实例 Properties prop = new Properties(); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_C

kafka 0.10.2 消息消费者

package cn.xiaojf.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.Kaf

消息拉取

疑问:PullRequest何时添加? PullMessageService提供延迟添加与立即添加2种方式 疑问:PullRequest是在什么时候创建的呢? 1.上上图中 PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest);mq根据PullRequest拉取任务执行完一次消息拉取任务之后,又将PullRequest对象放入到pullRequestQueue,第二个是在Reba

解决 MySQL 比如我要拉取一个消息表中用户id为1的前10条最新数据

我们都知道,各种主流的社交应用或者阅读应用,基本都有列表类视图,并且都有滑到底部加载更多这一功能, 对应后端就是分页拉取数据.好处不言而喻,一般来说,这些数据项都是按时间倒序排列的,用户只关心最新的动态,而不关心几个月甚至几年前消息,所以后端返回给客户端的数据是不会一次性传递全部内容的(不仅耗费流量,而且还给服务器带来巨大压力). 举个例就说MySQL,它已经给我们提供了相应的语句来支持这一功能,那就是limit关键字.比如我要拉取一个消息表中用户id为1的前10条最新数据,SQL语句如下: s