Kafka性能测试
测试背景
由于业务需求,针对kafka在不同参数下的性能进行测试。从而进行kafka性能调优
测试目标
测试kafka 0.8n的性能(Producer/Consumer性能)。当消息大小、批处理大小、压缩等参数变化时对吞吐率的影响。
测试环境
软件版本:kafka 0.8.1.1
硬件环境:3台多云服务组成的kafka集群。各服务器CPU4核,内存16G,配置如下:
服务器IP:
203.150.54.215
203.150.54.216
203.150.54.217
测试方法
使用kafka官方提供的kafa-perf工具做性能测试
测试步骤
一、测试环境准备
1、测试工具kafka-perf的准备
cp kafka-perf_2.10-0.8.1.1.jar /application/kafka/libs/
2、启动kafka
cd /application/kafka
vim config/server.properties #内容见下图
./bin/kafka-server-start.sh --daemon
config/server.properties
3、测试集群可靠性
创建一个主题,复制因子为3.
[[email protected]
kafka]# bin/kafka-topics.sh --create –zookeeper 203.150.54.215:21203,203.150.54.216:21203,203.150.54.217:21203
--replication-factor 3 --partitions 3 --topic 88
Created topic
"88".
查看创建的主题分区情况
[[email protected]
kafka]# bin/kafka-topics.sh --describe --zookeeper 203.150.54.215:21203 --topic 88
Topic:88 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: 88 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: 88 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: 88 Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
在215上启动生产者
[[email protected]
kafka]# bin/kafka-console-producer.sh --broker-list 203.150.54.215:21204
--topic 88
SLF4J: Failed to
load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting
to no-operation (NOP) logger implementation
SLF4J: See
http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
test
hh
在216上进行消费
[[email protected]
kafka]#
bin/kafka-console-consumer.sh --zookeeper 203.150.54.216:21203 --topic
88 --from-beginning
SLF4J: Failed to
load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting
to no-operation (NOP) logger implementation
SLF4J: See
http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
test
hh
从上可知:3个服务器集群之间链接正常,下面进行性能测试
kafka-producer-perf-test.sh中参数说明:
messages 生产者发送走的消息数量
message-size 每条消息的大小
batch-size 每次批量发送消息的数量
topics 生产者发送的topic
threads 生产者
broker-list 安装kafka服务的机器ip:porta列表
producer-num-retries 一个消息失败发送重试次数
request-timeouts-ms 一个消息请求发送超时时间
bin/kafka-consumer-perf-test.sh中参数说明:
zookeeper zk配置
messages 消费者消费消息的总数量
topic 消费者需要消费的topic
threads 消费者使用几个线程同时消费
group 消费者组名称
socket-buffer-sizes socket缓存大小
fetch-size 每次想kafka broker请求消费消息大小
consumer.timeout.ms 消费者去kafka broker拿一条消息的超时时间
二、测试生产者吞吐率
此项只测试producer在不同的batch-zie,patition等参数下的吞吐率,也就是数据只被及计划,没有consumer读取数据消费情况。
生成Topic:
生成不同复制因子,partition的topic
bin/kafka-topics.sh --zookeeper 203.150.54.215:21203
--create --topic test-pati1-rep1 --partitions 1 --replication-factor 1
bin/kafka-topics.sh --zookeeper 203.150.54.215:21203
--create --topic test-pati1-rep2 --partitions 1 --replication-factor 2
bin/kafka-topics.sh --zookeeper 203.150.54.215:21203
--create --topic test-pati2-rep1 --partitions 2 --replication-factor 1
bin/kafka-topics.sh --zookeeper 203.150.54.215:21203
--create --topic test-pati2-rep2 --partitions 2 --replication-factor 2
bin/kafka-topics.sh --zookeeper 203.150.54.215:21203
--create --topic test-pati3-rep1 --partitions 3 --replication-factor 1
bin/kafka-topics.sh --zookeeper 203.150.54.215:21203
--create --topic test-pati3-rep2 --partitions 3 --replication-factor 2
测试producer吞吐率
调整batch-size,thread,topic,压缩等参数测试producer吞吐率。
示例:
a)批处理为1,线程数为1,复制因子为1
bin/kafka-producer-perf-test.sh
--messages 500000 --message-size 512 --batch-size 1 --topic
test-pati1-rep1 --threads 1
--broker-list 203.150.54.215:21204,203.150.54.216:21204,203.150.54.217:21204
b)批处理为5,线程数为4,复制因子为2
bin/kafka-producer-perf-test.sh
--messages 50000 --message-size 512 --batch-size 5 --topic test-pati1-rep2
--threads 1 --broker-list 203.150.54.215:21204,203.150.54.216:21204,203.150.54.217:21204
c)批处理为10,线程数为4,复制因子为2,不压缩,sync
bin/kafka-producer-perf-test.sh
--messages 50000 --message-size 512 --batch-size 10 --topic test-pati2-rep2
--threads 4 --compression-codec 0 --sync 1 --broker-list 203.150.54.215:21204,203.150.54.216:21204,203.150.54.217:21204
d)批处理为10,线程数为4,复制因子为2,gzip压缩,sync
bin/kafka-producer-perf-test.sh
--messages 50000 --message-size 512 --batch-size 10 --topic test-pati2-rep2 --threads
4 --compression-codec 1 --sync 1 --broker-list 203.150.54.215:21204, 203.150.54.216:21204, 203.150.54.217:21204
说明:消息大小统一使用和业务场景中日志大小相近的512Bype,消息数为50w或200w条。
三、测试消费者吞吐率
测试consumer吞吐率
调整批处理数,线程数,partition数,复制因子,压缩等进行测试。
示例:
a)批处理为1,线程数为1,复制因子为1
bin/kafka-consumer-perf-test.sh
--messages 500000 --message-size 512 --batch-size 1 --topic test-pati1-rep1 --threads 1--zookeeper
203.150.54.215:2181
b)批处理为5,线程数为4,复制因子为2
bin/kafka-consumer-perf-test.sh
--messages 50000 --message-size 512 --batch-size 5 --topic test-pati1-rep2 --threads 1 --zookeeper
203.150.54.215:2181
c)批处理为10,线程数为4,复制因子为2,不压缩,sync
bin/kafka-consumer-perf-test.sh
--messages 50000 --message-size 512 --batch-size 10 --topic test-pati2-rep2 --threads 4 --compression-codec 0 --sync 1 --zookeeper
203.150.54.215:2181
d)批处理为10,线程数为4,复制因子为2,gzip压缩,sync
bin/kafka-consumer-perf-test.sh
--messages 50000 --message-size 512 --batch-size 10 --topic test-pati2-rep2 --threads 4 --compression-codec 1 --sync 1 --zookeeper
203.150.54.215:2181
注:以上测试均使用python脚本多线程测试实现。
测试结果及分析
1、 生产者测试结果及分析
调整线程数,批处理数,复制因子等参考,对producer吞吐率进行测试。在测试时消息大小为512Byte,消息数为50w,结果如下:
复制因子 |
1 |
2 |
||
线程数 |
批处理 |
|||
1 |
1 |
1.8598 |
0.5878 |
|
4 |
5 |
17.6594 |
7.2501 |
|
4 |
10 |
26.4221 |
13.0208 |
|
压/no |
||||
4 |
10 |
4.6855 |
1.9058 |
|
4 |
10 |
3.0945 |
1.4846 |
Producer吞吐率(MB/s)
调整sync模式,压缩方式得到吞吐率数据如表3.在本次测试中msg=512Byte,message=500000,batch_zie=10
结果分析:
1)kafka在批处理,多线程,不适用同步复制的情况下,吞吐率是比较高的,可以达26MB/s,消息数达17w条/s以上。
2)使用批处理或多线程对提升生产者吞吐率效果明显。
3)复制因子会对吞吐率产生较明显影响
使用同步复制时,复制因子会对吞吐率产生较明显的影响。复制因子为2比因子为1(即无复制)时,吞吐率下降40%左右。
4)使用sync方式,性能有明显下降。
使用Sync方式producer吞吐率会有明显下降
5)压缩与吞吐率
使用Gzip及Snappy方式压缩,吞吐率反而有下降,原因待分析。而Snappy方式吞吐率高于gzip方式。
6)分区数与吞吐率
分区数增加生产者吞吐率反而有所下降
2、消费者结果及分析
结果分析:
1)kafka consumer吞吐率在parition,threads较大的情况下,在测试场景下,最大吞吐率达到了34MB/s
2)复制因子,影响较小。replication factor并不会影响consumer的吞吐率测试, consumer从每个partition的leader读数据,而与replication factor无关。同样,consumer吞吐率也与同步复制还是异步复制无关。
3)线程数和partition与吞吐率关系
当分区数较大时,增加thread数可显著提升consumer的吞吐率。
但要注意在分区较大时线程数不改大于分区数,否则会出现No broker partitions consumed
by consumer,对提升吞吐率也没有帮助。
4)批处理数对吞吐率影响
改变批处理数对吞吐率影响不大
5)压缩与吞吐率
压缩对吞吐率影响小。
附优化后的配置文件:
broker.id=1
listeners=PLAINTEXT://0.0.0.0:6667
advertised.listeners=PLAINTEXT://203.150.54.215:6667
port=6667
host.name=203.150.54.215
# Replication configurations
num.replica.fetchers=1
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
replica.lag.max.messages=4000
compression.codec:none
controller.socket.timeout.ms=30000
controller.message.queue.size=10
controlled.shutdown.enable=true
default.replication.factor:2
# Log configuration
num.partitions=1
num.recovery.threads.per.data.dir=1
message.max.bytes=1000000
auto.create.topics.enable=true
auto.leader.rebalance.enable=true
log.dirs=/mnt/kafka-logs/kafka00
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=72 #保留三天,也可以更短
log.flush.interval.ms=10000 #每间隔1秒钟时间,刷数据到磁盘
log.flush.interval.messages=20000 #log数据文件刷新策略
log.flush.scheduler.interval.ms=2000
log.roll.hours=72
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824 #kafka启动时是单线程扫描目录(log.dir)下所有数据文件
# ZK configuration
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
zookeeper.connect=203.150.54.215:2181,203.150.54.216:2182,203.150.54.217:2183
# Socket server configuration
num.io.threads=5 #配置线程数量为cpu核数加1
num.network.threads=8 #配置线程数量为cpu核数2倍,最大不超过3倍
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=500
fetch.purgatory.purge.interval.requests=1000
producer.purgatory.purge.interval.requests=1000