1. 使用 KafkaProducer 发送消息,是按 batch 发送的,producer 首先把消息放入 ProducerBatch 中:
org.apache.kafka.clients.producer.internals.ProducerBatch
2. KafkaProduer 类中有一个 Thread 属性,负责 IO,发送和接收数据:
this.sender = new Sender(logContext, client, this.metadata, this.accumulator, maxInflightRequests == 1, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), acks, retries, metricsRegistry.senderMetrics, Time.SYSTEM, this.requestTimeoutMs, config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), this.transactionManager, apiVersions); String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId; this.ioThread = new KafkaThread(ioThreadName, this.sender, true); this.ioThread.start();
Sender 类实现了 Runnable 接口,封装了具体的逻辑,发送消息和接收响应都在这个类中。
// 发送消息 long pollTimeout = sendProducerData(now); // 接收响应 client.poll(pollTimeout, now);
3. 执行回调
org.apache.kafka.clients.producer.internals.Sender#completeBatch()
原文地址:https://www.cnblogs.com/allenwas3/p/9768149.html
时间: 2024-10-09 09:55:12