添加maven依赖包
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.0</version> </dependency>
建立包结构
建立包结构如下图所示为例:
在log4j.properties中输入:
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
生产者代码
1 package com.juyun.kafka; 2 3 import java.util.Properties; 4 5 import org.apache.log4j.PropertyConfigurator; 6 7 import kafka.javaapi.producer.Producer; 8 import kafka.producer.KeyedMessage; 9 import kafka.producer.ProducerConfig; 10 import kafka.serializer.StringEncoder; 11 12 public class KafkaProducerExample extends Thread { 13 private String topic; 14 15 public KafkaProducerExample(String topic){ 16 super(); 17 this.topic=topic; 18 } 19 20 @Override 21 public void run() { 22 Producer<Integer, String> producer=CreateProducer(); 23 for (int i = 1; i < 10; i++) { 24 String message="message"+i; 25 producer.send(new KeyedMessage<Integer, String>(topic, message)); // 调用producer的send方法发送数据 26 System.out.println("发送:"+message); 27 try { 28 sleep(1000); 29 } catch (InterruptedException e) { 30 e.printStackTrace(); 31 } 32 } 33 } 34 35 public Producer<Integer, String> CreateProducer(){ 36 Properties props=new Properties(); 37 props.setProperty("zookeeper.connect", "172.16.0.157:2181"); // 与zookeeper建立连接 38 props.setProperty("serializer.class", StringEncoder.class.getName()); // key.serializer.class默认为serializer.class 39 props.setProperty("metadata.broker.list", "172.16.0.157:9092"); // kafka broker对应的主机,格式为host1:port1,host2:port2 40 props.put("request.required.acks","1"); // 等待topic中某个partition leader保存成功的状态反馈 41 Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props)); // 通过配置文件,创建生产者 42 return producer; 43 } 44 45 public static void main(String[] args){ 46 PropertyConfigurator.configure("C:/Users/juyun/workspace/Kafka/src/main/java/com/juyun/logs/log4j.properties"); // 加载.properties文件 47 new KafkaProducerExample("test").start(); // 输入topic,启动线程 48 } 49 50 }
KafkaProducerExample.java
消费者代码
1 package com.juyun.kafka; 2 3 import java.util.HashMap; 4 import java.util.List; 5 import java.util.Map; 6 import java.util.Properties; 7 8 import org.apache.log4j.PropertyConfigurator; 9 10 import kafka.consumer.Consumer; 11 import kafka.consumer.ConsumerConfig; 12 import kafka.consumer.ConsumerIterator; 13 import kafka.consumer.KafkaStream; 14 import kafka.javaapi.consumer.ConsumerConnector; 15 16 17 public class KafkaConsumerExample extends Thread{ 18 private String topic; 19 20 private KafkaConsumerExample(String topic) { 21 super(); 22 this.topic=topic; 23 } 24 25 @Override 26 public void run() { 27 ConsumerConnector consumer = createConsumer(); // 创建消费者连接 28 Map<String,Integer> topicCountMap=new HashMap<String, Integer>(); // 定义一个map 29 topicCountMap.put(topic, 1); 30 // Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是对应的流 31 Map<String, List<KafkaStream<byte[], byte[]>>> MessageStreams = consumer.createMessageStreams(topicCountMap); 32 // 取出 topic1对应的 streams 33 KafkaStream<byte[], byte[]> kafkaStream = MessageStreams.get(topic).get(0); 34 // 迭代获取到的流 35 ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator(); 36 while (iterator.hasNext()) { 37 String message = new String(iterator.next().message()); 38 System.out.println("接收到:"+message); 39 } 40 } 41 42 public ConsumerConnector createConsumer(){ 43 Properties properties = new Properties(); 44 properties.setProperty("zookeeper.connect", "172.16.0.157:2181"); 45 properties.put("zookeeper.connectiontimeout.ms", "6000"); 46 properties.setProperty("group.id", "group1"); // 设置这个消费者所在的group 47 // 只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出 48 ConsumerConnector createJavaConsumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)); 49 return createJavaConsumerConnector; 50 } 51 52 public static void main(String[] args) { 53 PropertyConfigurator.configure("C:/Users/juyun/workspace/Kafka/src/main/java/com/juyun/logs/log4j.properties"); // 加载.properties文件 54 new KafkaConsumerExample("test").start(); 55 } 56 }
KafkaConsumerExample.java
执行程序
需要先启动zookeeper
#进入到Zookeeper的bin目录下 cd /opt/zookeeper-3.4.8/bin #启动服务 ./zkServer.sh start
再启动Kafka
#进入到Kafka安装目录 bin/kafka-server-start.sh config/server.properties
并可以同时在命令终端启动生产者和消费者进行检测
#启动生产者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
#启动消费者 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic --from-beginning
时间: 2024-10-15 02:11:57