Kafka的Producer和Consumer的示例

我使用的kafka版本是:0.7.2

jdk版本是:1.6.0_20

http://kafka.apache.org/07/quickstart.html官方给的示例并不是很完整,以下代码是经过我补充的并且编译后能运行的。

分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm

Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm

Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm

Kafka使用入门教程 http://www.linuxidc.com/Linux/2014-07/104470.htm

Producer Code

import java.util.*;
import kafka.message.Message;
import kafka.producer.ProducerConfig;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;

public class ProducerSample {

 public static void main(String[] args) {
  ProducerSample ps = new ProducerSample();

  Properties props = new Properties();
  props.put("zk.connect", "127.0.0.1:2181");
  props.put("serializer.class", "kafka.serializer.StringEncoder");

  ProducerConfig config = new ProducerConfig(props);
  Producer<String, String> producer = new Producer<String, String>(config);
  ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-message2");
  producer.send(data);
  producer.close();
 }
}

Consumer Code

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

public class ConsumerSample {

 public static void main(String[] args) {
                // specify some consumer properties
  Properties props = new Properties();
  props.put("zk.connect", "localhost:2181");
  props.put("zk.connectiontimeout.ms", "1000000");
  props.put("groupid", "test_group");

                // Create the connection to the cluster
  ConsumerConfig consumerConfig = new ConsumerConfig(props);
  ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

                // create 4 partitions of the stream for topic “test-topic”, to allow 4 threads to consume
  HashMap<String, Integer> map = new HashMap<String, Integer>();
  map.put("test-topic", 4);
  Map<String, List<KafkaStream<Message>>> topicMessageStreams =
    consumerConnector.createMessageStreams(map);
  List<KafkaStream<Message>> streams = topicMessageStreams.get("test-topic");

                // create list of 4 threads to consume from each of the partitions
  ExecutorService executor = Executors.newFixedThreadPool(4);

                // consume the messages in the threads
  for (final KafkaStream<Message> stream : streams) {
   executor.submit(new Runnable() {
    public void run() {
     for (MessageAndMetadata msgAndMetadata : stream) {
      // process message (msgAndMetadata.message())
      System.out.println("topic: " + msgAndMetadata.topic());
      Message message = (Message) msgAndMetadata.message();
      ByteBuffer buffer = message.payload();
     <SPAN style="WHITE-SPACE: pre"> </SPAN>byte[] bytes = new byte[message.payloadSize()];
      buffer.get(bytes);
      String tmp = new String(bytes);
      System.out.println("message content: " + tmp);
     }
    }
   });
  }

 }
}

分别启动zookeeper,kafka server之后,依次运行Producer,Consumer的代码

运行ProducerSample:

运行ConsumerSample:

时间: 2024-11-09 03:13:19

Kafka的Producer和Consumer的示例的相关文章

Kafka的Producer和Consumer源码学习

先解释下两个概念: high watermark (HW) 它表示已经被commited的最后一个message offset(所谓commited, 应该是ISR中所有replica都已写入),HW以下的消息都已被ISR中各个replica同步,从而保持一致.HW以上的消息可能是脏数据:部分replica写成功,但最终失败了. Kafka Partition:  1> 均衡各个Broker之间的数据和请求压力: 2> 分摊处理不同的消费者进程: 3> 在partition内可以保证局部

kafka java producer consumer实践

java提供了方便的API进行kafka消息处理.简单总结一下: 学习参考:http://www.itnose.net/st/6095038.html POM配置(关于LOG4J的配置参看 http://www.cnblogs.com/huayu0815/p/5341712.html) <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka

springboot kafka集成(实现producer和consumer)

本文介绍如何在springboot项目中集成kafka收发message. 1.先解决依赖 springboot相关的依赖我们就不提了,和kafka相关的只依赖一个spring-kafka集成包 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE<

Kafka之Producer

通过https://www.cnblogs.com/tree1123/p/11243668.html 已经对consumer有了一定的了解.producer比consumer要简单一些. 一.旧版本producer 0.9.0.0版本以前,是由scala编写的旧版本producer. 入口类:kafka.producer.Producer 代码示例: Properties properties = new Properties(); properties.put("metadata.broker

Apache Kafka系列(四) 多线程Consumer方案

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 Apache Kafka系列(四) 多线程Consumer方案 本文的图片是通过PPT截图出的,读者如果修改意见请联系我 一.Consumer为何需要实现多线程 假设我们正在开发一个消息通知模块,该模块允许用户订阅其他用户发送的通知/消息.该消息通知模块采用Apache Kafka,那么整个架构应该是消息的发布者通过Producer调用AP

Kafka学习-Producer和Customer

在上一篇kafka入门的基础之上,本篇主要介绍Kafka的生产者和消费者. Kafka 生产者 kafka Producer发布消息记录到Kakfa集群.生产者是线程安全的,在线程之间共享生产者实例.一个简单的例子,使用producer发送一个有序的key/value(键值对),放到java的main方法里就能直接运行, public class ProducerDemo { private static final String KAFKA_TOPIC="kafka-topic";

Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)

生产者-消费者模型是多线程问题里面的经典问题,也是面试的常见问题.有如下几个常见的实现方法: 1. wait()/notify() 2. lock & condition 3. BlockingQueue 下面来逐一分析. 1. wait()/notify() 第一种实现,利用根类Object的两个方法wait()/notify(),来停止或者唤醒线程的执行:这也是最原始的实现. 1 public class WaitNotifyBroker<T> implements Broker&

.net Kafka.Client多个Consumer Group对Topic消费不能完全覆盖研究总结(二)

依据Partition和Consumer的Rebalance策略,找到Kafka.Client Rebalance代码块,还原本地环境,跟踪调试,发现自定义Consumer Group 的Consumer并没有分配到PartionID,如下图. 据此,基本就可以定位到不同组Consumer无法覆盖Partition的问题根源了. 仔细阅读Rebalance代码,发现Kafka.Client 在获取consumer时,并没有根据Group做筛选,获取到的是所有组的Consumer,如下图 (此处只

057 Java中kafka的Producer程序实现

1.需要启动的服务 这里启动的端口是9092. bin/kafka-console-consumer.sh --topic beifeng --zookeeper linux-hadoop01.ibeifeng.com:2181/kafka 2.producer的程序 1 package com.jun.it; 2 import kafka.javaapi.producer.Producer; 3 import kafka.producer.KeyedMessage; 4 import kafk