Kafka-0.10.0.0入门

搭建环境略(伪集群即可以),但要注意Kafka的配置必须配置的,少配了也一样可以用,但是只能单机使用,外部机器无法连接,网上也有说。

host.name=192.168.1.30

advertised.host.name=192.168.1.30

interfaceshost.name=192.168.1.30

0.10.0.0应该和0.9一样缺少log4j的依赖,不能直接log4j TO kafka。 想用的可以依赖kafka-log4j-appender此包即可,或者flume协同

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-log4j-appender</artifactId>

<version>0.10.0.0</version>

</dependency>

?

客户端命令:

消息者(全)

kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-topic
--from-beginning

生产者

????????bin/kafka-console-producer.sh?--broker-list?localhost:9092?--topic?my-topic

?

JAVA生产、消费(直接上官方例子)

生产都

  1. Properties props = new Properties();
  2. props.put("bootstrap.servers", "localhost:9092");
  3. props.put("acks", "all");
  4. props.put("retries ", 1);
  5. props.put("buffer.memory", 33554432);
  6. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  7. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  8. ?
  9. Producer<String, String> producer = new KafkaProducer<>(props);
  10. for(int i = 0; i < 100; i++)
  11. ????producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
  12. producer.close();

消费者

  1. ?
  2. volatile RUNNING =true;
  3. ?
  4. Properties props = new Properties();
  5. props.put("bootstrap.servers", "localhost:9092");
  6. props.put("group.id", "test");//不同ID 可以同时订阅消息
  7. props.put("enable.auto.commit", "false");
  8. props.put("auto.commit.interval.ms", "1000");
  9. props.put("session.timeout.ms", "30000");
  10. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  11. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  12. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  13. consumer.subscribe(Arrays.asList("foo", "bar" , " my-topic "));//订阅TOPIC
  14. try {
  15. ????while(RUNNING) {//轮询
  16. ????????ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
  17. ????????for (TopicPartition partition : records.partitions()) {
  18. ????????????List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
  19. ????????????for (ConsumerRecord<String, String> record : partitionRecords) {
  20. ??? ?//可以自定义Handler,处理对应的TOPIC消息(partitionRecords.key())
  21. ????????????????System.out.println(record.offset() + ": " + record.value());
  22. ????????????}
  23. ????????????consumer.commitSync();//同步
  24. ????????}
  25. ????}
  26. } finally {
  27. ??consumer.close();
  28. }

?

总结:已经服务化的东西,只能说,一:学学配置,二:学学使用方法,从中增加需要的逻辑。一般都是黑箱,要改底层,得先遇到场景。用起来比较容易

时间: 2024-10-07 11:24:58

Kafka-0.10.0.0入门的相关文章

Kafka 0.10.1.0 Cluster的搭建和Topic简单操作实验

[kafka cluster机器]:机器名称 用户名称sht-sgmhadoopdn-01/02/03 root [安装目录]: /root/learnproject/app 1.将scala文件夹同步到集群其他机器(scala 2.11版本,可单独下载解压) [[email protected] app]# scp -r scala [email protected]:/root/learnproject/app/ [[email protected] app]# scp -r scala [

kafka 0.10.2 消息消费者

package cn.xiaojf.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.Kaf

kafka 0.10.2 消息生产者

package cn.xiaojf.kafka.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * Created by 肖建锋 on

kafka 0.10.2 消息生产者(producer)

package cn.xiaojf.kafka.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka

Kafka 0.10 KafkaConsumer流程简述

ConsumerConfig.scala 储存Consumer的配置 按照我的理解,0.10的Kafka没有专门的SimpleConsumer,仍然是沿用0.8版本的. 1.从poll开始 消费的规则如下: 一个partition只能被同一个ConsumersGroup的一个线程所消费. 线程数小于partition数,某些线程会消费多个partition. 线程数等于partition数,一个线程正好消费一个线程. 当添加消费者线程时,会触发rebalance,partition的分配发送变化

Kafka 0.10问题点滴

15.如何消费内部topic: __consumer_offsets 主要是要让它来格式化:GroupMetadataManager.OffsetsMessageFormatter 最后用看了它的源码,把这部分挑选出来,自己解析了得到的byte[].核心代码如下: // com.sina.mis.app.ConsumerInnerTopic             ConsumerRecords<byte[], byte[]> records = consumer.poll(512);    

Kafka版本升级 ( 0.10.0 -&gt; 0.10.2 )

升级Kafka集群的版本其实很简单,核心步骤只需要4步,但是我们需要在升级的过程中确保每一步操作都不会"打扰"到producer和consumer的正常运转.为此,笔者在本机搭了一个测试环境进行实际的版本升级实验.在开始之前,简要介绍一下测试环境的部署情况及目标:Kafka 0.10.0.0 双broker测试环境,而目标是把该集群升级到0.10.2版本 两个broker启动时分别读取server.properties和server2.properties. 一.启动测试环境打开两个终

关于CDH5.11.0自带kafka 0.10 bootstrap-server 无法消费

近日需要在项目用到kafka,然后本地使用cdh集成的kafka 进行安装调试,以及些样例代码,sparkstreaming 相关调用kafka 的代码使用的原始的api 而没有走zookeeper,虽然消费者能启动,但无法消费内容. 开始我使用shell下的zk方式是可以消费误认为kafka也是没有问题的,后来想了一下是否shell也可能使用api来访问看下情况. 之后我使用bootstrap-server的方式在shell下进行测试,果然后些的样例代码一样,无法消费. 之后就是无脑的百度,谷

kafka之consumer参数auto.offset.reset 0.10+

https://blog.csdn.net/dingding_ting/article/details/84862776 https://blog.csdn.net/xianpanjia4616/article/details/84347087 kafka-0.10.1.X版本之前: auto.offset.reset 的值为smallest,和,largest.(offest保存在zk中) kafka-0.10.1.X版本之后: auto.offset.reset 的值更改为:earliest

Kafka0.10.2.0分布式集群安装

一.依赖文件安装 1.1 JDK 参见博文:http://www.cnblogs.com/liugh/p/6623530.html 1.2 Scala 参见博文:http://www.cnblogs.com/liugh/p/6624491.html 1.3 Zookeeper 参见博文:http://www.cnblogs.com/liugh/p/6671460.html 二.文件准备 2.1 文件名称 kafka_2.11-0.10.2.0.tgz 2.2 下载地址 http://kafka.