kafka的生产者类

  1 import com.*.message.Configuration;
  2 import org.apache.kafka.clients.producer.Callback;
  3 import org.apache.kafka.clients.producer.ProducerRecord;
  4 import org.apache.kafka.clients.producer.RecordMetadata;
  5 import org.apache.kafka.common.serialization.ByteArraySerializer;
  6 import org.slf4j.Logger;
  7 import org.slf4j.LoggerFactory;
  8
  9 import java.io.IOException;
 10 import java.util.List;
 11 import java.util.concurrent.Future;
 12 import java.util.concurrent.atomic.AtomicInteger;
 13
 14 /**
 15  * kafka消息生产者
 16  *
 17  * @author
 18  * @version V1.0
 19  * @param <T>
 20  * @modify by user: {修改人} 2015-4-14
 21  * @modify by reason:{方法名}:{原因}
 22  */
 23 public abstract class KafkaProducer<T> implements IProducer<T>{
 24
 25     private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
 26     public AtomicInteger sendNum = new AtomicInteger();
 27
 28     private String topic;
 29     private Integer partition = null;
 30     private org.apache.kafka.clients.producer.KafkaProducer<byte[], byte[]> producer;
 31
 32     /**
 33      * 创建一个新的实例KafkaProducer.
 34      *
 35      * @param configuration
 36      *            配置
 37      * @param topic
 38      *            消息主题
 39      */
 40     public KafkaProducer(Configuration configuration, String topic) {
 41         this.topic = topic;
 42         producer = new org.apache.kafka.clients.producer.KafkaProducer<byte[], byte[]>(configuration.getConfig(),
 43                 new ByteArraySerializer(), new ByteArraySerializer());
 44     }
 45
 46     /**
 47      * 创建一个新的实例KafkaProducer.
 48      *
 49      * @param configuration
 50      *            配置
 51      * @param topic
 52      *            消息主题
 53      * @param partition
 54      *            分区数
 55      */
 56     public KafkaProducer(Configuration configuration, String topic, Integer partition) {
 57         this(configuration, topic);
 58         this.partition = partition;
 59     }
 60
 61     /**
 62      * 消息序列化方法
 63      *
 64      * @author
 65      * @param msgInfo
 66      * @return
 67      */
 68     protected abstract byte[] serialize(T msgInfo);
 69
 70     /*
 71      * (non-Javadoc)
 72      *
 73      * @see com.hikvision.bsp.message.producer.IProducer#send(T)
 74      */
 75     @Override
 76     public boolean send(final T msg) {
 77         return send(msg, new Callback() {
 78             @Override
 79             public void onCompletion(RecordMetadata metadata, Exception exception) {
 80                 if (exception != null) {
 81                     LOGGER.error("send message ,error info:{}", exception.toString());
 82                 }
 83             }
 84         });
 85     }
 86
 87     /*
 88      * (non-Javadoc)
 89      *
 90      * @see com.hikvision.bsp.message.producer.IProducer#send(T,
 91      * org.apache.kafka.clients.producer.Callback)
 92      */
 93     @Override
 94     public boolean send(final T msg, final Callback callback) {
 95         if (msg == null) {
 96             LOGGER.error("send msg is null. ");
 97             return false;
 98         }
 99
100         // serialize list to protostuff, and send msg
101         byte[] data = serialize(msg);
102         try {
103             producer.send(new ProducerRecord<byte[], byte[]>(topic, partition, null, data), callback);
104         } catch (Throwable e) {
105             LOGGER.error("send msg faild.", e);
106             return false;
107         }
108         return true;
109     }
110
111     @Override
112     public boolean synSend(final T msg) {
113         if (msg == null) {
114             LOGGER.error("send msg is null. ");
115             return false;
116         }
117
118         // serialize list to protostuff, and send msg
119         byte[] data = serialize(msg);
120         Future<?> future = null;
121         try {
122             future = producer.send(new ProducerRecord<byte[], byte[]>(topic, partition, null, data));
123         } catch (Throwable e) {
124             LOGGER.error("send msg faild.", e);
125             return false;
126         } finally {
127             try {
128                 future.get();
129             } catch (Exception e) {
130                 LOGGER.error("send msg faild.", e);
131                 return false;
132             }
133         }
134         return true;
135     }
136
137     /*
138      * (non-Javadoc)
139      *
140      * @see com.hikvision.bsp.message.producer.IProducer#send(java.util.List)
141      */
142     @Override
143     public boolean send(List<T> msgList) {
144         // check arguments validation
145         if (msgList.size() == 0) {
146             LOGGER.error("send msg is null. ");
147             return false;
148         }
149         long curTime = System.currentTimeMillis();
150
151         for (final T msg : msgList) {
152             send(msg);
153         }
154         LOGGER.info("Send {} messages to kafka take: {}ms", msgList.size(), (System.currentTimeMillis() - curTime));
155         return true;
156     }
157
158     @Override
159     public void close() throws IOException {
160         if (producer != null) {
161             producer.close();
162         }
163     }
164 }
时间: 2024-10-25 18:47:01

kafka的生产者类的相关文章

kafka中生产者和消费者API

使用idea实现相关API操作,先要再pom.xml重添加Kafka依赖: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.8.2</artifactId> <version>0.8.1</version> <exclusions> <exclusion> <artifactId>jmxtools&

初识 Kafka Producer 生产者

目录 1.KafkaProducer 概述 2.KafkaProducer 类图 3.KafkaProducer 简单示例 温馨提示:整个 Kafka Client 专栏基于 kafka-2.3.0 版本. @(本节目录) 1.KafkaProducer 概述 根据 KafkaProducer 类上的注释上来看 KafkaProducer 具有如下特征: KafkaProducer 是线程安全的,可以被多个线程交叉使用. KafkaProducer 内部包含一个缓存池,存放待发送消息,即 Pro

【kafka】生产者API 回调 同步

普通实现 public class MyProducer { public static void main(String[] args) { /** * 创建Kafka生产者配置信息:ProducerConfig类中记录了Kafka需要的所有参数信息 * 1.指定连接的Kafka集群 * 2.ack应答级别 * 3.发送失败的重试次数 * 4.批次大小(一次发送多少大小数据) * 5.等待时间 * 6.RecordAccumulator缓冲区大小 * 7.指定key,value序列化类 */

使用java创建kafka的生产者和消费者

创建一个Kafka的主题,连接到zk集群,副本因子3,分区3,主题名是test111        [root@h5 kafka]# bin/kafka-topics.sh --create --zookeeper h5:2181 --topic test111 --replication-factor 3 --partitions 3    查看Kafka的主题详情        [root@h5 kafka]# bin/kafka-topics.sh

基于Kafka的生产者消费者消息处理本地调试

(尊重劳动成果,转载请注明出处:http://blog.csdn.net/qq_25827845/article/details/68174111冷血之心的博客) Kafka下载地址:http://download.csdn.net/download/qq_25827845/9798176 安装解压即可 配置修改zookeeper.properties 与 server.properties修改为本地路径,如图所示: 将config文件夹中的zookeeper.properties 与 serv

Kafka之生产者消费者示例

本例以kafka2.10_0.10.0.0为例,不同版本的kafka Java api有些区别! 增加maven依赖 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.10.0.0</version> </dependency> 生产者 package com.zns.k

使用kafka作为生产者生产数据_到_hbase

配置文件: agent.sources = r1agent.sinks = k1agent.channels = c1 ## sources configagent.sources.r1.type = org.apache.flume.source.kafka.KafkaSourceagent.sources.r1.kafka.bootstrap.servers = 192.168.80.128:9092,192.168.80.129:9092,192.168.80.130:9092agent.

使用kafka作为生产者生产数据到hdfs(单节点)

关键:查看kafka官网的userguide agent.sources = kafkaSourceagent.channels = memoryChannelagent.sinks = hdfsSink agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSourceagent.sources.kafkaSource.zookeeperConnect = 192.168.57.11:2181agent.sour

使用kafka作为生产者生产数据到hdfs

关键:查看kafka官网的userGuide 配置文件: agent.sources = r1agent.sinks = k1agent.channels = c1 ## sources configagent.sources.r1.type = org.apache.flume.source.kafka.KafkaSourceagent.sources.r1.kafka.bootstrap.servers = 192.168.80.128:9092,192.168.80.129:9092,19