Kafka 0.10问题点滴

15.如何消费内部topic: __consumer_offsets

  • 主要是要让它来格式化:GroupMetadataManager.OffsetsMessageFormatter
  • 最后用看了它的源码,把这部分挑选出来,自己解析了得到的byte[]。核心代码如下:
// com.sina.mis.app.ConsumerInnerTopic
            ConsumerRecords<byte[], byte[]> records = consumer.poll(512);            for (ConsumerRecord<byte[], byte[]> record : records) {
                Object offsetKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));                if (offsetKey instanceof OffsetKey) {
                    GroupTopicPartition groupTopicPartition = ((OffsetKey) offsetKey).key();
                    OffsetAndMetadata value = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value()));
                    LOG.info(groupTopicPartition.toString() + "---:---" + value);
                } else {
                    LOG.info("############:{}", offsetKey);
                }
            }

1.For Kafka 0.8.2.x

#Create consumer configecho "exclude.internal.topics=false" > /tmp/consumer.config#Only consume the latest consumer offsets./kafka-console-consumer.sh --consumer.config /tmp/consumer.config --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" --zookeeper localhost:2181 --topic __consumer_offsets

2.For Kafka 0.9.x.x and 0.10.0.0

#Create consumer configecho "exclude.internal.topics=false" > /tmp/consumer.config#Only consume the latest consumer offsets./kafka-console-consumer.sh --consumer.config /tmp/consumer.config --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --zookeeper 10.39.40.98:2181/kafka10 --topic __consumer_offsets

Since version 0.8.2, kafka has had the ability to store consumer offsets in an internal compacted topic called __consumer_offsets

14.Kafka metrics

Kafka 用 Yammer Metrics来存储Server、Client的数据。可以使用插件式的方式获取这些数据,写入到CSV文件。

Kafka 实现了 KafkaCSVMetricsReporter.scala,可以 将metrics写入到CSV文件。

由于没有实现写入ganglia的实现类,所以无法直接从Kafka将metrics写入到ganglia。

document

13.snappy的压缩率

为什么某个topic的HDFS的数据多余Kafka自己统计的流量40%左右。

sina的KafkaProxy都使用了snappy压缩后入kafka。

  • 猜想 30%-40%
  • 需要测试一下:找一批HDFS的文件,写入Kafka,消费出来,写成文件,看看大小差别。

12.Kafka的Consumer读取数据的时候,读哪个partition

High level Consumer的API,默认以Range的方式分配,还有另外一个是RoundRobin。

11.Kafka的Producer发送数据的时候,发送给哪个partition

这是有DefaultPartitioner决定的。
If a partition is specified in the record, use it.

  • If no partition is specified but a key is present choose a partition based on a hash of the key
  • If no partition or key is present choose a partition in a round-robin fashion

中文:

  • 有key就hash
  • 没key就Round-robin

0.8.0 版本在没key的时候,是Random的方式。

10.Linkedin的集群GC情况

90%的broker GC暂停时间为21ms左右。每秒进行的young GC小于1次

9.解释一下什么是ZoroCopy(sendfile技术)

传统网络IO流程,一次传送过程:

  1. 从Disk把数据读到内核区的Read Buffer。
  2. 把数据从内核区到用户区Buffer。
  3. 再把数据写入到内核区的Socket Buffer上。
  4. 把数据从Socket Buffer复制到网卡的NIC Buffer上。

Kafka少了中间两步,这就是sendfile技术:

8.kafka如何做到大吞吐量、强大消息堆积能力等特性

  1. 依赖OS文件系统的页缓存 (当上层有写操作时,操作系统只是将数据写入PageCache,同时标记Page属性为Dirty。当读操作发生时,先从PageCache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。同时如果有其他进程申请内存,回收PageCache的代价又很小。
    总结:依赖OS的页缓存能大量减少IO,高效利用内存来作为缓存)
  2. 不使用JVM缓存数据, 内存利用率高
  3. 顺序IO以及O(1)常量时间get、put消息
  4. sendfile技术(零拷贝)

7.一个队列最重要的就是消息丢失问题,kafka是如何处理的

每次发送数据时,Producer都是send()之后就认为已经发送出去了,但其实大多数情况下消息还在内存的MessageSet当中,尚未发送到网络,这时候如果Producer挂掉,那就会出现丢数据的情况。

解决办法: ack机制,一般设置为acks=1,消息只需要被Leader接受并确认即可,这样同时保证了可靠性和效率。

6.Kafka 0.10的Producer做了什么优化

  • MessageSet手段批量顺序写入
  • 数据支持压缩
  • 异步发送

5.为什么kafka是pull模型

push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

4.底层的LogSegment、Index是怎么存储的

  • log:字节流,sendfile、zero copy技术
  • index:稀疏索引,mmap的数据结构-本质是个类,二分查找寻找到offset。

3.一个很重要的问题是当Leader宕机了,怎样在Follower中选举出新的Leader

一种非常常用的选举leader的方式是“Majority Vote”(“少数服从多数”)。

刚创建的topic一般"preferred replica"是leader。在ZooKeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的所有Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。

所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式,通知需要做此修改的Broker。

那么Controller是如何选举leader的?

  • 如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader。
  • 如果replica都不在ISR列表里面,选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。
  • 如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。

2. Partition的leader选举是怎么样的

  • Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后写入数据。
  • Consumer(0.8)通过zk找到leader,读取数据。
  • Consumer(0.10)通过Coordinator找到Leader,读取数据。

1. reassign一个topic,Producer、Consumer是否会丢失数据

不会。扩容的时候,新的leader需要从旧有的broker复制数据,跟上以后,会切换成leader。

这个时间期间,Producer、Consumer会向旧有的leader通信。

内部topic:__consumer_offsets
这个topic是用来管理所有的consumer的进度的,这样避免了把消费进度存zk上面影响扩展性。它是由Coordinator来管理的。

如果请求过来的topic是__consumer_offsets,那就启动OffsetManager的异步读
这个异步读会一直读取__consumer_offsets并把消息解码成消费进度放入缓存

queued.max.requests=16

I/O线程可以处理请求的队列大小,若实际请求数超过此大小,网络线程将停止接收新的请求。

confluence

  • 单机partition数的最大值:100 * broker * replica (if you care about latency, it’s probably a good idea to limit the number of partitions per broker to 100 x b x r, where b is the number of brokers in a Kafka cluster and r is the replication factor.)
时间: 2024-10-15 07:22:22

Kafka 0.10问题点滴的相关文章

Kafka 0.10.1.0 Cluster的搭建和Topic简单操作实验

[kafka cluster机器]:机器名称 用户名称sht-sgmhadoopdn-01/02/03 root [安装目录]: /root/learnproject/app 1.将scala文件夹同步到集群其他机器(scala 2.11版本,可单独下载解压) [[email protected] app]# scp -r scala [email protected]:/root/learnproject/app/ [[email protected] app]# scp -r scala [

Kafka 0.10 KafkaConsumer流程简述

ConsumerConfig.scala 储存Consumer的配置 按照我的理解,0.10的Kafka没有专门的SimpleConsumer,仍然是沿用0.8版本的. 1.从poll开始 消费的规则如下: 一个partition只能被同一个ConsumersGroup的一个线程所消费. 线程数小于partition数,某些线程会消费多个partition. 线程数等于partition数,一个线程正好消费一个线程. 当添加消费者线程时,会触发rebalance,partition的分配发送变化

kafka 0.10.2 消息消费者

package cn.xiaojf.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.Kaf

kafka 0.10.2 消息生产者

package cn.xiaojf.kafka.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * Created by 肖建锋 on

kafka 0.10.2 消息生产者(producer)

package cn.xiaojf.kafka.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka

关于CDH5.11.0自带kafka 0.10 bootstrap-server 无法消费

近日需要在项目用到kafka,然后本地使用cdh集成的kafka 进行安装调试,以及些样例代码,sparkstreaming 相关调用kafka 的代码使用的原始的api 而没有走zookeeper,虽然消费者能启动,但无法消费内容. 开始我使用shell下的zk方式是可以消费误认为kafka也是没有问题的,后来想了一下是否shell也可能使用api来访问看下情况. 之后我使用bootstrap-server的方式在shell下进行测试,果然后些的样例代码一样,无法消费. 之后就是无脑的百度,谷

kafka 0.10.2 部署失败后,重新部署

删除kafka各个节点log目录 删除zookeeper上kafka相关的目录 [[email protected] ~]# zkCli.sh Connecting to localhost:2181 2017-03-22 07:06:47,239 [myid:] - INFO [main:[email protected]100] - Client environment:zookeeper.version=3.4.9-1757313, built on 08/23/2016 06:50 GM

scala spark-streaming整合kafka (spark 2.3 kafka 0.10)

Maven组件如下: <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.3.0</version></dependency> 官网代码如下: pasting /* * Licensed to the Apache Software

Kafka 0.10.0.1 consumer get earliest partition offset from Kafka broker cluster - scala code

Return: Map[TopicPartition, Long] Code: val props = new Properties() props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPara("bootstrap.servers").toString) props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaPara("group.id").toString)