Kafka producer and consumer example (Java)

package demo;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerDemo {
    private final Producer<String, String> producer;
    public static final String TOPIC = "test";

    private ProducerDemo() {
        Properties props = new Properties();
        // Kafka broker list (host:port)
        props.put("metadata.broker.list", "192.168.152.20:9092");

        // Serializer class for message values
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Serializer class for message keys
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");

        // request.required.acks controls durability:
        //  0: the producer never waits for an acknowledgement from the broker.
        //     Lowest latency but weakest durability (some data will be lost
        //     when a server fails).
        //  1: the producer gets an acknowledgement after the leader replica
        //     has received the data. Better durability: only messages written
        //     to a now-dead leader but not yet replicated will be lost.
        // -1: the producer gets an acknowledgement after all in-sync replicas
        //     have received the data. Best durability: no messages are lost as
        //     long as at least one in-sync replica remains.
        props.put("request.required.acks", "-1");

        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    void produce() {
        int messageNo = 1000;
        final int COUNT = 10000;

        while (messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = "hello kafka message " + key;
            producer.send(new KeyedMessage<String, String>(TOPIC, key, data));
            System.out.println(data);
            messageNo++;
        }
    }

    public static void main(String[] args) {
        new ProducerDemo().produce();
    }
}
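
The kafka.javaapi.producer API above is the legacy Scala producer from Kafka 0.8 and has since been removed. As a minimal sketch of the same send loop with the modern Java client (assuming the kafka-clients artifact is on the classpath; the class name NewProducerDemo is made up for illustration):

package demo;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class NewProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // bootstrap.servers replaces metadata.broker.list in the new client
        props.put("bootstrap.servers", "192.168.152.20:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // acks=all is the new-client spelling of request.required.acks=-1
        props.put("acks", "all");

        // the producer is Closeable, so try-with-resources flushes and closes it
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int messageNo = 1000; messageNo < 10000; messageNo++) {
                String key = String.valueOf(messageNo);
                String data = "hello kafka message " + key;
                producer.send(new ProducerRecord<>(ProducerDemo.TOPIC, key, data));
                System.out.println(data);
            }
        }
    }
}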
package demo;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class ConsumerDemo {
    private final ConsumerConnector consumer;

    private ConsumerDemo() {
        Properties props = new Properties();
        // ZooKeeper connection string
        props.put("zookeeper.connect", "192.168.152.20:2181");

        // group.id identifies the consumer group
        props.put("group.id", "jd-group");

        // ZooKeeper session timeout and sync time
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        // serializer class (not actually used on the consumer side; the
        // StringDecoders passed to createMessageStreams below do the decoding)
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);

        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(ProducerDemo.TOPIC, Integer.valueOf(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(ProducerDemo.TOPIC).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println(it.next().message());
        }
    }

    public static void main(String[] args) {
        new ConsumerDemo().consume();
    }
}
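
The ZooKeeper-based high-level consumer above is likewise the legacy 0.8 API (removed in Kafka 2.0). A minimal sketch with the modern org.apache.kafka.clients.consumer.KafkaConsumer, assuming kafka-clients 2.0+ (for the Duration overload of poll) and that the consumer now bootstraps from the brokers rather than from ZooKeeper; NewConsumerDemo is a made-up name:

package demo;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NewConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        // the new consumer talks to the brokers directly, not to ZooKeeper
        props.put("bootstrap.servers", "192.168.152.20:9092");
        props.put("group.id", "jd-group");
        // "earliest" is the new spelling of auto.offset.reset=smallest
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(ProducerDemo.TOPIC));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}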

Note that the ZooKeeper address also needs to be configured in Kafka's config/consumer.properties file, as sketched below.
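
The relevant lines in config/consumer.properties would look roughly like this (a sketch assuming the same ZooKeeper address and consumer group as in the code above):

# config/consumer.properties
zookeeper.connect=192.168.152.20:2181
group.id=jd-group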

