kafka安装以及入门demo

jdk:1.6.0_25 64位

kafka:2.9.2-0.8.2.1

kafka 官方下载地址 http://apache.fayea.com/kafka/0.8.2.1/kafka_2.9.2-0.8.2.1.tgz

tar -zxvf kafka_2.9.2-0.8.2.1.tgz -C /usr/local/ && mv kafka_2.9.2-0.8.2.1 kafka
cd /usr/local/kafka
vi config/zookeeper.properties

dataDir=/usr/local/kafka/zookeeper

vi config/server.properties
broker.id=0
port=9092
hostname=192.168.194.110
log.dirs=/usr/local/kafka/kafka-logs
zookeeper.connect=192.168.194.110:2181

启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

启动kafka broker

bin/kafka-server-start.sh config/server.properties &

查看启动状态

jps
14867 QuorumPeerMain ###存在代表zookeeper服务启动正常

14919 Kafka ###代表kafka broker启动成功

关闭防火墙

service iptables stop

单机版kafka producer consumer 消息发送和接收测试

bin/kafka-console-producer.sh --broker-list 192.168.194.110:9092 --topic test ###启动producer

bin/kafka-console-consumer.sh --zookeeper 192.168.194.110:2181 --topic test --from-beginning

然后在producer端 输入需要发送的msg内容 回车 看consumer端是否接收到消息

[2015-09-11 13:58:00,470] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:526)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

producer端出现此异常 请将producer监听的broker 使用ip地址 不要用localhost

到此 单机版kafka已搭建并测试成功了

使用java来调用kafka的producer发消息和consumer来发消息

maven添加依赖

1 <dependency>  
2         <groupId>org.apache.kafka</groupId>  
3         <artifactId>kafka_2.10</artifactId>  
4         <version>0.8.2.0</version>  
5     </dependency>

kafka消息生产者KafkaProducer

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer extends Thread {
      
      
      
    public static void main(String[] args) {  
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.194.110:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        
        //配置key的序列化类
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        Random rnd = new Random();
         
        long runtime = new Date().getTime();
         
        String ip = "192.168.2." + rnd.nextInt(255);
         
        String msg = runtime + ",www.example.com," + ip;
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", "test-key",msg);
        producer.send(data);
    }  
       
}

kafka消息消费者 KafkaConsumer

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumer{
      
      
    private static ConsumerConnector consumer = null;
    public static void main(String[] args) {  
         Properties props = new Properties();
         //zookeeper 配置
         props.put("zookeeper.connect", "192.168.194.110:2181");

//group 代表一个消费组
         props.put("group.id", "jd-group");

//zk连接超时
         props.put("zookeeper.session.timeout.ms", "4000");
         props.put("zookeeper.sync.time.ms", "200");
         props.put("auto.commit.interval.ms", "1000");
         props.put("auto.offset.reset", "smallest");
         //序列化类
         props.put("serializer.class", "kafka.serializer.StringEncoder");

ConsumerConfig config = new ConsumerConfig(props);

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
         
         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
         topicCountMap.put("test", new Integer(1));

StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
         StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

Map<String, List<KafkaStream<String, String>>> consumerMap = 
                 consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
         KafkaStream<String, String> stream = consumerMap.get("test").get(0);
         ConsumerIterator<String, String> it = stream.iterator();
         while (it.hasNext())
             System.out.println(it.next().message());
    }  
       
}

分别启动producer 和consumer 即可进行简单的消息发送和接收

结果:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
1441952197141,www.example.com,192.168.2.86

时间: 2024-11-05 16:04:11

kafka安装以及入门demo的相关文章

kafka安装以及入门

一.安装 下载最新版kafka,Apache Kafka,然后上传到Linux,我这里有三台机器,192.168.127.129,130,131 . 进入上传目录,解压到/usr/local目录下 tar -zxvf kafka_2.11-0.11.0.0.tgz -C /usr/local/ 进入/usr/local目录 cd /usr/local 然后改一下名字(这一步可有可无) mv kafka_2.11-0.11.0.0/ kafka 进入kafka目录,编辑配置文件 cd kafka/

kafka安装和使用

kafka安装和启动 kafka的背景知识已经讲了很多了,让我们现在开始实践吧,假设你现在没有Kafka和ZooKeeper环境. Step 1: 下载代码 下载0.10.0.0版本并且解压它. > tar -xzf kafka_2.11-0.10.0.0.tgz > cd kafka_2.11-0.10.0.0 Step 2: 启动服务 运行kafka需要使用Zookeeper,所以你需要先启动Zookeeper,如果你没有Zookeeper,你可以使用kafka自带打包和配置好的Zooke

windows下游戏服务器端框架Firefly安装说明及demo运行

原地址:http://blog.csdn.net/wangqiuyun/article/details/11150503 本来公司一个网游服务器端选定了pomelo框架,后来出了个Firefly,为做一个对比,决定研究一下Firefly.看了一下Firefly,感觉头大,python的,本人python小白,只好慢慢折腾,一天下来总算装上了Firefly框架,并把他的那个开源网游<暗黑世界>服务器端跑了起来,特此记录共享! 其实关于这个框架的安装,他们的官网和BBS是有教程的只是太零散,并且面

Apache Hadoop2.x 边安装边入门

完整PDF版本:<Apache Hadoop2.x边安装边入门> 目录 第一部分:Linux环境安装 第一步.配置Vmware NAT网络 一. Vmware网络模式介绍 二. NAT模式配置 第二步.安装Linux操作系统 三. Vmware上安装Linux系统 四.设置网络 五.修改Hostname 六.配置Host 七.关闭防火墙 八.关闭selinux 第三步.安装JDK 九.安装Java JDK 第二部分:Hadoop本地模式安装 第四步. Hadoop部署模式 第五步.本地模式部署

Apache CouchDB安装及入门 &nbsp;

1. 从Apache CouchDB官网下载最新的版本,目前最新版本为1.6.1. 2. 运行"setup-couchdb-1.6.1_R16B02.exe"文件,并将couchdb设置为Windows服务,这样就不用每次都启动服务. 3. 在浏览器中运行"http://127.0.0.1:5984",出现下面的内容说明安装成功. {"couchdb":"Welcome","uuid":"4f58

mybaits入门demo配置文件详解(二)

第一篇文章: mybaits开发环境准备及入门demo(一) 一.配置文件mybatis-config.xml 官方是这么说的:MyBatis 的配置文件包含了影响 MyBatis 行为甚深的设置(settings)和属性(properties)信息 在MyBatis 的配置文件中 <?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE configuration PUBLIC "-//myba

mybaits入门demo映射文件详解(三)

第二篇文章:  mybaits入门demo配置文件详解(二) Mapper XML 文件 MyBatis 的真正强大在于它的映射语句,也是它的魔力所在.由于它的异常强大,映射器的 XML 文件就显得相对简单.如果拿它跟具有相同功能的 JDBC 代码进行对比,你会立即发现省掉了将近 95% 的代码.MyBatis 就是针对 SQL 构建的,并且比普通的方法做的更好. SQL 映射文件有很少的几个顶级元素(按照它们应该被定义的顺序): cache – 给定命名空间的缓存配置. cache-ref –

bower安装使用入门详情

bower安装使用入门详情 bower自定义安装:安装bower需要先安装node,npm,git全局安装bower,命令:npm install -g bower进入项目目录下,新建文件1.txt然后命令行进入项目目录下,输入命令重命名该文件为.bowerrc:修改文件名,命令:rename 1.txt .bowerrc*说明下为何要命令行修改文件名,因为直接修改的话电脑不让修改啊,另外上面命令用于window~~.bowerrc文件内容填充,如下:(js/lib是通过bower下载的文件存放

nodejs学习笔记之安装、入门

由于项目需要,最近开始学习nodejs.在学习过程中,记录一些必要的操作和应该注意的点. 首先是如何安装nodejs环境?(我用的是windows 7环境,所以主要是windows 7的例子.如果想看linux下的安装可以参考http://www.cnblogs.com/meteoric_cry/archive/2013/01/04/2844481.html) 1. nodejs提供了一些安装程序,可以去官网(http://nodejs.org/download/)按照自己的机器进行下载,下载完