kafka生产者

1、kafka生产者是线程安全的,她允许多个线程共享一个kafka实例

2、kafka管理一个简单的后台线程,所有的IO操作以及与每个broker的tcp连接通信,如果没有正确的关闭生产者可能会造成资源泄露。

kafka总共有以下的这些生产者实例

 
KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs)
          A producer is instantiated by providing a set of key-value pairs as configuration.
KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer)
          A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value Serializer.
KafkaProducer(java.util.Properties properties)
          A producer is instantiated by providing a set of key-value pairs as configuration.
KafkaProducer(java.util.Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer)
          A producer is instantiated by providing a set of key-value pairs as configuration, a key and a value Serializer.

以及相应的方法

 void close()
          Close this producer.
 java.util.Map<MetricName,? extends Metric> metrics()
          Return a map of metrics maintained by the producer
 java.util.List<PartitionInfo> partitionsFor(java.lang.String topic)
          Get a list of partitions for the given topic for custom partition assignment.
 java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)
          Asynchronously send a record to a topic.
 java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)
          Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.

主要介绍send方法

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record,
                                                        Callback callback)send方法是异步的,消息到缓冲区后接着发送消息,不会确认数据是否已经存入kafka,调用callback函数有效的了解当send发送失败时能够抛出异常。
时间: 2024-11-05 13:47:05

kafka生产者的相关文章

java实现Kafka生产者示例

使用java实现Kafka的生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 package com.lisg.kafkatest; import

Kafka 系列(三)—— Kafka 生产者详解

一.生产者发送消息的过程 首先介绍一下 Kafka 生产者发送消息的过程: Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区.在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输. 接下来,数据被传给分区器.如果之前已经在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情.如果没有指定分区 ,那么分区器

kafka 生产者

KafkaProducer 创建一个 KafkaThread 来运行 Sender.run 方法. 1. 发送消息的入口在 KafkaProducer#doSend 中,但其实是把消息加入到 batches 中: kafka 生产者是按 batch 发送消息,RecordAccumulator 类有个变量 ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches,KafkaProducer#doSend 方法会把当前的这

Kafka 生产者、消费者与分区的关系

kafka 生产者.消费者与分区的关系 背景 最近和海康整数据对接, 需要将海康产生的结构化数据拿过来做二次识别. 基本的流程: 海康大数据 --> kafka server --> 平台 Kafka 的 topic 正常过车 topic: BAYONET_VEHICLEPASS 违法过车 topic: BAYONET_VEHICLEALARM 前言 首先我们需要对kafka中的一些名词有一定的了解, 有过一些使用经验, 一般来说, 生产者发送消息到主题, 而消费者从主题消费数据 ( 我初次接

Kafka生产者——结合spring开发

目录 Kafka生产者端 可靠性保证: spring-kafka生产端 Kafka生产者端 可靠性保证: producer向broker发送消息数据,需要有一定的可靠性,至少要保证数据: 1.不丢失 2.不重复 producer提供了一些参数,在编写producer是进行合理设置和编写,就可以保证数据的可靠性. acks 参数配置 为保证producer发送的数据能够可靠的发送到指定topic,topic的每个partition收到消息后,都需要向producer发送ack(acknowledg

kafka集群搭建和使用Java写kafka生产者消费者

 kafka集群搭建 Java代码   1.zookeeper集群  搭建在110, 111,112 2.kafka使用3个节点110, 111,112 修改配置文件config/server.properties broker.id=110 host.name=192.168.1.110 log.dirs=/usr/local/kafka_2.10-0.8.2.0/logs 复制到其他两个节点,然后修改对应节点上的config/server.pro 3.启动,在三个节点分别执行 bin/kaf

kafka 生产者消费者 api接口

生产者 import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import kafka.serializer.StringEncoder; public class KafkaProducer  extends Thread{ private String topic

详解Kafka生产者Producer配置

基本配置 metadata.broker.list:broker服务器集群列表,格式为 host1:port1, host2:port2 ... producer.type:消息发送类型同步还是异步,默认为同步 compression.codec:消息的压缩格式,默认为none不压缩,也可以为gzip, snappy, lz4 serializer.class:消息加密格式,默认为kafka.serializer.DefaultEncoder compressed.topics:主题的压缩格式,

kafka生产者java客户端

producer 包含一个用于保存待发送消息的缓冲池,缓冲池中消息是还没来得及传输到kafka集群的消息. 位于底层的kafka I/O线程负责将缓冲池中的消息转换成请求发送到集群.如果在结束produce时,没有调用close()方法,那么这些资源会发生泄露. 常用配置 bootstrap.servers 用于初始化时建立链接到kafka集群,以host:port形式,多个以逗号分隔host1:port1,host2:port2: acks 生产者需要server端在接收到消息后,进行反馈确认