kafka 生产者

KafkaProducer 创建一个 KafkaThread 来运行 Sender.run 方法。

1. 发送消息的入口在 KafkaProducer#doSend 中,但其实是把消息加入到 batches 中:

kafka 生产者是按 batch 发送消息,RecordAccumulator 类有个变量 ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches,
KafkaProducer#doSend 方法会把当前的这条消息放入到 ProducerBatch 中。然后调用 Sender#wakeup 方法,尝试唤醒阻塞的 io 线程。

2. 从 batches 取出数据发送,入口在 Sender.run,主要的逻辑抽象为 2 步:

2.1 NetworkClient.send
这里的 send 不是真正的网络发送,先把 ProducerReuquest 序列化成 Send 对象,然后加入到 inFlightRequests 的头部,调用 selector 的 send,实则是 KafkaChannel.setSend()

Send send = request.toSend(nodeId, header);

this.inFlightRequests.add(inFlightRequest);

selector.send(inFlightRequest.send);

2.2 NetworkClient.poll
真正的网络发送

Selector#pollSelectionKeys 处理网络读写事件,发送消息即写事件,同时把响应存放在 Selector#completedReceives 中
producer 发送消息,如果 acks = -1 和 1,即 producer 请求需要响应,
在 NetworkClient#handleCompletedSends 中,把不需要响应的请求,从 inFlightRequests 中删除
在 NetworkClient#handleCompletedReceives 处理响应
producer 设置了 ack 的值是固定的,producer 要么都需要响应,要么都不需要响应。
新的请求加在头部,收到的响应对应最旧的请求,即尾部的请求。

3. 主要的类
KafkaProducer: 直接暴露给用户的 api 类;Sender: 主要管理 ProducerBatch
NetworkClient: ProducerBatch 是对象,通过网络发送需要序列化,该类管理连接,更接近 io 层
Selector 对 java nio Selector 的封装
KafkaChannel

4. ByteBuffer

// ByteBuffer 的使用
// ByteBuffer 初始是写模式
public static void main(String[] args) throws UnsupportedEncodingException {
    // capacity = 512, limit = 512, position = 0
    ByteBuffer buffer = ByteBuffer.allocate(512);
    buffer.put((byte)‘h‘);
    buffer.put((byte)‘e‘);
    buffer.put((byte)‘l‘);
    buffer.put((byte)‘l‘);
    buffer.put((byte)‘o‘);

    // limit = position, position = 0
    buffer.flip();

    // 获取字节数
    int len = buffer.remaining();
    byte[] dst = new byte[len];
    buffer.get(dst);
    System.out.println(new String(dst));
    // 结论:ByteBuffer 只是对 byte[] 的封装
}

//SocketChannel
//输出
//SocketChannel#write(java.nio.ByteBuffer)
//读取输入
//SocketChannel#read(java.nio.ByteBuffer)

原文地址:https://www.cnblogs.com/allenwas3/p/11615210.html

时间: 2024-11-05 15:52:55

kafka 生产者的相关文章

kafka生产者

1.kafka生产者是线程安全的,她允许多个线程共享一个kafka实例 2.kafka管理一个简单的后台线程,所有的IO操作以及与每个broker的tcp连接通信,如果没有正确的关闭生产者可能会造成资源泄露. kafka总共有以下的这些生产者实例   KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs)           A producer is instantiated by providing a

java实现Kafka生产者示例

使用java实现Kafka的生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 package com.lisg.kafkatest; import

Kafka 系列(三)—— Kafka 生产者详解

一.生产者发送消息的过程 首先介绍一下 Kafka 生产者发送消息的过程: Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区.在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输. 接下来,数据被传给分区器.如果之前已经在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情.如果没有指定分区 ,那么分区器

Kafka 生产者、消费者与分区的关系

kafka 生产者.消费者与分区的关系 背景 最近和海康整数据对接, 需要将海康产生的结构化数据拿过来做二次识别. 基本的流程: 海康大数据 --> kafka server --> 平台 Kafka 的 topic 正常过车 topic: BAYONET_VEHICLEPASS 违法过车 topic: BAYONET_VEHICLEALARM 前言 首先我们需要对kafka中的一些名词有一定的了解, 有过一些使用经验, 一般来说, 生产者发送消息到主题, 而消费者从主题消费数据 ( 我初次接

Kafka生产者——结合spring开发

目录 Kafka生产者端 可靠性保证: spring-kafka生产端 Kafka生产者端 可靠性保证: producer向broker发送消息数据,需要有一定的可靠性,至少要保证数据: 1.不丢失 2.不重复 producer提供了一些参数,在编写producer是进行合理设置和编写,就可以保证数据的可靠性. acks 参数配置 为保证producer发送的数据能够可靠的发送到指定topic,topic的每个partition收到消息后,都需要向producer发送ack(acknowledg

kafka集群搭建和使用Java写kafka生产者消费者

 kafka集群搭建 Java代码   1.zookeeper集群  搭建在110, 111,112 2.kafka使用3个节点110, 111,112 修改配置文件config/server.properties broker.id=110 host.name=192.168.1.110 log.dirs=/usr/local/kafka_2.10-0.8.2.0/logs 复制到其他两个节点,然后修改对应节点上的config/server.pro 3.启动,在三个节点分别执行 bin/kaf

kafka 生产者消费者 api接口

生产者 import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import kafka.serializer.StringEncoder; public class KafkaProducer  extends Thread{ private String topic

详解Kafka生产者Producer配置

基本配置 metadata.broker.list:broker服务器集群列表,格式为 host1:port1, host2:port2 ... producer.type:消息发送类型同步还是异步,默认为同步 compression.codec:消息的压缩格式,默认为none不压缩,也可以为gzip, snappy, lz4 serializer.class:消息加密格式,默认为kafka.serializer.DefaultEncoder compressed.topics:主题的压缩格式,

kafka生产者java客户端

producer 包含一个用于保存待发送消息的缓冲池,缓冲池中消息是还没来得及传输到kafka集群的消息. 位于底层的kafka I/O线程负责将缓冲池中的消息转换成请求发送到集群.如果在结束produce时,没有调用close()方法,那么这些资源会发生泄露. 常用配置 bootstrap.servers 用于初始化时建立链接到kafka集群,以host:port形式,多个以逗号分隔host1:port1,host2:port2: acks 生产者需要server端在接收到消息后,进行反馈确认