kafka 基础01

总结:
   1.kafka 中可以分步不同的组,消息可以被不同组里面的消费者多次消费
   2. 观察zookeeper中kafka中的信息:
[zk: air00:2181(CONNECTED) 8] ls /
[consumers, config, controller, admin, brokers, zookeeper, controller_epoch]
[zk: air00:2181(CONNECTED) 9] ls /consumers
[test01, test02]
[zk: air00:2181(CONNECTED) 10] ls /consumers/test01
[offsets, owners, ids]
[zk: air00:2181(CONNECTED) 11] ls /consumers/test01/offsets
[test]
[zk: air00:2181(CONNECTED) 12] ls /consumers/test01/offsets/test
[1, 0]
[zk: air00:2181(CONNECTED) 13] 
    3. 新来的消费者,不能获取老的数据

可以看出消费者的信息存在于zookeeper中的节点里面
生产者:
package com.kafka.test;
import java.util.*;  
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;  
import kafka.javaapi.producer.Producer;  
public class Producer01 {

	public static void main(String[] args) {  
		String topic="test";
        Properties props = new Properties();  //9092
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "air00:9092");
  
        ProducerConfig config = new ProducerConfig(props);  
        Producer<String, String> producer = new Producer<String, String>(config);  
        producer.send(new KeyedMessage<String, String>(topic, "test" ));
        producer.close();  
    }  
}

消费者:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class Consumer01 {

	static String groupId="test01";
	static String topic="test";
	public static void main(String[] args) {
	Properties props = new Properties();
    props.put("zookeeper.connect","air00:2181,air01:2181,air02:2181");
    props.put("group.id", groupId);
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    kafka.javaapi.consumer.ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
    		new ConsumerConfig(props));
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while(it.hasNext())
      System.out.println(new String(it.next().message()));
    }   
}
时间: 2024-08-29 11:01:55

kafka 基础01的相关文章

安卓基础01

安卓基础01 SDK System images 这是在创建模拟器时需要的system image,也就是在创建模拟器时CPU/ABI项需要选择的,下载并解压后,将解压出的整个文件夹复制或者移动到 your sdk 路径/system-images文件夹下即可, 如果没有 system-images目录就先 创建此文件夹,然后打开SDK Manager,打开Tools(工 具)菜单选择Options(选项)菜单项打开Android SDK Manager Setting对话框,点击Clear C

iOS基础 01 构建HelloWorld,剖析并真机测试

iOS基础 01 构建HelloWorld,剖析并真机测试 前言: 从控制台输出HelloWorld是我们学习各种语言的第一步,也是我们人生中非常重要的一步. 多年之后,我希望我们仍能怀有学习上进的心情,继续以HelloWorld去认识这世界上更多的东西. 本篇以HelloWorld作为切入点,向大家系统介绍什么事iOS应用以及如何使用Xcode创建iOS应用. 目录: 1. 创建HelloWorld工程 1.1. 设计界面 1.2. 真机测试 2. Xcode中的iOS工程模板 2.1. Ap

C#面向对象基础01

面向对象不是取代面向过程的类.对象."人"是类,"张三"是人这个类的对象.类是抽象的,对象是具体的.按钮就是类,某个按钮就是对象.对象可以叫做类的实例.类就像int,对象就像10.字段field(和某个对象相关的变量),字段就是类的状态.人这个 类有姓名.年龄.身高等字段.类不占内存,对象才占内存.方法:方法就是累能够执行的动作,比如问好.吃饭等.类的继承,类之间可以有继承关系,比如电脑类可以从"电器"类继承,这样的好处是"电脑&quo

C#语言基础01

Console.WriteLine("hello"); Console.ReadKey();// 按一个按键继续执行 string s=Console.ReadLine();//用户输入文字的时候程序 是暂停的 ,用户输入玩 必点回车,把用户输入的作为返回值,声明一个string 类型的变量(容器)s,用s来放ReadLine函数返回的值. Console.WriteLine(s); /*inti1=10;int i2=20; Console.WriteLine(i1+ "+

python基础01 Hello World!

作者:徐佳 欢迎转载,也请保留这段声明.谢谢! 摘要:简单的Hello Word! python 命令行 如已经安装python,那么在linux命令行中输入 $python 将进入python.乱吼在命令行提示符>>>后面输入 print ('Hello World!') 随后在屏幕上输出: Hello World! 写一段小程序 另一个使用Python的方法,是写一个Python程序.用文本编辑器写一个.py结尾的文件,比如说hello.py 在hello.py中写入如下,并保存:

Linux基础01 学会使用命令帮助

Linux基础01 学会使用命令帮助 概述 在linux终端,面对命令不知道怎么用,或不记得命令的拼写及参数时,我们需要求助于系统的帮助文档:linux系统内置的帮助文档很详细,通常能解决我们的问题,我们需要掌握如何正确的去使用它们:在只记得部分命令关键字的场合,我们可通过man -k来搜索:需要知道某个命令的简要说明,可以使用whatis:而更详细的介绍,则可用info命令:查看命令在哪个位置,我们需要使用which:而对于命令的具体参数及使用方法,我们需要用到强大的man:下面分别介绍: 命

黑马程序员--Java基础--01基本概念

1.Java的三大技术架构 JAVAEE(Java Platform Enterprise Edition):  开发企业环境下的应用程序,主要针对web程序开发: JAVASE(Java Platform Standard Edition): 完成桌面应用程序的开发,是其他两者的基础 JAVAME(Java Platform Micro Edition): 开发电子消费产品和嵌入式设备,如手机中的程序 我对三大技术构架理解 1.三大技术是指哪三大技术呢? 答:三大技术分别指JAVAEE,JAV

【C++基础 01】堆和栈的概念

好记性不如烂笔头,学习c++的时间也不是很久,趁着这段时间看 <C++ Primer>将学习笔记整理一下,与君共勉. ==================================================================== 首先要区分一下概念: [数据结构的栈和堆] 堆:也叫优先队列,是一棵完全二叉树,它的特点是父节点的值大于(小于)两个子节点的值(分别称为大顶堆和小顶堆).它常用于管理算法执行过程中的信息,应用场景包括堆排序,优先队列等. 栈:具有后进先出性质

AngularJS基础01 从HelloWorld说起

作者:arccosxy  转载请注明出处:http://www.cnblogs.com/arccosxy/ 准备工作 首先,创建一个名为index.html的HTML文件,代码如下: <!DOCTYPE html> <head> <title>Learning AngularJS</title> </head> <body> </body> </html> 接下来就是加载angular.js库,访问https: