在团800运维工作总结之kafka集群日常工作经验总结

  1. 一些重要的原理

    基本原理什么叫broker partition cg我就不在这里说了,说一些自己总结的原理

    1.kafka有副本的概念,每个副本都分在不同的partition中,这中间分为leader和fllower

    2.kafka消费端的程序一定要和partition数量一致,不可以多,会出现有些consumer获取

    不到数据的现象

    3.producer原理

    producer通过zookeeper获取所连接的topic都在那些partiton中,每个parition的leader是那

    个,针对leader进行写操作,prodcer通过zookeeper的watch机制来记录以上的信息,pro

    ducer为了节省网络的io,还会在本地先把消息buffer起来,并将他们批量发送到broker中

    4.consumer原理

    consumer向broker发送fetch请求,并告知获取的消息offset,在kafka中采用pull方式,消费端

    主动pull消息,优点:消费者可以控制消费的数量

2.kafka生产环境常用命令总结

1.模拟生产端,推送数据

./bin/kafka-console-producer.sh --broker-list 172.16.10.130:9092 --topic deal_exposure_origin

2.模拟消费端,消费数据

./bin/kafka-console-consumer.sh --zookeeper 1172.16.10.140:2181  --topic deal_exposure_origin

3.创建topic,topic partiton数量 副本数 数据过期时间

./kafka-topics.sh --zookeeper spark:2181 --create --topic deal_task_log  --partitions 15 --replication-factor 1 retention.ms 1296000000

3.kafka如何动态的添加副本

1.副本,kafka一定要设置副本,如果之后再加会由于涉及到数据的同步,会把集群的io提升上去

3.如何扩大副本

2.把所有topic的信息记录到json文件中,信息有topic名称,用了哪些partition,副本在那个partition,

并修改json数据,添加副本数

#!/usr/bin/python

from kazoo.client import KazooClient

import random

import json

zk = KazooClient(hosts=‘172.16.11.73:2181‘)

zk.start()

for i in zk.get_children(‘/brokers/topics‘):

b= zk.get(‘/brokers/topics/‘+i)[0]

a = eval(b)[‘partitions‘]

list = []

dict = {}

for key,value in a.items():

if len(value) == 1:

c = {}

c[‘topic‘] = i.encode(‘utf-8‘)

c[‘partition‘] = int(key)

list1 = []

for ii in range(0,3):

while True:

if list1:

pass

else:

for iii in value:

list1.append(iii)

if len(list1) == 3:

break

num = random.randint(0,4)

#print ‘num=‘+str(num),‘value=‘+str(value)

if num not in list1:

list1.append(num)

#print list1

c[‘replicas‘] = list1

list.append(c)

version = eval(b)[‘version‘]

dict[‘version‘] = version

dict[‘partitions‘] = list

#jsondata = json.dumps(dict)

json.dump(dict,open(‘/opt/json/‘+i+‘.json‘,‘w‘))

3.加载json文件

/usr/local/kafka_2.9.2-0.8.1.1/bin/kafka-reassign-partitions.sh --zookeeper 192.168.5.159:2181 --reassignment-json-file /opt/test.json --execute

4.查看是否已经添加了副本

usr/local/kafka_2.9.2-0.8.1.1/bin/kafka-topics.sh --describe --zookeeper 192.168.5.159:2181 --topic testtest

Topic:testtest  PartitionCount:15       ReplicationFactor:2     Configs:

Topic: testtest Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1

Topic: testtest Partition: 1    Leader: 0       Replicas: 0,1   Isr: 0,1

Topic: testtest Partition: 2    Leader: 0       Replicas: 0,1   Isr: 0,1

Topic: testtest Partition: 3    Leader: 0       Replicas: 0,1   Isr: 0,1

Topic: testtest Partition: 4    Leader: 0       Replicas: 0,1   Isr: 0,1

Topic: testtest Partition: 5    Leader: 0       Replicas: 0,1   Isr: 0,1

Topic: testtest Partition: 6    Leader: 0       Replicas: 0,1   Isr: 0,1

Topic: testtest Partition: 7    Leader: 0       Replicas: 0,1   Isr: 0,1

Topic: testtest Partition: 8    Leader: 0       Replicas: 0,1   Isr: 0,1

Topic: testtest Partition: 9    Leader: 0       Replicas: 0,1   Isr: 0,1

Topic: testtest Partition: 10   Leader: 0       Replicas: 0,1   Isr: 0,1

Topic: testtest Partition: 11   Leader: 0       Replicas: 0,1   Isr: 0,1

Topic: testtest Partition: 12   Leader: 0       Replicas: 0,1   Isr: 0,1

Topic: testtest Partition: 13   Leader: 0       Replicas: 0,1   Isr: 0,1

Topic: testtest Partition: 14   Leader: 0       Replicas: 0,1   Isr: 0,1

4.kafka集群之间做数据同步

找一个broker节点进行同步

1.创建配置文件mirror_consumer.config

配置文件里写本地的kafka集群zookeeper

定义一个group用来去消费所有的topic,进行同步

zookeeper.connect=172.16.11.43:2181,172.16.11.46:2181,172.16.11.60:2181,172.16.11.67:2181,172.16.11.73:2181

group.id=backup-mirror-consumer-group

2.创建配置文件mirror_producer.config

zookeeper,kafka ip写对端集群的ip

zookeeper.connect=172.17.1.159:2181,172.17.1.160:2181

metadata.broker.list=172.17.1.159:9092,172.17.1.160:9092

3.同步命令

$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config sourceClusterConsumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"

参数详解

1. 白名单(whitelist) 黑名单(blacklist)

mirror-maker接受精确指定同步topic的白名单和黑名单。使用java标准的正则表达式,为了方便,逗号(‘,’)被编译为java正则中的(‘|’)。

2. Producer timeout

为了支持高吞吐量,你最好使用异步的内置producer,并将内置producer设置为阻塞模式(queue.enqueueTimeout.ms=-1)。这样可以保证数据(messages)不会丢失。否则,异步producer默认的 enqueueTimeout是0,如果producer内部的队列满了,数据(messages)会被丢弃,并抛出QueueFullExceptions异常。而对于阻塞模式的producer,如果内部队列满了就会一直等待,从而有效的节制内置consumer的消费速度。你可以打开producer的的trace logging,随时查看内部队列剩余的量。如果producer的内部队列长时间处于满的状态,这说明对于mirror-maker来说,将消息重新推到目标Kafka集群或者将消息写入磁盘是瓶颈。

对于kafka的producer同步异步的详细配置请参考$KAFKA_HOME/config/producer.properties文件。关注其中的producer.type和queue.enqueueTimeout.ms这两个字段。

3. Producer 重试次数(retries)

如果你在producer的配置中使用broker.list,你可以设置当发布数据失败时候的重试次数。retry参数只在使用broker.list的时候使用,因为在重试的时候会重新选择broker。

4. Producer 数量

通过设置—num.producers参数,可以使用一个producer池来提高mirror maker的吞吐量。在接受数据(messages)的broker上的producer是只使用单个线程来处理的。就算你有多个消费流,吞吐量也会在producer处理请求的时候被限制。

5. 消费流(consumption streams)数量

使用—num.streams可以指定consumer的线程数。请注意,如果你启动多个mirror maker进程,你可能需要看看其在源Kafka集群partitions的分布情况。如果在每个mirror maker进程上的消费流(consumption streams)数量太多,某些消费进程如果不拥有任何分区的消费权限会被置于空闲状态,主要原因在于consumer的负载均衡算法。

6. 浅迭代(Shallow iteration)与producer压缩

我们建议在mirror maker的consumer中开启浅迭代(shallow iteration)。意思就是mirror maker的consumer不对已经压缩的消息集(message-sets)进行解压,只是直接将获取到的消息集数据同步到producer中。

如果你开启浅迭代(shallow iteration),那么你必须关闭mirror maker中producer的压缩功能,否则消息集(message-sets)会被重复压缩。

7. Consumer 和 源Kafka集群(source cluster)的 socket buffer sizes

镜像经常用在跨集群场景中,你可能希望通过一些配置选项来优化内部集群的通信延迟和特定硬件性能瓶颈。一般来说,你应该对mirror-maker中consumer的socket.buffersize 和源集群broker的socket.send.buffer设定一个高的值。此外,mirror-maker中消费者(consumer)的fetch.size应该设定比socket.buffersize更高的值。注意,套接字缓冲区大小(socket buffer size)是操作系统网络层的参数。如果你启用trace级别的日志,你可以检查实际接收的缓冲区大小(buffer size),以确定是否调整操作系统的网络层。

4.如何检验MirrorMaker运行状况

Consumer offset checker工具可以用来检查镜像对源集群的消费进度。例如:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group KafkaMirror --zkconnect localhost:2181 --topic test-topic

KafkaMirror,topic1,0-0 (Group,Topic,BrokerId-PartitionId)

Owner = KafkaMirror_jkoshy-ld-1320972386342-beb4bfc9-0

Consumer offset = 561154288

= 561,154,288 (0.52G)

Log size = 2231392259

= 2,231,392,259 (2.08G)

Consumer lag = 1670237971

= 1,670,237,971 (1.56G)

BROKER INFO

0 -> 127.0.0.1:9092

注意,–zkconnect参数需要指定到源集群的Zookeeper。另外,如果指定topic没有指定,则打印当前消费者group下所有topic的信息。



5.kafka所使用的磁盘io过高解决方法

问题:kafka所用磁盘io过高

我们生产平台有5台kafka机器,每台机器上分了2块磁盘做parition

最近发现kafka所使用的磁盘io非常高,影响到了生产端推送数据的性能

一开始以为是由于一个推送日志的topic所导致的的,因为每秒推送数据大概在2w左右,

后来把此topic迁移到了其他的kafka集群中还是未见效果

最终iotop发现其实是由于zookeeper持久化的时候导致的

zookeeper持久化的时候也写到kafka所用到的磁盘中

通过此问题说明几点问题

1.kafka用zookeeper,和大家所熟悉的其他应用例如solrcloud codis otter不太一样

一般用zookeeper都是管理集群节点用,而kafka用zookeeper是核心,生产端和消费端都会去

链接zookeeper获取响应的信息

生产端通过链接zookeeper获取topic都用了那些parition,每个parition的副本的leader是那个

消费端链接zookeeper获取offset,消费端消费都会操作对zookeeper的数据进行修改,对io的操作

很频繁

解决方法:

禁止zookeeper做持久化操作

配置文件中添加一行

forceSync=no

问题解决


时间: 2024-10-05 14:08:32

在团800运维工作总结之kafka集群日常工作经验总结的相关文章

老男孩教育运维班100台规模集群阶段性综合上机实战考试

老男孩教育运维班100台规模集群第十关阶段性综合上机实战考试 光学理论有啥用,不拉出来实战遛遛,只能是自欺欺人! 项目要求: 1.全体学员上机实践考试,完成后由排长或班长.或助教打分. 2.时间:3个小时,抄袭0分. (一)上机服务器业务及IP主机名规划 已知5台服务器主机名主机对应信息见下表: 服务器说明 外网IP(NAT) 内网IP(NAT) 主机名 apache web服务器 10.0.0.7/24 172.16.1.7/24 web02 nginx web服务器 10.0.0.8/24

老男孩教育运维班50-100台规模集群全网数据备份项目实战

老男孩教育运维班50-100台规模集群全网数据备份解决方案 项目要求: 1.全体学员上机实践考试,完成后由排长或班长或助教打分. 2.时间:60分钟,抄袭别人0分. 3.本项目提供免费实战讲解视频: http://edu.51cto.com/course/course_id-3497.html 1.基本备份要求 已知3台服务器主机名分别为web01.backup.nfs01,主机信息见下表: 服务器说明 外网IP 内网IP 主机名称 nginx web服务器 10.0.0.8/24 172.16

老男孩教育运维班100台规模集群存储系统搭建及数据实时备份上机实战

老男孩教育运维班0基础起步上机实战系列项目 老男孩教育运维班100台规模集群存储系统搭建及数据实时备份上机实战 项目要求: 1.全体学员上机实践考试,完成后由排长或班长.或助教打分. 2.时间:90分钟,抄袭0分. (一)上机服务器业务及IP主机名规划 已知4台服务器主机名主机对应信息见下表: 服务器说明 外网IP 内网IP 主机名 apache web服务器 10.0.0.7/24 172.16.1.7/24 web02 nginx web服务器 10.0.0.8/24 172.16.1.8/

Linux运维需要懂什么web集群架构知识?

Linux运维需要懂什么web集群架构知识? 在充斥着各种的互联网+的数字时代,IT运维方面也越来越趋于Linux系统的应用,掌握 Linux 运维技术已成为IT 技术人员的必经之路,但是,构建在Linux系统上的高性能.高并发企业级网站集群架构上的网站集群架构,又会涉及到哪些具体的内容呢? 1.需要学习与Linux 相关的基础且重要的知识 Linux 的历史沿革.Linux 的企业级选型.学习环境的搭建.Linux 的企业级系统安装.Linux 系统的基础优化,以及远程连接Linux 及客户端

网站运维技术与实践之集群架构规划

集群架构规划和设计只要是涉及到高并发高流量的项目,基本上都需要. 本文主要围绕两个方面,一个是IDC的规划和选择,另一个是CDN. 一.IDC的规划和选择 IDC的选择是网站上线前要做的最重要的事情之一.哪怕发展初期只有一台服务器,选择一个位置不错的机房托管,都会助益良多. 也许有人会问IDC是什么? 我引用百度百科来回答: IDC为互联网内容提供商(ICP).企业.媒体和各类网站提供大规模.高质量.安全可靠的专业化服务器托管.空间租用.网络批发带宽以及ASP.EC等业务.IDC是对入驻(Hos

在团800运维工作总结之记一次lvs坑

今天突然发现13.14这个lvs的vip代理的所有端口不可用,重启lvs后恢复可用,过会又不可用了 解决方法 通过arpping **13.14看到居然有2个mac地址,一个居然是后端新加机器的mac地址 立马发现原因是由于之前有人在13.14后面加了一台机器,没有在sysctl.conf修改内核的那4个参数,导致13.14穿透到后端机器 具体原因 为什么重启keepalived就好了,因为出现问题后重启LVS会刷新交换机上记录的13.14对应mac,所以会恢复一段时间,但是一旦mac过期后问题

在团800运维工作总结之dns集群使用

1.产生燕郊 蓝汛 亦庄三机房同步dns数据所用到的key rndc-confgen -k yanjiao -c yanjiao -a -r keyboard rndc-confgen -k lanxun -c lx -a -r keyboard rndc-confgen -k yizhuang-c yz -a -r keyboard 2./etc/named.conf // // named.conf // // Provided by Red Hat bind package to conf

在团800运维工作总结之salt的使用

来这个公司第一件事就是推出了salt,因为要结合自动化上线使用 salt-net-api 获取tocken 1.curl -k http://127.0.0.1:8000/login -H "Accept: application/x-yaml" -d username="saltapi" -d password="abc/123" -d eauth='pam' 2.curl -k http://192.168.10.169:8000/ -H &

在团800运维工作总结之haproxy---rsyslog----kafka---collector--es--kibana

一下是我在单位对haproxy进行日志分析的一整套流程 我们一直都是处在维护es集群的配置,并没有把一整套流程 包括收集端的代码,全部自己搞定一次,而且线上收集日志的时候我们一般都用的logstash,但是业界很多人都说logstash不管是性能上还有稳定性上都不是很好,logstash的优点在于配置简便,这次我选用了rsyslog 今天就这haproxy日志,我把整个流程给大家走一遍,就算是让大家了解下 具体流程如下 haproxy----local2级别----rsyslog----kafk