Kafka 0.10.0.1 consumer get earliest partition offset from Kafka broker cluster - scala code

Return: Map[TopicPartition, Long]

Code:

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPara("bootstrap.servers").toString)
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaPara("group.id").toString)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

val kc: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)

kc.partitionsFor(new String(topic)).asScala.map{partitionInfo =>

val topicPartition = new TopicPartition(topic, partitionInfo.partition())
kc.assign(Seq(topicPartition).asJava)
kc.seekToBeginning(Seq(topicPartition).asJava)
topicPartition ->  kc.position(topicPartition)
}.toMap

Key point: Scala code call Java lib

原文地址:https://www.cnblogs.com/yjyyjy/p/10678438.html

时间: 2024-11-07 13:01:39

Kafka 0.10.0.1 consumer get earliest partition offset from Kafka broker cluster - scala code的相关文章

Kafka版本升级 ( 0.10.0 -> 0.10.2 )

升级Kafka集群的版本其实很简单,核心步骤只需要4步,但是我们需要在升级的过程中确保每一步操作都不会"打扰"到producer和consumer的正常运转.为此,笔者在本机搭了一个测试环境进行实际的版本升级实验.在开始之前,简要介绍一下测试环境的部署情况及目标:Kafka 0.10.0.0 双broker测试环境,而目标是把该集群升级到0.10.2版本 两个broker启动时分别读取server.properties和server2.properties. 一.启动测试环境打开两个终

kafka0.9.0及0.10.0配置属性

问题导读 1.borker包含哪些属性?2.Producer包含哪些属性?3.Consumer如何配置? borker(0.9.0及0.10.0)配置Kafka日志本身是由多个日志段组成(log segment).一个日志是一个FileMessageSet,它包含了日志数据以及OffsetIndex对象,该对象使用位移来读取日志数据 * borker配置就是指配置server.properties文件 * 最小配置 通常情况下需要在减压缩kafka后,修改config/server.proper

【实战笔记】锦标赛0.10$+0.01$ 第446名

手牌A8s,筹码只有3个大盲了,在cut-off位置,前面所有人都fold,只有上家miausita跟了一个大盲,这个时候我选择了all-in搏一把,果然如预期,除了miausita其他人都fold了. 结局是悲催的,被人主宰踢脚然后出局了.现在回顾这个牌局,总的是没有什么遗憾的,运气不在我这一边.以事后诸葛亮的角度,这个时候ante是60,我手里筹码还有1960,坚持10个hand不是问题,A8s的踢脚还是太小,应该等到一副对子或者有个大点的踢脚才放手一搏的. ----------------

Zend Studio9.0 10.0 11.0 所有版本破解补丁 无限期试用 注册码方法

破解Zend Studio三步曲 破解Zend Studio步骤1:关闭Zend Studio 破解Zend Studio步骤2:删除以下文件或者文件夹 文件夹:C:\Users\Administrator\.zend文件夹:C:\Users\Administrator\.ZendStudio文件:C:\Users\Administrator\.zs 破解Zend Studio步骤3:启动Zend Studio 完美解决Zend Studio破解.注册码.试用问题! 方案来源:http://my

Kafka 0.10问题点滴

15.如何消费内部topic: __consumer_offsets 主要是要让它来格式化:GroupMetadataManager.OffsetsMessageFormatter 最后用看了它的源码,把这部分挑选出来,自己解析了得到的byte[].核心代码如下: // com.sina.mis.app.ConsumerInnerTopic             ConsumerRecords<byte[], byte[]> records = consumer.poll(512);    

Kafka-0.10.0.0入门

搭建环境略(伪集群即可以),但要注意Kafka的配置必须配置的,少配了也一样可以用,但是只能单机使用,外部机器无法连接,网上也有说. host.name=192.168.1.30 advertised.host.name=192.168.1.30 interfaceshost.name=192.168.1.30 0.10.0.0应该和0.9一样缺少log4j的依赖,不能直接log4j TO kafka. 想用的可以依赖kafka-log4j-appender此包即可,或者flume协同 <dep

在vs2010中编译log4cxx-0.10.0详细方法(从下载、编译、解决错误详细介绍)(转载)

本文一共包含了17个步骤,按照下面的步骤就可以完成vs2010中编译log4cxx的工作了. 1. 下载 log4cxx 以及 apr 和 apr-util 源码: a) http://www.apache.org/dyn/closer.cgi/logging/log4cxx/0.10.0/apache-log4cxx-0.10.0.zip b) http://archive.apache.org/dist/apr/apr-1.2.11-win32-src.zip c) http://archi

关于storm0.10.0版本的一个小bug

最近搭建了一个storm环境,发现在提交一个topology之后,supervisor总是会无缘无故的死掉,日志如下 [2000] the maxSleepTimeMs [60000] the maxRetries [5]2016-04-09 16:30:05.719 b.s.event [ERROR] Error when processing eventjava.lang.RuntimeException: org.apache.thrift7.transport.TTransportExc

在vs2010中编译log4cxx-0.10.0详细方法

本文一共包含了17个步骤,按照下面的步骤就可以完成vs2010中编译log4cxx的工作了. 1. 下载 log4cxx 以及 apr 和 apr-util 源码: a) http://www.apache.org/dyn/closer.cgi/logging/log4cxx/0.10.0/apache-log4cxx-0.10.0.zip b) http://archive.apache.org/dist/apr/apr-1.2.11-win32-src.zip c) http://archi