Kafka之生产者消费者示例

本例以kafka2.10_0.10.0.0为例,不同版本的kafka Java api有些区别!

增加maven依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.0.0</version>
</dependency>

生产者

package com.zns.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerTest {
    public static String topicName = "test";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");// 9092是kafka默认端口
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 1; i <= 10; i++) {
            ProducerRecord<String, String> message = new ProducerRecord<String, String>(topicName, "hello world " + i);
            producer.send(message);
            producer.flush();
        }
    }
}

消费者

package com.zns.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumerTest {
    public static String topicName = "test";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "127.0.0.1:2181");// 2181是zookeeper默认端口
        props.put("group.id", "test-group");
        props.put("zookeeper.session.timeout.ms", "100000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topicName, new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,
                keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(topicName).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("收到消息:" + it.next().message());
        }
    }
}

确保启动运行了zookeeper和kafka

先后启动运行生产者和消费者,可以看到消费者端接收到了消息...

原文地址:https://www.cnblogs.com/zengnansheng/p/10389745.html

时间: 2024-10-11 12:25:35

Kafka之生产者消费者示例的相关文章

python自动化--语言基础线程、生产者消费者示例

进程与线程的区别:进程不共享空间,线程共享地址空间 线程共享空间优缺点:优点:多线程给用户的体验好些,处理速度快些缺点:共享地址空间相互影响 1 import threading 2 import time 3 4 class Mythreading(threading.Thread): 5 def __init__(self,threadID,name,counter): 6 threading.Thread.__init__(self) #固定格式 7 self.threadID = thr

基于Kafka的生产者消费者消息处理本地调试

(尊重劳动成果,转载请注明出处:http://blog.csdn.net/qq_25827845/article/details/68174111冷血之心的博客) Kafka下载地址:http://download.csdn.net/download/qq_25827845/9798176 安装解压即可 配置修改zookeeper.properties 与 server.properties修改为本地路径,如图所示: 将config文件夹中的zookeeper.properties 与 serv

多线程模拟生产者消费者示例之BlockQuue

public class Test { public static void main(String[] args){ //创建一个阻塞队列,边界为1 BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1); new Thread(new PThread(queue)).start(); new Thread(new CThread(queue)).start(); }} /** * 生产者 */class P

Linux组件封装(五)一个生产者消费者问题示例

生产者消费者问题是计算机中一类重要的模型,主要描述的是:生产者往缓冲区中放入产品.消费者取走产品.生产者和消费者指的可以是线程也可以是进程. 生产者消费者问题的难点在于: 为了缓冲区数据的安全性,一次只允许一个线程进入缓冲区,它就是所谓的临界资源. 生产者往缓冲区放物品时,如果缓冲区已满,那么需要等待,一直到消费者取走产品为止. 消费者取走产品时,如果没有物品,需要等待,一直到有生产者放入为止. 第一个问题属于互斥问题,我们需要使用一把互斥锁,来实现对缓冲区的安全访问. 后两个属于同步问题,两类

java 线程 生产者-消费者与队列,任务间使用管道进行输入、输出 讲解示例 --thinking java4

package org.rui.thread.block2; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedB

【Apache Kafka】Kafka安装及简单示例

(一)Apache Kafka安装 1.安装环境与前提条件 ??安装环境:Ubuntu16.04 ??前提条件: ubuntu系统下安装好jdk 1.8以上版本,正确配置环境变量 ubuntu系统下安装好scala 2.11版本 安装ZooKeeper(注:kafka自带一个Zookeeper服务,如果不单独安装,也可以使用自带的ZK) 2.安装步骤 ??Apache基金会开源的这些软件基本上安装都比较方便,只需要下载.解压.配置环境变量三步即可完成,kafka也一样,官网选择对应版本下载后直接

并发编程基础之生产者消费者模式

一:概念 生产者消费者模式是java并发编程中很经典的并发情况,首先有一个大的容器,生产者put元素到 容器中,消费者take元素出来,如果元素的数量超过容器的容量时,生产者不能再往容器中put元素 ,处于阻塞状态,如果元素的数量等于0,则消费者不能在从容器中take数据,处于阻塞状态. 二:示例 /** * */ package com.hlcui.main; import java.util.LinkedList; import java.util.concurrent.ExecutorSe

生产者消费者模式(转)

本文转载自博文系列架构设计:生产者/消费者模式.文中对原文格式进行了稍加整理. 概述 今天打算来介绍一下“生产者/消费者模式”,这玩意儿在很多开发领域都能派上用场.由于该模式很重要,打算分几个帖子来介绍.今天这个帖子先来扫盲一把.如果你对这个模式已经比较了解,请跳过本扫盲帖,直接看下一个帖子(关于该模式的具体应用) . 看到这里,可能有同学心中犯嘀咕了:在四人帮(GOF)的23种模式里面似乎没听说过这种嘛!其实GOF那经典的23种模式主要是基于OO的(从书名<Design Patterns: E

11.python并发入门(part8 基于线程队列实现生产者消费者模型)

一.什么是生产者消费者模型? 生产者就是生产数据的线程,消费者指的就是消费数据的线程. 在多线程开发过程中,生产者的速度比消费者的速度快,那么生产者就必须等待消费者把数据处理完,生产者才会产生新的数据,相对的,如果消费者处理数据的速度大于生产者,那么消费者就必须等待生产者. 为了解决这种问题,就有了生产者消费者模型. 生产者与消费者模型,是通过一个容器,来解决生产者和消费者之间的耦合性问题,生产者和消费者之间并不会直接通信,这样生产者就无需等待消费者处理完数据,生产者可以直接把数据扔给队列,这个