Kafka编程实例

编程

Producer是一个应用程序,它创建消息并发送它们到Kafka broker中。这些producer在本质上是不同。比如,前端应用程序,后端服务,代理服务,适配器对于潜在的系统,Hadoop对于的Producer。这些不同的Producer能够使用不同的语言实现,比如java、C和Python。下面的这部图表解释了消息producer的Kafka API.

下面将详细介绍如果编写一个简单的Producer和Consumer应用程序。

发送简单消息给Kafka broker,Producer端编写类ClusterProducer。

public classClusterProducer extends Thread {
    private static final Log log =LogFactory.getLog(ClusterProducer.class);

    public void sendData() {
        Random rnd = new Random();
        Properties props =PropertiesParser.getProperties(PropertiesSettings.PRODUCER_FILE_NAME);
        if (props == null) {
            log.error("can't loadspecified file " + PropertiesSettings.PRODUCER_FILE_NAME);
           return;
        }
        //set the producer configurationproperties
        ProducerConfig config = newProducerConfig(props);
        Producer<String, String> producer= new Producer<String, String>(config);

        //Send the data
        int count = 1;
        KeyedMessage<String, String>data;
        while (count < 100) {
            String sign = "*";
            String ip = "192.168.2."+ rnd.nextInt(255);
            StringBuffer sb = newStringBuffer();
            for (int i = 0; i < count; i++){
                sb.append(sign);
            }
            log.info("set data:" +sb);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            data = new KeyedMessage<String,String>(PropertiesSettings.TOPIC_NAME, ip, sb.toString());
            producer.send(data);
            count++;
        }
        producer.close();
    }

    public void run() {
        sendData();
    }

    public static void main(String[] args) {
        new ClusterProducer().sendData();
    }
}

定于Consumer获取端,获取对应topic的数据:

public class Consumerextends Thread {
    private static final Log log =LogFactory.getLog(Consumer.class);
    private final ConsumerConnector consumer;
    private final String topic;

    public Consumer(String topic) {
        consumer =kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfigcreateConsumerConfig() {
        Properties props = new Properties();
       props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id",KafkaProperties.groupId);
       props.put("zookeeper.session.timeout.ms", "400");
       props.put("zookeeper.sync.time.ms", "200");
       props.put("auto.commit.interval.ms", "1000");

        return new ConsumerConfig(props);

    }

    public void run() {
        Map<String, Integer>topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, newInteger(1));
        Map<String,List<KafkaStream<byte[], byte[]>>> consumerMap =consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]>stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]>it = stream.iterator();
        while (it.hasNext()) {
            log.info("+message: " +new String(it.next().message()));
        }
    }

    public static void main(String[] args) {
        Consumer client = new Consumer("cluster_statistics_topic");
        client.run();
    }
}

分别执行上面的代码,可以发送或者得到对应topic信息。

Enjoy yourself!(*^__^*) ……

时间: 2024-11-10 04:20:04

Kafka编程实例的相关文章

HBase编程实例

摘要:在前文中安装了Hbase,通过Hbase shell可以进行一些操作,但是和实际的编程实例联系起来不方便,因此本文介绍有关Hbase编程的实例. 一.使用Eclipse开发HBase应用程序 1,在Eclipse中新建一个Java Project,命名为HBaseTest,然后右键Properties中选择Java Build Path,选择Add External Jars,将HBase/lib目录下的jar包导入进来. 2,在工程根目录下创建Conf文件夹,将HBase/Conf下的h

c编程实例:809*??=800*??+9*???+1

程序代码: #include<stdio.h>#include<stdio.h>void main(){ int c; int i,j,k; printf("start computing!!!"); for(i=10;i<100;i++){ for(j=100;j<1000;j++){ c=i*809-1-9*j; k=c%800; if(k==0){ k=c/800; if(k>10&&k<100) printf(&q

python 编程实例 1

#python 100 例 1.py #题目:有 1.2.3.4 个数字,能组成多少个互不相同且无重复数字的三位数?都是多 #少? a = {} c = 1 for i in range(1,5): for j in range(1,5): for k in range(1,5): if (i != j,i !=k ,j!= k): #                print (i,j,k) a[c]=(i,j,k) c = c + 1 print (a) #把结果输入到字典 a中,并用c记数

python 编程实例 2

#python 100 2.py #题目:企业发放的奖金根据利润提成.利润 (I)低于或等于 10 万元时,奖金可提 10%:利 #润高 于 10 万元,低于 20 万元时,低于 10 万元的部分按 10%提成,高于 10 万元的部分, #可可提  成 7.5%:20 万到 40 万之间时,高于 20 万元的部分,可提成 5%:40 万到 60 万之间 #时高于 40 万元的部分,可提成 3%:60 万到 100 万之间时,高于 60 万元的部分,可提成 #1.5%,高于 100 万元时,超过

python 编程实例 3

#python 100 例 3.py #题目:一个整数,它加上100后是一个完全平方数,再加上168又是一个完全平方数.求这个数. import math for x in range(1,100000): y = int(math.sqrt(x + 100)) z = int(math.sqrt(x + 268)) if ( x + 100 == y*y ) and ( x + 268 == z*z): print (x) python 编程实例 3,布布扣,bubuko.com

python 编程实例 4

#python 100例 4.py #输入一个日期,判断这一天是一年中的第几天. import time #print (time.strftime("%Y%m%d%H%M%S")) #当前时间 #print (time.time()) #当前时间的秒数,从1970年1月1日开始计算 b = input("输入一个日期如(20121012): ") #输入要计算的日期 a = b[0:4]+'0101' #获取输入日期的年份并加上1月1日,从当年的1月1日开始计算

python 编程实例 5

#题目:输入三个整数 x,y,z,请把这三个数由小到大输出. #1.程序分析:我们想办法把最小的数放到 x 上,先将 x 与 y 进行比较,如果 x>y 则将 x 与 y #的值交换,再比较X 和Z比较. x = int(input("输入一个正整数X:")) y = int(input("输入一个正整数Y:")) z = int(input("输入一个正整数Z:")) if x >y: if x > z: if y >z

python 编程实例 6

#python 100 例 6.py #输出9*9口决 for i in range(1,10): for j in range(1,10): a = i * j print (i ,"*",j ,"=",a ) python 编程实例 6,布布扣,bubuko.com

python 编程实例 7

#python 100 例 9.py #用*打印出一个棱形 a = int(input("biangchang: ")) #获取由几个* 边长的棱形 i = 1 j = 1 while i<a+1: print ("   "*(a-i)," * "*(2*i-1)) i = i+1 while j<a+1: print ("   "*j," * "*(2*(a-j)-1)) j = j+1 py