java实现Kafka生产者示例

使用java实现Kafka的生产者


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

package com.lisg.kafkatest;

import java.util.Properties;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.Partitioner;

import kafka.producer.ProducerConfig;

import kafka.serializer.StringEncoder;

/**

 * Kafka生产者

 * @author lisg

 *

 */

public class KafkaProducer {

    public static void main(String[] args) {

        

        Properties props = new Properties();

        //根据这个配置获取metadata,不必是kafka集群上的所有broker,但最好至少有两个

        props.put("metadata.broker.list", "vm1:9092,vm2:9092");

        //消息传递到broker时的序列化方式

        props.put("serializer.class", StringEncoder.class.getName());

        //zk集群

        props.put("zookeeper.connect", "vm1:2181");

        //是否获取反馈

        //0是不获取反馈(消息有可能传输失败)

        //1是获取消息传递给leader后反馈(其他副本有可能接受消息失败)

        //-1是所有in-sync replicas接受到消息时的反馈

        props.put("request.required.acks", "1");

//      props.put("partitioner.class", MyPartition.class.getName());

        

        //创建Kafka的生产者, key是消息的key的类型, value是消息的类型

        Producer<Integer, String> producer = new Producer<Integer, String>(

                new ProducerConfig(props));

        

        int count = 0;

        while(true) {

            String message = "message-" + ++count;

            //消息主题是test

            KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>("test", message);

            //message可以带key, 根据key来将消息分配到指定区, 如果没有key则随机分配到某个区

//          KeyedMessage<Integer, String> keyedMessage = new KeyedMessage<Integer, String>("test", 1, message);

            producer.send(keyedMessage);

            System.out.println("send: " + message);

            try {

                Thread.sleep(1000);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

        

//      producer.close();

    }

}

/**

 * 自定义分区类

 *

 */

class MyPartition implements Partitioner {

    public int partition(Object key, int numPartitions) {

        return key.hashCode()%numPartitions;

    }

    

}

来自为知笔记(Wiz)

附件列表

时间: 2024-12-10 13:19:53

java实现Kafka生产者示例的相关文章

kafka集群搭建和使用Java写kafka生产者消费者

 kafka集群搭建 Java代码   1.zookeeper集群  搭建在110, 111,112 2.kafka使用3个节点110, 111,112 修改配置文件config/server.properties broker.id=110 host.name=192.168.1.110 log.dirs=/usr/local/kafka_2.10-0.8.2.0/logs 复制到其他两个节点,然后修改对应节点上的config/server.pro 3.启动,在三个节点分别执行 bin/kaf

java实现Kafka的消费者示例

使用java实现Kafka的消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 8

Java版Kafka使用及配置解释

Java版Kafka使用及配置解释 一.Java示例 kafka是吞吐量巨大的一个消息系统,它是用scala写的,和普通的消息的生产消费还有所不同,写了个demo程序供大家参考.kafka的安装请参考官方文档. 引入Maven库 首先我们需要新建一个maven项目,然后在pom中引用kafka jar包,引用依赖如下: <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.1

kafka生产者

1.kafka生产者是线程安全的,她允许多个线程共享一个kafka实例 2.kafka管理一个简单的后台线程,所有的IO操作以及与每个broker的tcp连接通信,如果没有正确的关闭生产者可能会造成资源泄露. kafka总共有以下的这些生产者实例   KafkaProducer(java.util.Map<java.lang.String,java.lang.Object> configs)           A producer is instantiated by providing a

RocketMQ生产者示例程序

转载请注明出处:http://www.cnblogs.com/xiaodf/ 本示例展示了一个RocketMQ producer的简单实现,通过解析文本文件获取输入数据,将数据经过Avro序列化后发送到RocketMQ. 程序通过stdin.xml配置文件获取主要参数值,stdin.xml文件内容如下: <?xml version="1.0" encoding="UTF-8"?> <operator> <parameters> &

Kafka 系列(三)—— Kafka 生产者详解

一.生产者发送消息的过程 首先介绍一下 Kafka 生产者发送消息的过程: Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区.在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输. 接下来,数据被传给分区器.如果之前已经在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情.如果没有指定分区 ,那么分区器

kafka 生产者

KafkaProducer 创建一个 KafkaThread 来运行 Sender.run 方法. 1. 发送消息的入口在 KafkaProducer#doSend 中,但其实是把消息加入到 batches 中: kafka 生产者是按 batch 发送消息,RecordAccumulator 类有个变量 ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches,KafkaProducer#doSend 方法会把当前的这

Kafka 生产者、消费者与分区的关系

kafka 生产者.消费者与分区的关系 背景 最近和海康整数据对接, 需要将海康产生的结构化数据拿过来做二次识别. 基本的流程: 海康大数据 --> kafka server --> 平台 Kafka 的 topic 正常过车 topic: BAYONET_VEHICLEPASS 违法过车 topic: BAYONET_VEHICLEALARM 前言 首先我们需要对kafka中的一些名词有一定的了解, 有过一些使用经验, 一般来说, 生产者发送消息到主题, 而消费者从主题消费数据 ( 我初次接

Kafka生产者——结合spring开发

目录 Kafka生产者端 可靠性保证: spring-kafka生产端 Kafka生产者端 可靠性保证: producer向broker发送消息数据,需要有一定的可靠性,至少要保证数据: 1.不丢失 2.不重复 producer提供了一些参数,在编写producer是进行合理设置和编写,就可以保证数据的可靠性. acks 参数配置 为保证producer发送的数据能够可靠的发送到指定topic,topic的每个partition收到消息后,都需要向producer发送ack(acknowledg