- 一些重要的原理
基本原理什么叫broker partition cg我就不在这里说了,说一些自己总结的原理
1.kafka有副本的概念,每个副本都分在不同的partition中,这中间分为leader和fllower
2.kafka消费端的程序一定要和partition数量一致,不可以多,会出现有些consumer获取
不到数据的现象
3.producer原理
producer通过zookeeper获取所连接的topic都在那些partiton中,每个parition的leader是那
个,针对leader进行写操作,prodcer通过zookeeper的watch机制来记录以上的信息,pro
ducer为了节省网络的io,还会在本地先把消息buffer起来,并将他们批量发送到broker中
4.consumer原理
consumer向broker发送fetch请求,并告知获取的消息offset,在kafka中采用pull方式,消费端
主动pull消息,优点:消费者可以控制消费的数量
2.kafka生产环境常用命令总结
1.模拟生产端,推送数据
./bin/kafka-console-producer.sh --broker-list 172.16.10.130:9092 --topic deal_exposure_origin
2.模拟消费端,消费数据
./bin/kafka-console-consumer.sh --zookeeper 1172.16.10.140:2181 --topic deal_exposure_origin
3.创建topic,topic partiton数量 副本数 数据过期时间
./kafka-topics.sh --zookeeper spark:2181 --create --topic deal_task_log --partitions 15 --replication-factor 1 retention.ms 1296000000
3.kafka如何动态的添加副本
1.副本,kafka一定要设置副本,如果之后再加会由于涉及到数据的同步,会把集群的io提升上去
3.如何扩大副本
2.把所有topic的信息记录到json文件中,信息有topic名称,用了哪些partition,副本在那个partition,
并修改json数据,添加副本数
#!/usr/bin/python
from kazoo.client import KazooClient
import random
import json
zk = KazooClient(hosts=‘172.16.11.73:2181‘)
zk.start()
for i in zk.get_children(‘/brokers/topics‘):
b= zk.get(‘/brokers/topics/‘+i)[0]
a = eval(b)[‘partitions‘]
list = []
dict = {}
for key,value in a.items():
if len(value) == 1:
c = {}
c[‘topic‘] = i.encode(‘utf-8‘)
c[‘partition‘] = int(key)
list1 = []
for ii in range(0,3):
while True:
if list1:
pass
else:
for iii in value:
list1.append(iii)
if len(list1) == 3:
break
num = random.randint(0,4)
#print ‘num=‘+str(num),‘value=‘+str(value)
if num not in list1:
list1.append(num)
#print list1
c[‘replicas‘] = list1
list.append(c)
version = eval(b)[‘version‘]
dict[‘version‘] = version
dict[‘partitions‘] = list
#jsondata = json.dumps(dict)
json.dump(dict,open(‘/opt/json/‘+i+‘.json‘,‘w‘))
3.加载json文件
/usr/local/kafka_2.9.2-0.8.1.1/bin/kafka-reassign-partitions.sh --zookeeper 192.168.5.159:2181 --reassignment-json-file /opt/test.json --execute
4.查看是否已经添加了副本
usr/local/kafka_2.9.2-0.8.1.1/bin/kafka-topics.sh --describe --zookeeper 192.168.5.159:2181 --topic testtest
Topic:testtest PartitionCount:15 ReplicationFactor:2 Configs:
Topic: testtest Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: testtest Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: testtest Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: testtest Partition: 3 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: testtest Partition: 4 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: testtest Partition: 5 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: testtest Partition: 6 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: testtest Partition: 7 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: testtest Partition: 8 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: testtest Partition: 9 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: testtest Partition: 10 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: testtest Partition: 11 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: testtest Partition: 12 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: testtest Partition: 13 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: testtest Partition: 14 Leader: 0 Replicas: 0,1 Isr: 0,1
4.kafka集群之间做数据同步
找一个broker节点进行同步
1.创建配置文件mirror_consumer.config
配置文件里写本地的kafka集群zookeeper
定义一个group用来去消费所有的topic,进行同步
zookeeper.connect=172.16.11.43:2181,172.16.11.46:2181,172.16.11.60:2181,172.16.11.67:2181,172.16.11.73:2181
group.id=backup-mirror-consumer-group
2.创建配置文件mirror_producer.config
zookeeper,kafka ip写对端集群的ip
zookeeper.connect=172.17.1.159:2181,172.17.1.160:2181
metadata.broker.list=172.17.1.159:9092,172.17.1.160:9092
3.同步命令
$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceClusterConsumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"
参数详解
1. 白名单(whitelist) 黑名单(blacklist)
mirror-maker接受精确指定同步topic的白名单和黑名单。使用java标准的正则表达式,为了方便,逗号(‘,’)被编译为java正则中的(‘|’)。
2. Producer timeout
为了支持高吞吐量,你最好使用异步的内置producer,并将内置producer设置为阻塞模式(queue.enqueueTimeout.ms=-1)。这样可以保证数据(messages)不会丢失。否则,异步producer默认的 enqueueTimeout是0,如果producer内部的队列满了,数据(messages)会被丢弃,并抛出QueueFullExceptions异常。而对于阻塞模式的producer,如果内部队列满了就会一直等待,从而有效的节制内置consumer的消费速度。你可以打开producer的的trace logging,随时查看内部队列剩余的量。如果producer的内部队列长时间处于满的状态,这说明对于mirror-maker来说,将消息重新推到目标Kafka集群或者将消息写入磁盘是瓶颈。
对于kafka的producer同步异步的详细配置请参考$KAFKA_HOME/config/producer.properties文件。关注其中的producer.type和queue.enqueueTimeout.ms这两个字段。
3. Producer 重试次数(retries)
如果你在producer的配置中使用broker.list,你可以设置当发布数据失败时候的重试次数。retry参数只在使用broker.list的时候使用,因为在重试的时候会重新选择broker。
4. Producer 数量
通过设置—num.producers参数,可以使用一个producer池来提高mirror maker的吞吐量。在接受数据(messages)的broker上的producer是只使用单个线程来处理的。就算你有多个消费流,吞吐量也会在producer处理请求的时候被限制。
5. 消费流(consumption streams)数量
使用—num.streams可以指定consumer的线程数。请注意,如果你启动多个mirror maker进程,你可能需要看看其在源Kafka集群partitions的分布情况。如果在每个mirror maker进程上的消费流(consumption streams)数量太多,某些消费进程如果不拥有任何分区的消费权限会被置于空闲状态,主要原因在于consumer的负载均衡算法。
6. 浅迭代(Shallow iteration)与producer压缩
我们建议在mirror maker的consumer中开启浅迭代(shallow iteration)。意思就是mirror maker的consumer不对已经压缩的消息集(message-sets)进行解压,只是直接将获取到的消息集数据同步到producer中。
如果你开启浅迭代(shallow iteration),那么你必须关闭mirror maker中producer的压缩功能,否则消息集(message-sets)会被重复压缩。
7. Consumer 和 源Kafka集群(source cluster)的 socket buffer sizes
镜像经常用在跨集群场景中,你可能希望通过一些配置选项来优化内部集群的通信延迟和特定硬件性能瓶颈。一般来说,你应该对mirror-maker中consumer的socket.buffersize 和源集群broker的socket.send.buffer设定一个高的值。此外,mirror-maker中消费者(consumer)的fetch.size应该设定比socket.buffersize更高的值。注意,套接字缓冲区大小(socket buffer size)是操作系统网络层的参数。如果你启用trace级别的日志,你可以检查实际接收的缓冲区大小(buffer size),以确定是否调整操作系统的网络层。
4.如何检验MirrorMaker运行状况
Consumer offset checker工具可以用来检查镜像对源集群的消费进度。例如:
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group KafkaMirror --zkconnect localhost:2181 --topic test-topic
KafkaMirror,topic1,0-0 (Group,Topic,BrokerId-PartitionId)
Owner = KafkaMirror_jkoshy-ld-1320972386342-beb4bfc9-0
Consumer offset = 561154288
= 561,154,288 (0.52G)
Log size = 2231392259
= 2,231,392,259 (2.08G)
Consumer lag = 1670237971
= 1,670,237,971 (1.56G)
BROKER INFO
0 -> 127.0.0.1:9092
注意,–zkconnect参数需要指定到源集群的Zookeeper。另外,如果指定topic没有指定,则打印当前消费者group下所有topic的信息。
5.kafka所使用的磁盘io过高解决方法
问题:kafka所用磁盘io过高
我们生产平台有5台kafka机器,每台机器上分了2块磁盘做parition
最近发现kafka所使用的磁盘io非常高,影响到了生产端推送数据的性能
一开始以为是由于一个推送日志的topic所导致的的,因为每秒推送数据大概在2w左右,
后来把此topic迁移到了其他的kafka集群中还是未见效果
最终iotop发现其实是由于zookeeper持久化的时候导致的
zookeeper持久化的时候也写到kafka所用到的磁盘中
通过此问题说明几点问题
1.kafka用zookeeper,和大家所熟悉的其他应用例如solrcloud codis otter不太一样
一般用zookeeper都是管理集群节点用,而kafka用zookeeper是核心,生产端和消费端都会去
链接zookeeper获取响应的信息
生产端通过链接zookeeper获取topic都用了那些parition,每个parition的副本的leader是那个
消费端链接zookeeper获取offset,消费端消费都会操作对zookeeper的数据进行修改,对io的操作
很频繁
解决方法:
禁止zookeeper做持久化操作
配置文件中添加一行
forceSync=no
问题解决