057 Java中kafka的Producer程序实现

1.需要启动的服务

  这里启动的端口是9092。

    bin/kafka-console-consumer.sh --topic beifeng --zookeeper linux-hadoop01.ibeifeng.com:2181/kafka

  

2.producer的程序

  1 package com.jun.it;
  2 import kafka.javaapi.producer.Producer;
  3 import kafka.producer.KeyedMessage;
  4 import kafka.producer.ProducerConfig;
  5 import java.util.Properties;
  6 import java.util.Random;
  7 import java.util.concurrent.atomic.AtomicBoolean;
  8 public class JavaKafkaProducer {
  9     public static final char[] chars = "qazwsxedcrfvtgbyhnujmikolp0123456789".toCharArray();
 10     public static final int charsLength = chars.length;
 11     public static final Random random = new Random(System.currentTimeMillis());
 12     private Producer<String, String> producer = null;
 13
 14     private String topicName = null;
 15     private String brokerList = null;
 16     private boolean isSync = true; // 默认为同步
 17
 18     /**
 19      * 构造函数
 20      *
 21      * @param topicName
 22      * @param brokerList
 23      */
 24     public JavaKafkaProducer(String topicName, String brokerList) {
 25         this(topicName, brokerList, true);
 26     }
 27
 28     /**
 29      * 构造函数,主要是产生producer
 30      *
 31      * @param topicName
 32      * @param brokerList
 33      * @param isSync
 34      */
 35     public JavaKafkaProducer(String topicName, String brokerList, boolean isSync) {
 36         // 赋值
 37         this.topicName = topicName;
 38         this.brokerList = brokerList;
 39         this.isSync = isSync;
 40
 41         // 1. 给定配置信息:参考http://kafka.apache.org/082/documentation.html#producerconfigs
 42         Properties props = new Properties();
 43         // kafka集群的连接信息
 44         props.put("metadata.broker.list", this.brokerList);
 45         // kafka发送数据方式
 46         if (this.isSync) {
 47             // 同步发送数据
 48             props.put("producer.type", "sync");
 49         } else {
 50             // 异步发送数据
 51             props.put("producer.type", "async");
 52             /**
 53              * 0: 不等待broker的返回
 54              * 1: 表示至少等待1个broker返回结果
 55              * -1:表示等待所有broker返回数据接收成功的结果
 56              */
 57             props.put("request.required.acks", "0");
 58         }
 59         // key/value数据序列化的类
 60         /**
 61          * 默认是:DefaultEncoder, 指发送的数据类型是byte类型
 62          * 如果发送数据是string类型,必须更改StringEncoder
 63          */
 64         props.put("serializer.class", "kafka.serializer.StringEncoder");
 65
 66         // 2. 构建Kafka的Producer Configuration上下文
 67         ProducerConfig config = new ProducerConfig(props);
 68
 69         // 3. 构建Kafka的生产者:Producerr
 70         this.producer = new Producer<String, String>(config);
 71     }
 72
 73     /**
 74      * 关闭producer连接
 75      */
 76     public void closeProducer() {
 77         producer.close();
 78     }
 79
 80     /**
 81      * 提供给外部应用调用的直接运行发送数据代码的方法
 82      *
 83      * @param threadNumbers
 84      * @param isRunning
 85      */
 86     public void run(int threadNumbers, final AtomicBoolean isRunning) {
 87         for (int i = 0; i < threadNumbers; i++) {
 88             new Thread(new Runnable() {
 89                 public void run() {
 90                     int count = 0;
 91                     while (isRunning.get()) {
 92                         // 只有在运行状态的情况下,才发送数据
 93                         KeyedMessage<String, String> message = generateMessage();
 94                         // 发送数据
 95                         producer.send(message);
 96                         count++;
 97                         // 打印一下
 98                         if (count % 100 == 0) {
 99                             System.out.println("Count = " + count + "; message:" + message);
100                         }
101
102                         // 假设需要休息一下
103                         try {
104                             Thread.sleep(random.nextInt(100) + 10);
105                         } catch (InterruptedException e) {
106                             // nothings
107                         }
108                     }
109                     System.out.println("Thread:" + Thread.currentThread().getName() + " send message count is:" + count);
110                 }
111             }).start();
112         }
113     }
114
115     /**
116      * 产生一个随机的Kafka的KeyedMessage对象
117      *
118      * @return
119      */
120     public KeyedMessage<String, String> generateMessage() {
121         String key = generateString(3) + "_" + random.nextInt(10);
122         StringBuilder sb = new StringBuilder();
123         int numWords = random.nextInt(5) + 1; // [1,5]单词
124         for (int i = 0; i < numWords; i++) {
125             String word = generateString(random.nextInt(5) + 1); // 单词中字符最少1个最多5个
126             sb.append(word).append(" ");
127         }
128         String message = sb.toString().trim();
129         return new KeyedMessage(this.topicName, key, message);
130     }
131
132     /**
133      * 随机生产一个给定长度的字符串
134      *
135      * @param numItems
136      * @return
137      */
138     public static String generateString(int numItems) {
139         StringBuilder sb = new StringBuilder();
140         for (int i = 0; i < numItems; i++) {
141             sb.append(chars[random.nextInt(charsLength)]);
142         }
143         return sb.toString();
144     }
145 }

3.测试类

 1 package com.jun.it;
 2
 3 import java.util.concurrent.atomic.AtomicBoolean;
 4
 5 public class JavaKafkaProducerTest {
 6     public static void main(String[] args) {
 7         String topicName = "beifeng";
 8         String brokerList = "linux-hadoop01.ibeifeng.com:9092,linux-hadoop01.ibeifeng.com:9093";
 9         int threadNums = 10;
10         AtomicBoolean isRunning = new AtomicBoolean(true);
11         JavaKafkaProducer producer = new JavaKafkaProducer(topicName, brokerList);
12         producer.run(threadNums, isRunning);
13
14         // 停留60秒后,进行关闭操作
15         try {
16             Thread.sleep(1000);
17         } catch (InterruptedException e) {
18             // nothings
19         }
20         isRunning.set(false);
21
22         // 关闭连接
23         producer.closeProducer();
24     }
25 }

4.效果

  

二:使用自定义的分区器

1.分区器

 1 package com.jun.it;
 2
 3 import kafka.producer.Partitioner;
 4 import kafka.utils.VerifiableProperties;
 5
 6 public class JavaKafkaPartitioner implements Partitioner {
 7     /**
 8      * 默认无参构造函数
 9      */
10     public JavaKafkaPartitioner() {
11         this(new VerifiableProperties());
12     }
13
14     /**
15      * 该构造函数必须给定
16      *
17      * @param properties 初始化producer的时候给定的配置信息
18      */
19     public JavaKafkaPartitioner(VerifiableProperties properties) {
20         // nothings
21     }
22
23     @Override
24     public int partition(Object key, int numPartitions) {
25         String tmp = (String) key;
26         int index = tmp.lastIndexOf(‘_‘);
27         int number = Integer.valueOf(tmp.substring(index + 1));
28         return number % numPartitions;
29     }
30 }

2.producer类重新修改

  1 package com.jun.it;
  2 import kafka.javaapi.producer.Producer;
  3 import kafka.producer.KeyedMessage;
  4 import kafka.producer.ProducerConfig;
  5 import java.util.Properties;
  6 import java.util.Random;
  7 import java.util.concurrent.atomic.AtomicBoolean;
  8 public class JavaKafkaProducer {
  9     public static final char[] chars = "qazwsxedcrfvtgbyhnujmikolp0123456789".toCharArray();
 10     public static final int charsLength = chars.length;
 11     public static final Random random = new Random(System.currentTimeMillis());
 12     private Producer<String, String> producer = null;
 13
 14     private String topicName = null;
 15     private String brokerList = null;
 16     private boolean isSync = true; // 默认为同步
 17     private String partitionerClass = null; // 数据分区器class类
 18
 19     /**
 20      * 构造函数
 21      *
 22      * @param topicName
 23      * @param brokerList
 24      */
 25     public JavaKafkaProducer(String topicName, String brokerList) {
 26         this(topicName, brokerList, true, null);
 27     }
 28
 29     /**
 30      * 构造函数
 31      *
 32      * @param topicName
 33      * @param brokerList
 34      * @param partitionerClass
 35      */
 36     public JavaKafkaProducer(String topicName, String brokerList, String partitionerClass) {
 37         this(topicName, brokerList, true, partitionerClass);
 38     }
 39
 40     /**
 41      * 构造函数,主要是产生producer
 42      *
 43      * @param topicName
 44      * @param brokerList
 45      * @param isSync
 46      */
 47     public JavaKafkaProducer(String topicName, String brokerList, boolean isSync, String partitionerClass) {
 48         // 赋值
 49         this.topicName = topicName;
 50         this.brokerList = brokerList;
 51         this.isSync = isSync;
 52         this.partitionerClass = partitionerClass;
 53
 54         // 1. 给定配置信息:参考http://kafka.apache.org/082/documentation.html#producerconfigs
 55         Properties props = new Properties();
 56         // kafka集群的连接信息
 57         props.put("metadata.broker.list", this.brokerList);
 58         // kafka发送数据方式
 59         if (this.isSync) {
 60             // 同步发送数据
 61             props.put("producer.type", "sync");
 62         } else {
 63             // 异步发送数据
 64             props.put("producer.type", "async");
 65             /**
 66              * 0: 不等待broker的返回
 67              * 1: 表示至少等待1个broker返回结果
 68              * -1:表示等待所有broker返回数据接收成功的结果
 69              */
 70             props.put("request.required.acks", "0");
 71         }
 72         // key/value数据序列化的类
 73         /**
 74          * 默认是:DefaultEncoder, 指发送的数据类型是byte类型
 75          * 如果发送数据是string类型,必须更改StringEncoder
 76          */
 77         props.put("serializer.class", "kafka.serializer.StringEncoder");
 78
 79         // 给定分区器的class参数
 80         if (this.partitionerClass != null && !this.partitionerClass.trim().isEmpty()) {
 81             // 默认是:DefaultPartiioner,基于key的hashCode进行hash后进行分区
 82             props.put("partitioner.class", this.partitionerClass.trim());
 83         }
 84
 85         // 2. 构建Kafka的Producer Configuration上下文
 86         ProducerConfig config = new ProducerConfig(props);
 87
 88         // 3. 构建Kafka的生产者:Producerr
 89         this.producer = new Producer<String, String>(config);
 90     }
 91
 92     /**
 93      * 关闭producer连接
 94      */
 95     public void closeProducer() {
 96         producer.close();
 97     }
 98
 99     /**
100      * 提供给外部应用调用的直接运行发送数据代码的方法
101      *
102      * @param threadNumbers
103      * @param isRunning
104      */
105     public void run(int threadNumbers, final AtomicBoolean isRunning) {
106         for (int i = 0; i < threadNumbers; i++) {
107             new Thread(new Runnable() {
108                 public void run() {
109                     int count = 0;
110                     while (isRunning.get()) {
111                         // 只有在运行状态的情况下,才发送数据
112                         KeyedMessage<String, String> message = generateMessage();
113                         // 发送数据
114                         producer.send(message);
115                         count++;
116                         // 打印一下
117                         if (count % 100 == 0) {
118                             System.out.println("Count = " + count + "; message:" + message);
119                         }
120
121                         // 假设需要休息一下
122                         try {
123                             Thread.sleep(random.nextInt(100) + 10);
124                         } catch (InterruptedException e) {
125                             // nothings
126                         }
127                     }
128                     System.out.println("Thread:" + Thread.currentThread().getName() + " send message count is:" + count);
129                 }
130             }).start();
131         }
132     }
133
134     /**
135      * 产生一个随机的Kafka的KeyedMessage对象
136      *
137      * @return
138      */
139     public KeyedMessage<String, String> generateMessage() {
140         String key = generateString(3) + "_" + random.nextInt(10);
141         StringBuilder sb = new StringBuilder();
142         int numWords = random.nextInt(5) + 1; // [1,5]单词
143         for (int i = 0; i < numWords; i++) {
144             String word = generateString(random.nextInt(5) + 1); // 单词中字符最少1个最多5个
145             sb.append(word).append(" ");
146         }
147         String message = sb.toString().trim();
148         return new KeyedMessage(this.topicName, key, message);
149     }
150
151     /**
152      * 随机生产一个给定长度的字符串
153      *
154      * @param numItems
155      * @return
156      */
157     public static String generateString(int numItems) {
158         StringBuilder sb = new StringBuilder();
159         for (int i = 0; i < numItems; i++) {
160             sb.append(chars[random.nextInt(charsLength)]);
161         }
162         return sb.toString();
163     }
164 }

3.测试类

 1 package com.jun.it;
 2
 3 import java.util.concurrent.atomic.AtomicBoolean;
 4
 5 public class JavaKafkaProducerTest {
 6     public static void main(String[] args) {
 7         String topicName = "beifeng";
 8         String brokerList = "linux-hadoop01.ibeifeng.com:9092,linux-hadoop01.ibeifeng.com:9093";
 9         String partitionerClass = "com.jun.it.JavaKafkaPartitioner";
10         int threadNums = 10;
11         AtomicBoolean isRunning = new AtomicBoolean(true);
12         JavaKafkaProducer producer = new JavaKafkaProducer(topicName, brokerList,partitionerClass);
13         producer.run(threadNums, isRunning);
14
15         // 停留60秒后,进行关闭操作
16         try {
17             Thread.sleep(1000);
18         } catch (InterruptedException e) {
19             // nothings
20         }
21         isRunning.set(false);
22
23         // 关闭连接
24         producer.closeProducer();
25     }
26 }

4.效果

  

原文地址:https://www.cnblogs.com/juncaoit/p/9426106.html

时间: 2024-11-09 18:48:43

057 Java中kafka的Producer程序实现的相关文章

java中子类继承父类程序执行顺序问题

Java中,new一个类的对象,类里面的静态代码块.非静态代码.无参构造方法.有参构造方法.类的一般方法等部分,它们的执行顺序相对来说比较简单,用程序也很容易验证.比如新建一个测试父类. public class FatherTest { private String name; FatherTest(){ System.out.println("--父类的无参构造函数--"); } FatherTest(String name){ this.name=name; System.out

java中的继承 黑马程序员

通过继承实现代码复用.Java中所有的类都是通过直接或间接地继承java.lang.Object类得到的.继承而得到的类称为子类,被继承的类称为父类.子类不能继承父类中访问权限为private的成员变量和方法.子类可以重写父类的方法,及命名与父类同名的成员变量.但Java不支持多重继承,即一个类从多个超类派生的能力. class A { A(){} private int x=10;//A类的私有成员变量(不可以被继承) protected int y=20;//A类的保护成员变量(可以被继承)

Java中static方法、程序入口函数main方法的继承问题

对于继承问题,说明如下: 1.若父类中属性或方法使用private修饰,则不能在子类中访问:对于使用protected.default.public修饰的方法或属性,都是可以在子类中访问的. 2.在1中所述的可被子类访问的前提下,对于static修饰的方法或属性,表示该属性或方法是独立于具体的对象事例:它们也都是可以被子类继承的. 3.对于父类中的入口函数(main方法)也是可以被子类继承的.ps:具体的可以使用命令行运行java程序进行验证.

对上次“对字符串进行简单的字符数字统计 探索java中的List功能 ”程序,面向对象的改进

之前的随笔中的程序在思考后发现,运用了太多的static 函数,没有将面向对象的思想融入,于是做出了一下修改: 1 import java.util.ArrayList; 2 import java.util.Collections; 3 import java.util.List; 4 import java.util.Scanner; 5 6 7 public class classtest { 8 9 List<String> number=new ArrayList<String

Java中的窗口应用程序的设计

1.创建一个窗口程序(JFrame),标题栏起名为"浏览器",有一个菜单条,有"文件"."编辑"."查看"3个菜单."文件"菜单有两个菜单项,一项是"打开",一项是"保存","打开"项做成子菜单,有两个菜单项"打开x"和"打开y","保存"项就是普通的菜单项."编辑"菜

黑马程序员------Java中多线程学习总结(一)

Java培训.Android培训.iOS培训..Net培训</a>.期待与您交流! 一.多线程的概念 进程:是一种“自包容”的运行程序,有自己的地址空间. 基于进程的特点是允许计算机同时运行两个或更多的程序 线程:是进程内部单一的一个顺序控制流 . 基于线程的多任务处理环境中,线程是最小的处理单位. 在Java中,一个应用程序可以包含多个线程.每个线程执行特定的任务,并可与其他线程并发执行.多线程使系统的空转时间减少,提高了CPU的利用率.多线程编程隐藏了CPU在任务之间切换的事实. 二.创建

java中native方法的使用

native关键字说明其修饰的方法是一个原生态方法,方法对应的实现不是在当前文件,而是在用其他语言(如C和C++)实现的文件中.Java语言本身不能对操作系统底层进行访问和操作,但是可以通过JNI接口调用其他语言来实现对底层的访问. JNI是Java本机接口(JavaNative Interface),是一个本机编程接口,它是Java软件开发工具箱(javaSoftware Development Kit,SDK)的一部分.JNI允许Java代码使用以其他语言编写的代码和代码库.Invocati

JAVA中SSL证书认证通讯

SSL通讯服务端 /******************************************************************** * 项目名称    :rochoc   <p> * 包名称      :rochoc.net.security <p> * 文件名称    :SSLServer   <p> * 编写者     :luoc    <p> * 编写日期    :2005-6-30    <p> * 程序功能(类

Java中native的用法

原文来自:http://blog.csdn.net/funneies/article/details/8949660 native关键字说明其修饰的方法是一个原生态方法,方法对应的实现不是在当前文件,而是在用其他语言(如C和C++)实现的文件中.Java语言本身不能对操作系统底层进行访问和操作,但是可以通过JNI接口调用其他语言来实现对底层的访问. JNI是Java本机接口(Java Native Interface),是一个本机编程接口,它是Java软件开发工具箱(Java Software