kafka知识体系-日常运维命令

本文主要讲解kafka日常运维的命令,包括topic管理、性能测试脚本。

kafka版本0.10.0,安装步骤见大数据平台搭建-kafka集群的搭建

常用脚本

如下所有的命令均基于KAFKA_HOME=/wls/oracle/kafka ,服务器列表如下:

10.20.112.59
10.20.112.64
10.20.112.65
10.20.116.129
10.20.116.175

创建topic

/wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --create --topic TEST --replication-factor 2 --partitions 3  

其中replication-factor后数字表示副本个数,partitions的数字表示分区个数。

查看topic

/wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --list

更改topic配置

配置topic级别参数时,相同(参数)属性topic级别会覆盖全局的,否则默认为全局配置属性值,即/wls/oracle/kafka/config/server.propertiestopic属性配置。

topic创建完成后,随着项目的进展,可能存在对特定topic配置的更改,涉及到的常用更改项如下。

单个消息比较大,需要调整broker能接收消息的最大字节数

/wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --alter --topic TEST --config max.message.bytes=128000

或者使用/wls/oracle/kafka/bin/kafka-configs.sh脚本也行
/wls/oracle/kafka/bin/kafka-configs.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --entity-type topics --entity-name TEST --alter --add-config max.message.bytes=128000

kafka扩容或者读取性能遇到瓶颈时,可能会考虑增加分区数

/wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --alter --topic TEST --partitions 6  

如果涉及到多个配置的更改,则依次用 --conf key=value并列配置即可

/wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --alter --topic TEST --config max.message.bytes=128000  --config flush.messages=1

其他可选配置

Property Default Server Default Property note
cleanup.policy delete log.cleanup.policy 日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
delete.retention.ms 86400000 (24 hours) log.cleaner.delete.retention.ms 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖
flush.messages None log.flush.interval.messages log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失
flush.ms None log.flush.interval.ms 仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发.
index.interval.bytes 4096 log.index.interval.bytes 当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数
message.max.bytes 1,000,000 message.max.bytes 表示消息的最大大小,单位是字节
min.cleanable.dirty.ratio 0.5 log.cleaner.min.cleanable.ratio 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖
retention.bytes None log.retention.bytes topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes。-1没有大小限log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
retention.ms None log.retention.minutes 数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据
log.retention.bytes和log.retention.minutes达到要求,都会执行删除,会被topic创建时的指定参数覆盖
segment.bytes 1 GB log.segment.bytes topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
segment.index.bytes 10 MB log.index.size.max.bytes 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖
log.roll.hours 7 days log.roll.hours 这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic创建时的指定参数覆盖

查看topic

查看topic分区和副本分布情况

/wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --describe --topic TEST

查看topic配置

/wls/oracle/kafka/bin/kafka-configs.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --entity-type topics --entity-name TEST --describe

删除topic

/wls/oracle/kafka/bin/kafka-topics.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka --delete --topic TEST

需要注意的是执行这段命令后控制台输出

Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true. 

也就是说执行删除命令,不是真正删除,而是标记删除,实际只是在zookeeper上添加/admin/delete_topics/test节点,需要确认提前打开delete.topic.enable开关。

如果遇到标记删除失败,可以考虑手工删除,步骤如下:

1.登录zookeeer客户端,删除zookeeperTESTtopic节点

/wls/oracle/zookeeper/bin/zkCli.sh
rmr /kafka/brokers/topics/TEST

其中/kafka为我们自己kafka在zookeeper的根目录,不同集群可能不太一致。

2.删除kafka数据文件

rm -rf /wls/oracle/bigdata/kafka/kafka-logs-*/TEST*

其中/wls/oracle/bigdata/kafka/kafka-logs-*server.properties中配置的log.dirs目录,具体可参考

大数据平台搭建-kafka集群的搭建

发送消息

/wls/oracle/kafka/bin/kafka-console-producer.sh --broker-list 10.20.112.59:9092,10.20.112.64:9092,10.20.112.65:9092,10.20.116.129:9092,10.20.116.175:9092 --topic TEST

接收消息

/wls/oracle/kafka/bin/kafka-console-consumer.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka  --topic TEST --from-beginning

其中--from-beginning表示从头开始消费kafka队列TEST中的消息,如果没有该选项,则消费最新的消息。

查看topic消费者offset

/wls/oracle/kafka/bin/kafka-consumer-offset-checker.sh --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka  --topic TEST --group mygroup

性能测试

kafka官方提供了优化参数的性能测试脚本

生产者

/wls/oracle/kafka/bin/kafka-producer-perf-test.sh --num-records 100000000 --record-size 10 --topic TEST  --producer-props bootstrap.servers=10.20.112.59:9092,10.20.112.64:9092,10.20.112.65:9092,10.20.116.129:9092,10.20.116.175:9092 acks=all

num-records表示发送的总消息量,

record-size表示消息的大小,

producer-props表示生产者的配置,可以并列写多项

消费者

/wls/oracle/kafka/bin/kafka-consumer-perf-test.sh --messages 100000000 --topic TEST --zookeeper 10.20.112.59:2181,10.20.112.64:2181,10.20.112.65:2181,10.20.116.129:2181,10.20.116.175:2181/kafka  --threads 3 --num-fetch-threads 3 --compression-codec 0 --group mygroup --message-size

messages表示消费的消息总量

threads表示处理消息的线程数

num-fetch-threads表示拉取消息的线程数

compression-codec表示压缩方式,0-不压缩,1-GZIP,2-Snappy,3-LZ4

此外还可以使用consumer.config指定其他配置,具体参考http://kafka.apache.org/0100/documentation.html

以物理机(非本kafka集群)测试运行结果如下:

start.time,end.time,data.consumed.in.MB,MB.sec,data.consumed.in.nMsg,nMsg.sec
2017-06-14 17:55:42:312,2017-06-14 17:59:26:754,216230.5107,963.4138,30000000,133664.8221

本文详细讲述了kafka日常运维的命令,包括topic管理、性能测试,一一记录,以免忘记。

本文参考:

http://kafka.apache.org/0100/documentation.html

原文地址:https://www.cnblogs.com/molyeo/p/9264961.html

时间: 2024-12-27 23:17:11

kafka知识体系-日常运维命令的相关文章

Lync2013日常运维命令整理(十)

近部署完成了Lync 服务器,即将转向运维阶段,运维过程中会涉及到为员工启用Lync帐号.语音功能以及导出统计当前哪些用户开启Lync权限,在这里我简单汇总一些常用的,供大家参考,后期我还会将用到的命令出来供大家平时运维使用. 1.强制刷新更新状态 Invoke-CSManagementStoreReplication 2.检查更新状态 Get-CsManagementStoreReplicationStatus 3.为禁用的账户开启Lync功能 Enable-CsUser 4.为某个部门开启l

AIX—日常运维命令总结

查看系统IP地址: netstat  -in 1. 查看AIX服务器的物理构造信息,包括服务器网络配置信息 #  prtconf #  ifconfig   -a #  lsattr  -E  -l  mem0     :查看系统内存大小 #  netstat  -in             :显示系统各网卡的配置信息 2. 开启和关闭AIX服务器 #  shutdown  -F  :快速关机 #  shutdown  -I :采用交互式模式关闭系统 #  shutdown  -K :避免关闭

ogg日常运维命令

1.查看历史记录.快捷执行历史中的一条命令 GGSCI (11g) 32> h 23: view param exta24: info all25: lag exta.... GGSCI (11g) 33> ! 24-- 这里相当于执行了 上面 24: info all 的命令. 2.查看参数设置使用view params <进程名> 可以查看进程的参数设置.该命令同样支持通配符*. 3.查看进程状态使用 info <进程名> 命令可以查看进程信息.可以查看到的信息包括

linux日常运维命令

修改系统时间 [[email protected] ~]# date -s "2012-11-16 10:16:00" [[email protected] ~]# clock -w 2.查看系统的内核 [[email protected] ~]#  uname–a 3.查看linux服务器物理CPU的个数 [[email protected] ~]# cat /proc/cpuinfo | grep "physicalid" | sort | uniq  | wc

日常运维命令

监控系统状态进行初步的判定w命令时间 用户 网络用户显示是pts tty1 客户端 load average系统负载 :1分钟,5分钟,15分钟时间段内系统负载是多少 单位时间段内使用CUP活动的进程 都是平均值 查看逻辑CPU 显示的比实际少1 processor 系统负载第一段时间段不高于8合适 5分钟内平均有多少个进程login 登录时间 uptime 当显示系统cpu不够用时 想要进一步查询vmstat 1 5 每一秒显示 显示5次r=run 表示有多少个进程处于在执行中 b=block

日常运维命令2

监控系统状态iostat 1 每秒显示硬盘 读写速度iostat -x 1%util 表示等待IOiotop 安装显示占用IO的进程read 读的速度 write 写到速度 swapin 交换分区free free -hbuff/cache 缓冲和缓存 从磁盘度数据 交给CPU 中间需要一个内存(cache) CPU 计算完的数据存到磁盘里去 中间需要内存(buff)available=free+buff/cache(剩余)ps 命令ps aux 列出所以进程的状况ps aux | 检查进程用户

CentOS日常运维命令

查看剩余内存: free -m #-/+ buffers/cache: 6458 1649 #6458M为真实使用内存 1649M为真实剩余内存(剩余内存+缓存+缓冲器) #linux会利用所有的剩余内存作为缓存,所以要保证linux运行速度,就需要保证内存的缓存大小 系统信息: uname -a # 查看Linux内核版本信息 cat /proc/version # 查看内核版本 cat /etc/issue # 查看系统版本 lsb_release -a # 查看系统版本 需安装 cento

gitlab 日常运维命令

2018-10-24 gitlab查看版本 cat /opt/gitlab/embedded/service/gitlab-rails/VERSION 查看gitlab服务状态命令 gitlab-ctl status 启动.停止服务 # 启动Gitlab所有组件 sudo gitlab-ctl start # 停止Gitlab所有组件 sudo gitlab-ctl stop # 重启Gitlab所有组件 sudo gitlab-ctl restart 备份与恢复 备份配置 配置文件再/etc/

zookeeper 用法和日常运维

本文以ZooKeeper3.4.3版本的官方指南为基础:http://zookeeper.apache.org/doc/r3.4.3/zookeeperAdmin.html,补充一些作者运维实践中的要点,围绕ZK的部署和运维两个方面讲一些管理员需要知道的东西.本文并非一个ZK搭建的快速入门,关于这方面,可以查看<ZooKeeper快速搭建>. 1.部署 本章节主要讲述如何部署ZooKeeper,包括以下三部分的内容: 系统环境 集群模式的配置 单机模式的配置 系统环境和集群模式配置这两节内容大