源码分析Kafka之Producer

Kafka是一款很棒的消息系统,可以看看我之前写的 后端好书阅读与推荐来了解一下它的整体设计。今天我们就来深入了解一下它的实现细节(我fork了一份代码),首先关注Producer这一方。

要使用kafka首先要实例化一个KafkaProducer,需要有brokerIP、序列化器等必要Properties以及acks(0、1、n)、compression、retries、batch.size等非必要Properties,通过这个简单的接口可以控制Producer大部分行为,实例化后就可以调用send方法发送消息了。

核心实现是这个方法:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);//①
return doSend(interceptedRecord, callback);//②
}

通过不同的模式可以实现发送即忘(忽略返回结果)、同步发送(获取返回的future对象,回调函数置为null)、异步发送(设置回调函数)三种消息模式。

我们来看看消息类ProducerRecord有哪些属性:

private final String topic;//主题
private final Integer partition;//分区
private final Headers headers;//头
private final K key;//键
private final V value;//值
private final Long timestamp;//时间戳
它有多个构造函数,可以适应不同的消息类型:比如有无分区、有无key等。

①中ProducerInterceptors(有0 ~ 无穷多个,形成一个拦截链)对ProducerRecord进行拦截处理(比如打上时间戳,进行审计与统计等操作)

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
ProducerRecord<K, V> interceptRecord = record;
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
interceptRecord = interceptor.onSend(interceptRecord);
} catch (Exception e) {
// 不抛出异常,继续执行下一个拦截器
if (record != null)
log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
else
log.warn("Error executing interceptor onSend callback", e);
}
}
return interceptRecord;
}

如果用户有定义就进行处理并返回处理后的ProducerRecord,否则直接返回本身。

然后②中doSend真正发送消息,并且是异步的(源码太长只保留关键):

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// 序列化 key 和 value
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
}
// 计算分区获得主题与分区
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
// 回调与事务处理省略。
Header[] headers = record.headers().toArray();
// 消息追加到RecordAccumulator中
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
// 该批次满了或者创建了新的批次就要唤醒IO线程发送该批次了,也就是sender的wakeup方法
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
} catch (Exception e) {
// 拦截异常并抛出
this.interceptors.onSendError(record, tp, e);
throw e;
}
}

下面是计算分区的方法:

private int partition(ProducerRecord<K, V> record,
byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
// 消息有分区就直接使用,否则就使用分区器计算
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey,
record.value(), serializedValue, cluster);
}

默认的分区器DefaultPartitioner实现方式是如果partition存在就直接使用,否则根据key计算partition,如果key也不存在就使用round robin算法分配partition。

/**

  • The default partitioning strategy:
  • <ul>
  • <li>If a partition is specified in the record, use it
  • <li>If no partition is specified but a key is present choose a partition based on a hash of the key
  • <li>If no partition or key is present choose a partition in a round-robin fashion
    */
    public class DefaultPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {//key为空
    int nextValue = nextValue(topic);
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);//可用的分区
    if (availablePartitions.size() > 0) {//有分区,取模就行
    int part = Utils.toPositive(nextValue) % availablePartitions.size();
    return availablePartitions.get(part).partition();
    } else {// 无分区,
    return Utils.toPositive(nextValue) % numPartitions;
    }
    } else {// key 不为空,计算key的hash并取模获得分区
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
    }
    private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
    counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
    AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
    if (currentCounter != null) {
    counter = currentCounter;
    }
    }
    return counter.getAndIncrement();//返回并加一,在取模的配合下就是round robin
    }
    }

以上就是发送消息的逻辑处理,接下来我们再看看消息发送的物理处理。

Sender(是一个Runnable,被包含在一个IO线程ioThread中,该线程不断从RecordAccumulator队列中的读取消息并通过Selector将数据发送给Broker)的wakeup方法,实际上是KafkaClient接口的wakeup方法,由NetworkClient类实现,采用了NIO,也就是java.nio.channels.Selector.wakeup()方法实现。

Sender的run中主要逻辑是不停执行准备消息和等待消息:

long pollTimeout = sendProducerData(now);//③
client.poll(pollTimeout, now);//④
③完成消息设置并保存到信道中,然后监听感兴趣的key,由KafkaChannel实现。
public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
// transportLayer的一种实现中的相关方法
public void addInterestOps(int ops) {
key.interestOps(key.interestOps() | ops);
}

④主要是Selector的poll,其select被wakeup唤醒:

public void poll(long timeout) throws IOException {
/ check ready keys /
long startSelect = time.nanoseconds();
int numReadyKeys = select(timeout);//wakeup使其停止阻塞
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
// Poll from channels that have buffered data (but nothing more from the underlying socket)
if (dataInBuffers) {
keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
Set<SelectionKey> toPoll = keysWithBufferedRead;
keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
pollSelectionKeys(toPoll, false, endSelect);
}
// Poll from channels where the underlying socket has more data
pollSelectionKeys(readyKeys, false, endSelect);
// Clear all selected keys so that they are included in the ready count for the next select
readyKeys.clear();
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
immediatelyConnectedKeys.clear();
} else {
madeReadProgressLastPoll = true; //no work is also "progress"
}
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
}

其中pollSelectionKeys方法会调用如下方法完成消息发送:

public Send write() throws IOException {
Send result = null;
if (send != null && send(send)) {
result = send;
send = null;
}
return result;
}
private boolean send(Send send) throws IOException {
send.writeTo(transportLayer);
if (send.completed())
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
return send.completed();
}

Send是一次数据发包,一般由ByteBufferSend或者MultiRecordsSend实现,其writeTo调用transportLayer的write方法,一般由PlaintextTransportLayer或者SslTransportLayer实现,区分是否使用ssl:

public long writeTo(GatheringByteChannel channel) throws IOException {
long written = channel.write(buffers);
if (written < 0)
throw new EOFException("Wrote negative bytes to channel. This shouldn‘t happen.");
remaining -= written;
pending = TransportLayers.hasPendingWrites(channel);
return written;
}
public int write(ByteBuffer src) throws IOException {
return socketChannel.write(src);
}

到此就把Producer的业务相关逻辑处理和非业务相关的网络 2方面的主要流程梳理清楚了。其他额外的功能是通过一些配置保证的。

比如顺序保证就是max.in.flight.requests.per.connection,InFlightRequests的doSend会进行判断(由NetworkClient的canSendRequest调用),只要该参数设为1即可保证当前包未确认就不能发送下一个包从而实现有序性

public boolean canSendMore(String node) {
Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
return queue == null || queue.isEmpty() ||
(queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}

再比如可靠性,通过设置acks,Sender中sendProduceRequest的clientRequest加入了回调函数:

RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());//调用completeBatch
}
};
/**

  • 完成或者重试投递,这里如果acks不对就会重试
  • @param batch The record batch
  • @param response The produce response
  • @param correlationId The correlation id for the request
  • @param now The current POSIX timestamp in milliseconds
    */
    private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
    long now, long throttleUntilTimeMs) {
    }
    public class ProduceResponse extends AbstractResponse {
    /**
  • Possible error code:
  • INVALID_REQUIRED_ACKS (21)
    */
    }

kafka源码一层一层包装很多,错综复杂,如有错误请大家不吝赐教。

原文地址:http://blog.51cto.com/13917525/2311751

时间: 2024-10-03 04:56:55

源码分析Kafka之Producer的相关文章

源码分析 Kafka 消息发送流程(文末附流程图)

温馨提示:本文基于 Kafka 2.2.1 版本.本文主要是以源码的手段一步一步探究消息发送流程,如果对源码不感兴趣,可以直接跳到文末查看消息发送流程图与消息发送本地缓存存储结构. 从上文 初识 Kafka Producer 生产者,可以通过 KafkaProducer 的 send 方法发送消息,send 方法的声明如下: Future<RecordMetadata> send(ProducerRecord<K, V> record) Future<RecordMetada

kafka C客户端librdkafka producer源码分析

简介 kafka网站上提供了C语言的客户端librdkafka,地址在这. librdkafka是使用C语言根据apache kafka 协议实现的客户端.另外这个客户端还有简单的c++接口.客户端作者对这个客户端比较上心,经常会修改bug并提交新功能. librdkafka的基本原理和我之前博客说的java版producer类似,一个线程向队列中加数据,另一个线程通过非阻塞的方式从队列中取出数据,并写入到broker. 源码分析 源码包含两个文件夹src和src-cpp src是用c实现的源码

apache kafka源码分析走读-Producer分析

apache kafka中国社区QQ群:162272557 producer的发送方式剖析 Kafka提供了Producer类作为java producer的api,该类有sync和async两种发送方式. sync架构图 async架构图 调用流程如下: 代码流程如下: Producer:当new Producer(new ProducerConfig()),其底层实现,实际会产生两个核心类的实例:Producer.DefaultEventHandler.在创建的同时,会默认new一个Prod

apache kafka源码分析-Producer分析---转载

原文地址:http://www.aboutyun.com/thread-9938-1-1.html 问题导读1.Kafka提供了Producer类作为java producer的api,此类有几种发送方式?2.总结调用producer.send方法包含哪些流程?3.Producer难以理解的在什么地方? producer的发送方式剖析Kafka提供了Producer类作为java producer的api,该类有sync和async两种发送方式.sync架构图 async架构图 调用流程如下:

Kafka源码分析及图解原理之Producer端

一.前言 任何消息队列都是万变不离其宗都是3部分,消息生产者(Producer).消息消费者(Consumer)和服务载体(在Kafka中用Broker指代).那么本篇主要讲解Producer端,会有适当的图解帮助理解底层原理. 一.开发应用 首先介绍一下开发应用,如何构建一个KafkaProducer及使用,还有一些重要参数的简介. 1.1 一个栗子 1 /** 2 * Kafka Producer Demo实例类. 3 * 4 * @author GrimMjx 5 */ 6 public

Kafka 自定义指定消息partition策略规则及DefaultPartitioner源码分析

Kafka 自定义指定消息partition策略规则及DefaultPartitioner源码分析 一.概述 kafka默认使用DefaultPartitioner类作为默认的partition策略规则,具体默认设置是在ProducerConfig类中(如下图) 二.DefaultPartitioner.class 源码分析 1.类关系图 2.源码分析 public class DefaultPartitioner implements Partitioner { //缓存map key->to

从源码分析如何优雅的使用 Kafka 生产者

前言 在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢? 正好以这个问题结合 Kakfa 的源码讨论下如何正确.高效的发送消息. 内容较多,对源码感兴趣的朋友请系好安全带??(源码基于 v0.10.0.0 版本分析).同时最好是有一定的 Kafka 使用经验,知晓基本的用法. 简单的消息发送 在分析之前先看一个简单的消息发送是怎么样的.(以下代码基于 SpringBoot 构建.) 首先创建一个 org.apache.kafka.clients.producer.Producer

RocketMQ 源码分析

RocketMQ 源码分析 RocketMQ 的设计思想来自于Kafka,在具体设计时体现了自己的选择和需求,具体差别可以看RocketMQ与Kafka对比(18项差异).接下来记录下自己阅读源码的一些探索. RocketMQ的整体架构如下,可以看到各个组件充当的角色,Name Server 负责维护一些全局的路由信息:当前有哪些broker,每个Topic在哪个broker上等; Broker具体处理消息的存储和服务:生产者和消费者是消息的源头和归宿. 在知道各个角色的基本位置后,就该让程序跑

Android:Otto源码分析

Otto源码分析 Otto是一个轻量级的EventBus,它的使用非常简单,我们使用一个Bus的单例,所有需要产生事件(@Produce bus.post(new YourEvent(-)))或者处理事件(@Subscribe)的对象,在create时register,销毁destroy时unregister即可. 使用 @Subscribe 订阅事件,也就是事件的处理者,它有且仅有一个参数YourEvent,每一个Subscribe对应处理一个YourEvent.Event用于连接(匹配)po