搭建环境略(伪集群即可以),但要注意Kafka的配置必须配置的,少配了也一样可以用,但是只能单机使用,外部机器无法连接,网上也有说。
host.name=192.168.1.30
advertised.host.name=192.168.1.30
interfaceshost.name=192.168.1.30
0.10.0.0应该和0.9一样缺少log4j的依赖,不能直接log4j TO kafka。 想用的可以依赖kafka-log4j-appender此包即可,或者flume协同
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-log4j-appender</artifactId>
<version>0.10.0.0</version>
</dependency>
?
客户端命令:
消息者(全)
kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-topic
--from-beginning
生产者
????????bin/kafka-console-producer.sh?--broker-list?localhost:9092?--topic?my-topic
?
JAVA生产、消费(直接上官方例子)
生产都
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("acks", "all");
- props.put("retries ", 1);
- props.put("buffer.memory", 33554432);
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- ?
- Producer<String, String> producer = new KafkaProducer<>(props);
- for(int i = 0; i < 100; i++)
- ????producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
- producer.close();
消费者
- ?
- volatile RUNNING =true;
- ?
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("group.id", "test");//不同ID 可以同时订阅消息
- props.put("enable.auto.commit", "false");
- props.put("auto.commit.interval.ms", "1000");
- props.put("session.timeout.ms", "30000");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList("foo", "bar" , " my-topic "));//订阅TOPIC
- try {
- ????while(RUNNING) {//轮询
- ????????ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
- ????????for (TopicPartition partition : records.partitions()) {
- ????????????List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
- ????????????for (ConsumerRecord<String, String> record : partitionRecords) {
- ??? ?//可以自定义Handler,处理对应的TOPIC消息(partitionRecords.key())
- ????????????????System.out.println(record.offset() + ": " + record.value());
- ????????????}
- ????????????consumer.commitSync();//同步
- ????????}
- ????}
- } finally {
- ??consumer.close();
- }
?
总结:已经服务化的东西,只能说,一:学学配置,二:学学使用方法,从中增加需要的逻辑。一般都是黑箱,要改底层,得先遇到场景。用起来比较容易
时间: 2024-10-07 11:24:58