kafka生产者java客户端

producer

  包含一个用于保存待发送消息的缓冲池,缓冲池中消息是还没来得及传输到kafka集群的消息。
  位于底层的kafka I/O线程负责将缓冲池中的消息转换成请求发送到集群。如果在结束produce时,没有调用close()方法,那么这些资源会发生泄露。
 

常用配置

bootstrap.servers

 用于初始化时建立链接到kafka集群,以host:port形式,多个以逗号分隔host1:port1,host2:port2; 

acks

生产者需要server端在接收到消息后,进行反馈确认的尺度,主要用于消息的可靠性传输;acks=0表示生产者不需要来自server的确认;acks=1表示server端将消息保存后即可发送ack,而不必等到其他follower角色的都收到了该消息;acks=all(or acks=-1)意味着server端将等待所有的副本都被接收后才发送确认。

retries

生产者发送失败后,重试的次数 

batch.size

当多条消息发送到同一个partition时,该值控制生产者批量发送消息的大小,批量发送可以减少生产者到服务端的请求数,有助于提高客户端和服务端的性能。 

linger.ms

默认情况下缓冲区的消息会被立即发送到服务端,即使缓冲区的空间并没有被用完。可以将该值设置为大于0的值,这样发送者将等待一段时间后,再向服务端发送请求,以实现每次请求可以尽可能多的发送批量消息。 

batch.size

batch.size和linger.ms是两种实现让客户端每次请求尽可能多的发送消息的机制,它们可以并存使用,并不冲突。 

buffer.memory

生产者缓冲区的大小,保存的是还未来得及发送到server端的消息,如果生产者的发送速度大于消息被提交到server端的速度,该缓冲区将被耗尽。 

key.serializer,value.serializer

说明了使用何种序列化方式将用户提供的key和vaule值序列化成字节。

Producer

  

public class Producer {

    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
    private KafkaProducer<String, String> kafkaProducer;
    private Random random = new Random();
    private String topic;
    private int retry;  

    public Producer() {
        this("my_init");
    }

    public Producer(String topic) {
        this(topic,3);
    }

    public Producer(String topic,int retry) {
         this.topic = topic;
         this.retry = retry;
         if (null == kafkaProducer) {
             Properties props = new Properties();
             InputStream inStream = null;
             try {
                 inStream = this.getClass().getClassLoader().getResourceAsStream("kafka-producer.properties");
                 props.load(inStream);
                 kafkaProducer = new KafkaProducer<String, String>(props);
             } catch (IOException e) {
                 LOGGER.error("kafkaProducer初始化失败:" + e.getMessage(), e);
             } finally {
                 if (null != inStream) {
                     try {
                         inStream.close();
                     } catch (IOException e) {
                         LOGGER.error("kafkaProducer初始化失败:" + e.getMessage(), e);
                     }
                 }
             }
         }
    }

    /**
     * 通过kafkaProducer发送消息
     * @param topic 消息接收主题
     * @param partitionNum  哪一个分区
     * @param retry  重试次数
     * @param message 具体消息值
     */
    public RecordMetadata sendKafkaMessage(final String message) {  

        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, random.nextInt(3), "", message);
        Future<RecordMetadata> meta = kafkaProducer.send(record, new Callback() {              //send方法是异步的,添加消息到缓存区等待发送,并立即返回,这使生产者通过批量发送消息来提高效率
            public void onCompletion(RecordMetadata recordMetadata,Exception exception) {
                if (null != exception) {
                    LOGGER.error("kafka发送消息失败:" + exception.getMessage(),exception);
                    retryKakfaMessage(message);
                }
            }
        });
        RecordMetadata metadata = null;
        try {
            metadata = meta.get();
        } catch (InterruptedException e) {
        } catch (ExecutionException e) {}
        return metadata;
    }  

    /**
     * 当kafka消息发送失败后,重试
     */
    private void retryKakfaMessage(final String retryMessage) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, random.nextInt(3), "", retryMessage);
        for (int i = 1; i <= retry; i++) {
            try {
                kafkaProducer.send(record);
                return;
            } catch (Exception e) {
                LOGGER.error("kafka发送消息失败:" + e.getMessage(), e);
                retryKakfaMessage(retryMessage);
            }
        }
    }  

    /**
     * kafka实例销毁
     */
    public void close() {
        if (null != kafkaProducer) {
            kafkaProducer.flush();
            kafkaProducer.close();
        }
    }  

    public String getTopic() {
        return topic;
    }  

    public void setTopic(String topic) {
        this.topic = topic;
    }  

    public int getRetry() {
        return retry;
    }  

    public void setRetry(int retry) {
        this.retry = retry;
    }
}

TestProducer

  

public class TestProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestProducer.class);
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for(int i=0;i<3;i++){
            executor.submit(new Runnable() {
                @Override
                public void run() {
                     String topic = "2017-11-6-test";
                     Producer p = new Producer(topic);
                     for(int n=1;n<=5;n++){
                         String str = "hello world => "+n;
                         RecordMetadata message = p.sendKafkaMessage(str);
                         LOGGER.info("发送信息: "+message.topic()+"---"+message.partition()+"---"+message.offset());
                     }
                     p.close();
                }
            });
        }
        System.out.println("this is main");
        executor.shutdown();//这个表示 线程执行完之后自动退出
        System.out.println("hello world");
    }
}

原文地址:https://www.cnblogs.com/weishao-lsv/p/8159430.html

时间: 2024-08-02 19:01:35

kafka生产者java客户端的相关文章

4 kafka集群部署及生产者java客户端编程 + kafka消费者java客户端编程

本博文的主要内容有   kafka的单机模式部署 kafka的分布式模式部署 生产者java客户端编程 消费者java客户端编程 运行kafka ,需要依赖 zookeeper,你可以使用已有的 zookeeper 集群或者利用 kafka自带的zookeeper. 单机模式,用的是kafka自带的zookeeper, 分布式模式,用的是外部安装的zookeeper,即公共的zookeeper. Step 6: Setting up a multi-broker cluster So far w

Kafka使用Java客户端进行访问

添加maven依赖包 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency> <dependency> <groupId>log4j</groupId> <artifac

kafka的Java客户端示例代码(kafka_2.12-0.10.2.1)

使用0.9开始增加的KafkaProducer和KafkaConsumer. Pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.o

kafka的Java客户端示例代码(kafka_2.11-0.8.2.2)

使用Producer,ConsumerConnector HelloWorldProducer.java package cn.ljh.kafka.kafka_helloworld; import java.util.Date; import java.util.Properties; import java.util.Random; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; impor

kafka 生产者java编码

public class KafkaProducerDemo { public static void main(String[] args) throws InterruptedException { /* Properties props = new Properties(); * props.put("bootstrap.servers", "localhost:9092"); * props.put("acks", "all&q

kafka 2.12在linux下的安装部署及java客户端对接

一.下载kafka_2.12-2.4.0.tgz并解压至/home/kafka_2.12-2.4.0 二.配置kafka 2.1 创建kafka日志文件夹:/home/kafka_2.12-2.4.0/logs 2.2 创建zookeeper数据目录:/tmp/zookeeper 2.3 配置/home/kafka_2.12-2.4.0/config/server.properties   内容如下(SSL证书在下面介绍): ssl.keystore.location=/home/ca/serv

java客户端作为kafka的consumer报错org.I0Itec.zkclient.exception.ZkTimeoutException

出错现象: java客户端编程作为kafka的消费端,连接kafka的broker报错 出错原因分析: 当服务器配置或者网络环境较差时,会出现连接zk超时的情况出现; 解决方法:将程序中的timeout数值调大 props.put("zookeeper.session.timeout.ms", "15000");

kafka集群搭建和使用Java写kafka生产者消费者

 kafka集群搭建 Java代码   1.zookeeper集群  搭建在110, 111,112 2.kafka使用3个节点110, 111,112 修改配置文件config/server.properties broker.id=110 host.name=192.168.1.110 log.dirs=/usr/local/kafka_2.10-0.8.2.0/logs 复制到其他两个节点,然后修改对应节点上的config/server.pro 3.启动,在三个节点分别执行 bin/kaf

java实现Kafka生产者示例

使用java实现Kafka的生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 package com.lisg.kafkatest; import