Kafka使用Java客户端进行访问

添加maven依赖包

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.0</version>
</dependency>

建立包结构

  建立包结构如下图所示为例:

  在log4j.properties中输入:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

生产者代码

 1 package com.juyun.kafka;
 2
 3 import java.util.Properties;
 4
 5 import org.apache.log4j.PropertyConfigurator;
 6
 7 import kafka.javaapi.producer.Producer;
 8 import kafka.producer.KeyedMessage;
 9 import kafka.producer.ProducerConfig;
10 import kafka.serializer.StringEncoder;
11
12 public class KafkaProducerExample extends Thread {
13     private String topic;
14
15     public KafkaProducerExample(String topic){
16         super();
17         this.topic=topic;
18     }
19
20     @Override
21     public void run() {
22         Producer<Integer, String> producer=CreateProducer();
23         for (int i = 1; i < 10; i++) {
24             String message="message"+i;
25             producer.send(new KeyedMessage<Integer, String>(topic, message)); // 调用producer的send方法发送数据
26             System.out.println("发送:"+message);
27             try {
28                 sleep(1000);
29             } catch (InterruptedException e) {
30                 e.printStackTrace();
31             }
32         }
33     }
34
35     public Producer<Integer, String> CreateProducer(){
36         Properties props=new Properties();
37         props.setProperty("zookeeper.connect", "172.16.0.157:2181"); // 与zookeeper建立连接
38         props.setProperty("serializer.class", StringEncoder.class.getName()); // key.serializer.class默认为serializer.class
39         props.setProperty("metadata.broker.list", "172.16.0.157:9092"); // kafka broker对应的主机,格式为host1:port1,host2:port2
40         props.put("request.required.acks","1"); // 等待topic中某个partition leader保存成功的状态反馈
41         Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props)); // 通过配置文件,创建生产者
42         return producer;
43     }
44
45     public static void main(String[] args){
46         PropertyConfigurator.configure("C:/Users/juyun/workspace/Kafka/src/main/java/com/juyun/logs/log4j.properties"); // 加载.properties文件
47         new KafkaProducerExample("test").start(); // 输入topic,启动线程
48     }
49
50 }  

KafkaProducerExample.java

消费者代码

 1 package com.juyun.kafka;
 2
 3 import java.util.HashMap;
 4 import java.util.List;
 5 import java.util.Map;
 6 import java.util.Properties;
 7
 8 import org.apache.log4j.PropertyConfigurator;
 9
10 import kafka.consumer.Consumer;
11 import kafka.consumer.ConsumerConfig;
12 import kafka.consumer.ConsumerIterator;
13 import kafka.consumer.KafkaStream;
14 import kafka.javaapi.consumer.ConsumerConnector;
15
16
17 public class KafkaConsumerExample extends Thread{
18     private String topic;
19
20     private KafkaConsumerExample(String topic) {
21         super();
22         this.topic=topic;
23     }
24
25     @Override
26     public void run() {
27         ConsumerConnector consumer = createConsumer(); // 创建消费者连接
28         Map<String,Integer> topicCountMap=new HashMap<String, Integer>(); // 定义一个map
29         topicCountMap.put(topic, 1);
30         // Map<String, List<KafkaStream<byte[], byte[]>> 中String是topic, List<KafkaStream<byte[], byte[]>是对应的流
31         Map<String, List<KafkaStream<byte[], byte[]>>> MessageStreams = consumer.createMessageStreams(topicCountMap);
32         // 取出 topic1对应的 streams
33         KafkaStream<byte[], byte[]> kafkaStream = MessageStreams.get(topic).get(0);
34         // 迭代获取到的流
35         ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
36         while (iterator.hasNext()) {
37             String message = new String(iterator.next().message());
38             System.out.println("接收到:"+message);
39         }
40     }
41
42     public ConsumerConnector createConsumer(){
43         Properties properties = new Properties();
44         properties.setProperty("zookeeper.connect", "172.16.0.157:2181");
45         properties.put("zookeeper.connectiontimeout.ms", "6000");
46         properties.setProperty("group.id", "group1"); // 设置这个消费者所在的group
47         // 只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出
48         ConsumerConnector createJavaConsumerConnector = Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
49         return createJavaConsumerConnector;
50     }
51
52     public static void main(String[] args) {
53         PropertyConfigurator.configure("C:/Users/juyun/workspace/Kafka/src/main/java/com/juyun/logs/log4j.properties"); // 加载.properties文件
54         new KafkaConsumerExample("test").start();
55     }
56 }  

KafkaConsumerExample.java

执行程序

  需要先启动zookeeper

#进入到Zookeeper的bin目录下
cd /opt/zookeeper-3.4.8/bin
#启动服务
./zkServer.sh start

  再启动Kafka

#进入到Kafka安装目录
bin/kafka-server-start.sh config/server.properties

  并可以同时在命令终端启动生产者和消费者进行检测

#启动生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic
#启动消费者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic --from-beginning
时间: 2024-10-15 02:11:57

Kafka使用Java客户端进行访问的相关文章

4 kafka集群部署及生产者java客户端编程 + kafka消费者java客户端编程

本博文的主要内容有   kafka的单机模式部署 kafka的分布式模式部署 生产者java客户端编程 消费者java客户端编程 运行kafka ,需要依赖 zookeeper,你可以使用已有的 zookeeper 集群或者利用 kafka自带的zookeeper. 单机模式,用的是kafka自带的zookeeper, 分布式模式,用的是外部安装的zookeeper,即公共的zookeeper. Step 6: Setting up a multi-broker cluster So far w

kafka的Java客户端示例代码(kafka_2.12-0.10.2.1)

使用0.9开始增加的KafkaProducer和KafkaConsumer. Pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.o

kafka的Java客户端示例代码(kafka_2.11-0.8.2.2)

使用Producer,ConsumerConnector HelloWorldProducer.java package cn.ljh.kafka.kafka_helloworld; import java.util.Date; import java.util.Properties; import java.util.Random; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; impor

kafka生产者java客户端

producer 包含一个用于保存待发送消息的缓冲池,缓冲池中消息是还没来得及传输到kafka集群的消息. 位于底层的kafka I/O线程负责将缓冲池中的消息转换成请求发送到集群.如果在结束produce时,没有调用close()方法,那么这些资源会发生泄露. 常用配置 bootstrap.servers 用于初始化时建立链接到kafka集群,以host:port形式,多个以逗号分隔host1:port1,host2:port2: acks 生产者需要server端在接收到消息后,进行反馈确认

java客户端作为kafka的consumer报错org.I0Itec.zkclient.exception.ZkTimeoutException

出错现象: java客户端编程作为kafka的消费端,连接kafka的broker报错 出错原因分析: 当服务器配置或者网络环境较差时,会出现连接zk超时的情况出现; 解决方法:将程序中的timeout数值调大 props.put("zookeeper.session.timeout.ms", "15000");

Linux Samba目录服务搭建与Java客户端访问

前言: 本文比较简略,只求快速入门,若要了解详情,推荐一篇文章:http://www.cnblogs.com/mchina/archive/2012/12/18/2816717.html 1,安装samba(大部分linux上默认安装的可能不完整,建议重新安装) #检测是否安装,请注意不同安装包协议命令不尽相同#rpm -qa|grep samba #安装#yum install samba samba-client samba-swat 2,启动samba SMB服务 /etc/init.d/

Kafka JAVA客户端代码示例--高级应用

什么时间使用高级应用? 针对一个消息读取多次 在一个process中,仅仅处理一个topic中的一组partitions 使用事务,确保每个消息只被处理一次 使用高级应用(调用较底层函数)的缺点? SimpleConsumer需要做很多额外的工作(在以groups方式进行消息处理时不需要) 在应用程序中跟踪上次消息处理的offset 确定一个topic partition的lead broker 手工处理broker leander的改变 使用底层函数(SimpleConsumer)开发的步骤

最好用的 Kafka Json Logger Java客户端,赶紧尝试一下

最好用的 Kafka Json Logger Java客户端. slf4j4json 最好用的 Kafka Json Logger 库:不尝试一下可惜了! Description 一款为 Kafka 提供的 json logger 客户端,支持将 json 格式的 log 输出到 kafka.文件.控制台. 支持 slf4j 的全部功能. 比 KafkaLog4jAppender 更好用,可配置性更好. 支持 close logger, 在程序关闭之前 flush log to kafka. 支

kafka 2.12在linux下的安装部署及java客户端对接

一.下载kafka_2.12-2.4.0.tgz并解压至/home/kafka_2.12-2.4.0 二.配置kafka 2.1 创建kafka日志文件夹:/home/kafka_2.12-2.4.0/logs 2.2 创建zookeeper数据目录:/tmp/zookeeper 2.3 配置/home/kafka_2.12-2.4.0/config/server.properties   内容如下(SSL证书在下面介绍): ssl.keystore.location=/home/ca/serv