kafka producer batch 发送消息

1. 使用 KafkaProducer 发送消息,是按 batch 发送的,producer 首先把消息放入 ProducerBatch 中:
org.apache.kafka.clients.producer.internals.ProducerBatch
2. KafkaProduer 类中有一个 Thread 属性,负责 IO,发送和接收数据:
            this.sender = new Sender(logContext,
                    client,
                    this.metadata,
                    this.accumulator,
                    maxInflightRequests == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    acks,
                    retries,
                    metricsRegistry.senderMetrics,
                    Time.SYSTEM,
                    this.requestTimeoutMs,
                    config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                    this.transactionManager,
                    apiVersions);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();

Sender 类实现了 Runnable 接口,封装了具体的逻辑,发送消息和接收响应都在这个类中。

// 发送消息
long pollTimeout = sendProducerData(now);
// 接收响应
client.poll(pollTimeout, now);

3. 执行回调

org.apache.kafka.clients.producer.internals.Sender#completeBatch()

原文地址:https://www.cnblogs.com/allenwas3/p/9768149.html

时间: 2024-10-09 09:55:12

kafka producer batch 发送消息的相关文章

【原创】Kafka接受发送消息对象Object基础版

首先感谢 kafka 中国社区 王扬庭例子的帮助和指导~~~~~(kafka_2.9.2-0.8.1.1) kafka常用的发送消息的方法如下: Properties props = new Properties(); props.put("zookeeper.connect", "slaves2:2181,slaves3:2181,slaves4:2181"); props.put("serializer.class", "kafka

java客户端向单机版kafka发送消息没有接收到

kafka版本:kafka_2.11-0.10.0.0 在kafka服务器命令发送消息,消费者可以接受到, 但是在java客户端向kafka发送消息时消费者接受不到, 在kafka/config/sever.properties把这个注解打开 advertised.listeners=PLAINTEXT://ip.137:9092        #本机服务器ip 意思就是说:hostname.port都会广播给producer.consumer.如果你没有配置了这个属性的话,则使用listene

Kafka Producer相关代码分析

Kafka Producer相关代码分析 标签(空格分隔): kafka Kafka Producer将用户的消息发送到Kafka集群(准确讲是发送到Broker).本文将分析Producer相关的代码实现. 类kafka.producer.Producer 如果你自己实现Kafka客户端来发送消息的话,你就是用到这个类提供的接口来发送消息.(如果你对如何利用Producer API来发送消息还不是很熟悉的话,可以参看官方的例子).这个类提供了同步和异步两种方式来发送消息. 异步发送消息是基于同

spring kafka producer 生产者

pom引入jar包 ··· org.springframework.kafka spring-kafka 1.1.1.RELEASE ··· ### spring配置 ``` ``` ### kafka.properties 配置文件 ``` kafka.producer.servers=127.0.0.1:9091,127.0.0.1:9092 kafka.producer.retries=1 kafka.producer.batch.size=4096 kafka.producer.ling

关于高并发下kafka producer send异步发送耗时问题的分析

最近开发网关服务的过程当中,需要用到kafka转发消息与保存日志,在进行压测的过程中由于是多线程并发操作kafka producer 进行异步send,发现send耗时有时会达到几十毫秒的阻塞,很大程度上上影响了并发的性能,而在后续的测试中发现单线程发送反而比多线程发送效率高出几倍.所以就对kafka API send 的源码进行了一下跟踪和分析,在此总结记录一下. 首先看springboot下 kafka producer 的使用 在config中进行配置,向IOC容器中注入DefaultKa

kafka 0.8.2 消息生产者 producer

package com.hashleaf.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * 消息生产者 * @author xiaojf [email protected] * @since 2015-7-15 下午10:50:01 */

kafka 0.10.2 消息生产者(producer)

package cn.xiaojf.kafka.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka

Golang之发送消息至kafka

windows下安装zookeeper 1.安装JAVA-JDK,从oracle下载最新的SDK安装(我用的是1.8的) 2.安装zookeeper3.3.6,下载地址:http://apache.fayea.com/zookeeper/ 3.重命名conf/zoo_sample.cfg 为conf/zoo.cfg 4.编辑 conf/zoo.cfg,修改dataDir=D:\zookeeper-3.3.6\data\ 4.运行bin/zkServer.cmd 启动结果如下: 安装kafka 1

kafka无法发送消息问题处理

背景 在服务器上搭建了一个单机环境的kafka broker,在服务器上使用命令生产消息时,一切正常.当在本地使用JAVA程序发送消息时,一直出错. 抛出的错误为: Exception in thread "main" Failed to send requests for topics test with correlation ids in [0,12] kafka.common.FailedToSendMessageException: Failed to send messag