我使用的kafka版本是:0.7.2
jdk版本是:1.6.0_20
官方快速入门(http://kafka.apache.org/07/quickstart.html )给出的示例并不完整,以下代码经过我的补充,编译后可以直接运行。
分布式发布订阅消息系统 Kafka 架构设计 http://www.linuxidc.com/Linux/2013-11/92751.htm
Apache Kafka 代码实例 http://www.linuxidc.com/Linux/2013-11/92754.htm
Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm
Kafka使用入门教程 http://www.linuxidc.com/Linux/2014-07/104470.htm
Producer Code
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
import kafka.producer.ProducerConfig;

/**
 * Minimal producer sample: sends the single string message "test-message2"
 * to the topic "test-topic" and then exits.
 */
public class ProducerSample {

    /**
     * Configures a producer, sends one message and closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zk.connect", "127.0.0.1:2181");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            ProducerData<String, String> data =
                    new ProducerData<String, String>("test-topic", "test-message2");
            producer.send(data);
        } finally {
            // Close the producer even when send() throws.
            producer.close();
        }
    }
}

Consumer Code

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

/**
 * Minimal consumer sample: requests four streams for the topic "test-topic"
 * and prints the topic and payload of every message received, using one
 * worker thread per stream.
 */
public class ConsumerSample {

    /**
     * Connects to the cluster, creates the message streams and submits one
     * printing task per stream to a fixed pool of four threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // specify some consumer properties
        Properties props = new Properties();
        props.put("zk.connect", "localhost:2181");
        props.put("zk.connectiontimeout.ms", "1000000");
        props.put("groupid", "test_group");

        // Create the connection to the cluster
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumerConnector =
                Consumer.createJavaConsumerConnector(consumerConfig);

        // create 4 partitions of the stream for topic "test-topic",
        // to allow 4 threads to consume
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("test-topic", 4);
        Map<String, List<KafkaStream<Message>>> topicMessageStreams =
                consumerConnector.createMessageStreams(topicCountMap);
        List<KafkaStream<Message>> streams = topicMessageStreams.get("test-topic");

        // create list of 4 threads to consume from each of the partitions
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // consume the messages in the threads
        for (final KafkaStream<Message> stream : streams) {
            executor.submit(new Runnable() {
                public void run() {
                    for (MessageAndMetadata<Message> msgAndMetadata : stream) {
                        // process message (msgAndMetadata.message())
                        System.out.println("topic: " + msgAndMetadata.topic());
                        Message message = msgAndMetadata.message();

                        // Copy the payload out of its buffer into a byte array.
                        ByteBuffer buffer = message.payload();
                        byte[] bytes = new byte[message.payloadSize()];
                        buffer.get(bytes);

                        // NOTE(review): decodes with the platform default charset;
                        // confirm this matches the encoding used by the producer.
                        String tmp = new String(bytes);
                        System.out.println("message content: " + tmp);
                    }
                }
            });
        }
    }
}
分别启动zookeeper,kafka server之后,依次运行Producer,Consumer的代码
运行ProducerSample:
运行ConsumerSample:
时间: 2024-11-09 03:13:19