引入maven包
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.10.0.0</version> </dependency>
一、同步发送消息
package com.example.demo.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * Demo of synchronous message sending with the Kafka producer client.
 *
 * <p>Each record is handed to {@link KafkaProducer#send(ProducerRecord)} and the
 * calling thread then blocks on the returned {@link Future} until the send has
 * completed (or failed) before moving on to the next record.
 */
public class SynProducer {

    /** Topic every demo record is published to. */
    private static final String TOPIC = "test-1";

    /** Number of records sent by {@link #main(String[])}. */
    private static final int MESSAGE_COUNT = 1000;

    /**
     * Builds the producer configuration used by this demo.
     *
     * @return a fresh {@link Properties} instance holding the producer settings
     */
    private static Properties getProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.52.199.51:9092");
        props.put("acks", "all");
        props.put("retries", 2);
        props.put("batch.size", 16384);
        // NOTE(review): main() blocks on every send, so a 1000 ms linger likely makes
        // each record wait up to a second before it is transmitted. Consider lowering
        // this for synchronous use; left unchanged here to keep the original settings.
        props.put("linger.ms", 1000);
        props.put("buffer.memory", 33554432);
        props.put("client.id", "producer-syn-1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    /**
     * Sends {@code MESSAGE_COUNT} records one at a time, printing the topic,
     * partition and offset of each successful send.
     *
     * <p>A failed send is reported and the loop continues with the next record.
     * If the thread is interrupted while waiting, sending stops, the producer is
     * closed, and the thread's interrupt flag is restored.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        boolean interrupted = false;

        // try-with-resources guarantees the producer's network resources and
        // background I/O thread are released, even if a send throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(getProps())) {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(TOPIC, "topic_" + i, "test-" + i);
                Future<RecordMetadata> metadataFuture = producer.send(record);
                try {
                    // Blocking here is what makes the send synchronous.
                    RecordMetadata recordMetadata = metadataFuture.get();
                    System.out.println("发送成功!");
                    System.out.println("topic:" + recordMetadata.topic());
                    System.out.println("partition:" + recordMetadata.partition());
                    System.out.println("offset:" + recordMetadata.offset());
                } catch (ExecutionException e) {
                    System.out.println("发送失败!");
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    System.out.println("发送失败!");
                    e.printStackTrace();
                    // Stop sending; the flag is restored only after close() so the
                    // producer's shutdown is not itself cut short by the interrupt.
                    interrupted = true;
                    break;
                }
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
原文地址:https://www.cnblogs.com/wangzhuxing/p/10099894.html
时间: 2024-10-09 16:04:09