Return: Map[TopicPartition, Long]
Code:
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPara("bootstrap.servers").toString)
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaPara("group.id").toString)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
val kc: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
kc.partitionsFor(new String(topic)).asScala.map{partitionInfo =>
val topicPartition = new TopicPartition(topic, partitionInfo.partition())
kc.assign(Seq(topicPartition).asJava)
kc.seekToBeginning(Seq(topicPartition).asJava)
topicPartition -> kc.position(topicPartition)
}.toMap
Key point: Scala code call Java lib
原文地址:https://www.cnblogs.com/yjyyjy/p/10678438.html