kafka入门java例子

1,生产者

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {  

        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("metadata.broker.list","10.XX.XX.XX:9092");
            props.setProperty("serializer.class","kafka.serializer.StringEncoder");
            props.put("request.required.acks","1");
            ProducerConfig config = new ProducerConfig(props);
            Producer<String, String> producer = new Producer<String, String>(config);
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("mykafka","test-kafka");
            try {
            	int i =1;
            	while(i < 1000){

            		producer.send(data);
            	}
            } catch (Exception e) {
                e.printStackTrace();
            }
            producer.close();
        }
}

2,消费者

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;  

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector; 

public class TestConsumer extends Thread{
        private final ConsumerConnector consumer;
        private final String topic;  

        public static void main(String[] args) {
            TestConsumer consumerThread = new TestConsumer("mykafka");
            consumerThread.start();
        }
        public TestConsumer(String topic) {
            consumer =kafka.consumer.Consumer
                    .createJavaConsumerConnector(createConsumerConfig());
            this.topic =topic;
        }  

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect","10.XX.XX.XX:2181,10.XX.XX.XX:2181,10.XX.XX.XX:2181");
        props.put("group.id", "0");
        props.put("zookeeper.session.timeout.ms","10000");
        return new ConsumerConfig(props);
    }  

    public void run(){
        Map<String,Integer> topickMap = new HashMap<String, Integer>();
        topickMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[],byte[]>>>  streamMap =consumer.createMessageStreams(topickMap);
        KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[],byte[]> it =stream.iterator();
        System.out.println("*********Results********");
        while(true){
        	if(it.hasNext()){

        		System.err.println("get data:" +new String(it.next().message()));
        	}
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}  

3,分别启动生产者和消费者,在消费者输出中看到下日志即成功

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
*********Results********
get data:test-kafka

4,启动生产者如果报错如下:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
	at kafka.producer.Producer.send(Producer.scala:76)
	at kafka.javaapi.producer.Producer.send(Producer.scala:33)
	at ProducerTest.main(TestProducer.java:21)

需要改动config文件夹下的server.properties中的以下两个属性

zookeeper.connect=localhost:2181改成zookeeper.connect=10.0.30.221:2181
以及默认注释掉的
#host.name=localhost改成host.name=10.0.30.221

kafka入门java例子

时间: 2024-12-08 14:56:10

kafka入门java例子的相关文章

_00017 Kafka的体系结构介绍以及Kafka入门案例(初级案例+Java API的使用)

博文作者:妳那伊抹微笑 个性签名:世界上最遥远的距离不是天涯,也不是海角,而是我站在妳的面前,妳却感觉不到我的存在 技术方向:Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark ... 云计算技术 转载声明:可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明,谢谢合作! qq交流群:214293307  (期待与你一起学习,共同进步) # Kfaka的体系结构 # 学习前言 Kafka的整个学习过程就是自己看官网的文档,出

Kafka入门

文章来源 Kafka入门经典教程http://www.aboutyun.com/thread-12882-1-1.html Kafka官网介绍       http://kafka.apache.org/documentation.html#introduction Kafka剖析(一):Kafka背景及架构介绍  http://www.infoq.com/cn/articles/kafka-analysis-part-1/,这个介绍很全面,重点看它 1.分区 每个分区在Kafka集群的若干服务

全网最通俗易懂的Kafka入门!

摘自:https://www.cnblogs.com/Java3y/p/11982381.html 前言 只有光头才能变强. 文本已收录至我的GitHub仓库,欢迎Star:https://github.com/ZhongFuCheng3y/3y 在这篇之前已经写过两篇基础文章了,强烈建议先去阅读: 什么是ZooKeeper? 什么是消息队列? 众所周知,消息队列的产品有好几种,这里我选择学习Kafka的原因,无他,公司在用. 我司使用的是Kafka和自研的消息队列(Kafka和RocketMQ

漫游Kafka入门篇之简单介绍

原文地址:http://blog.csdn.net/honglei915/article/details/37564521 介绍 Kafka是一个分布式的.可分区的.可复制的消息系统.它提供了普通消息系统的功能,但具有自己独特的设计.这个独特的设计是什么样的呢? 首先让我们看几个基本的消息系统术语: Kafka将消息以topic为单位进行归纳. 将向Kafka topic发布消息的程序成为producers. 将预订topics并消费消息的程序成为consumer. Kafka以集群的方式运行,

kafka入门三:写第一个Kafka应用

一.整体看一下Kafka 我们知道,Kafka系统有三大组件:Producer.Consumer.broker . producers 生产(produce)消息(message)并推(push)送给brokers,consumers从brokers把消息提取(pull)出来消费(consume). 二.开发一个Producer应用 Producers用来生产消息并把产生的消息推送到Kafka的Broker.Producers可以是各种应用,比如web应用,服务器端应用,代理应用以及log系统等

Kafka教程(一)Kafka入门教程

1 Kafka入门教程 1.1 消息队列(Message Queue) Message Queue消息传送系统提供传送服务.消息传送依赖于大量支持组件,这些组件负责处理连接服务.消息的路由和传送.持久性.安全性以及日志记录.消息服务器可以使用一个或多个代理实例. JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生.发送.接收消息的接口简化企业应用的开发,翻译为Java

Lucene第一个入门学习例子

看Lucene in Action的时候,练习的一个入门例子. 在使用Lucene进行文本内容搜索前,需要先对指定的目录下的文件进行建立索引,代码如下: import java.io.File; import java.io.FileFilter; import java.io.FileReader; import java.io.IOException; import org.apache.lucene.analysis.standard.StandardAnalyzer; import or

Kafka入门介绍

1. Kafka入门介绍 1.1 Apache Kafka是一个分布式的流平台.这到底意味着什么? 我们认为,一个流平台具有三个关键能力: ① 发布和订阅消息.在这方面,它类似一个消息队列或企业消息系统.(生产和消费消息) ② 以容错的方式存储消息流.(存储消息) ③ 当消息流发生时处理它们.(处理消息) 1.1.1 kafka的优势 它应用于两大类应用: ① 构建实时的流数据管道,可靠地获取系统和应用程序之间的数据.(获取数据) ② 构建实时流的应用程序,对数据流进行转换或反应.(处理数据)

Kafka 入门和 Spring Boot 集成

Kafka 入门和 Spring Boot 集成 标签:博客 [TOC] 概述 kafka 是一个高性能的消息队列,也是一个分布式流处理平台(这里的流指的是数据流).由java 和 Scala 语言编写,最早由 LinkedIn 开发,并 2011年开源,现在由 Apache 开发维护. 应用场景 下面列举了一些kafka常见的应用场景. 消息队列 : Kafka 可以作为消息队列使用,可用于系统内异步解耦,流量削峰等场景. 应用监控:利用 Kafka 采集应用程序和服务器健康相关的指标,如应用