参考官网site:
http://kafka.apache.org/documentation.html#basic_ops_cluster_expansion
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool
说明:
当我们对kafka集群扩容时,需要满足2点要求:
- 将指定topic迁移到集群内新增的node上。
- 将topic的指定partition迁移到新增的node上。
1. 迁移topic到新增的node上
假如现在一个kafka集群运行三个broker,broker.id依次为101,102,103,后来由于业务数据突然暴增,需要新增三个broker,broker.id依次为104,105,106.目的是要把push-token-topic迁移到新增node上。
1、脚本migration-push-token-topic.json文件内容如下:
Java代码
- {
- "topics":
- [
- {
- "topic": "push-token-topic"
- }
- ],
- "version":1
- }
2、执行脚本如下所示:
Java代码
- root@localhost:$ ./bin/kafka-reassign-partitions.sh --zookeeper 192.168.2.225:2183 --topics-to-move-json-file migration-push-token-topic.json --broker-list "104,105,106" --generate
生成分配partitions的json脚本 备份恢复使用:
Current partition replica assignment
{"version":1,"partitions":[{"topic":"cluster-switch-topic","partition":10,"replicas":[8]},{"topic":"cluster-switch-topic","partition":5,"replicas":[4]},{"topic":"cluster-switch-topic","partition":3,"replicas":[5]},{"topic":"cluster-switch-topic","partition":4,"replicas":[5]},{"topic":"cluster-switch-topic","partition":9,"replicas":[5]},{"topic":"cluster-switch-topic","partition":1,"replicas":[5]},{"topic":"cluster-switch-topic","partition":11,"replicas":[4]},{"topic":"cluster-switch-topic","partition":7,"replicas":[5]},{"topic":"cluster-switch-topic","partition":2,"replicas":[4]},{"topic":"cluster-switch-topic","partition":0,"replicas":[4]},{"topic":"cluster-switch-topic","partition":6,"replicas":[4]},{"topic":"cluster-switch-topic","partition":8,"replicas":[4]}]}
重新分配parttions的json脚本如下:
migration-topic-cluster-switch-topic.json
{"version":1,"partitions":[{"topic":"cluster-switch-topic","partition":10,"replicas":[5]},{"topic":"cluster-switch-topic","partition":5,"replicas":[4]},{"topic":"cluster-switch-topic","partition":4,"replicas":[5]},{"topic":"cluster-switch-topic","partition":3,"replicas":[4]},{"topic":"cluster-switch-topic","partition":9,"replicas":[4]},{"topic":"cluster-switch-topic","partition":1,"replicas":[4]},{"topic":"cluster-switch-topic","partition":11,"replicas":[4]},{"topic":"cluster-switch-topic","partition":7,"replicas":[4]},{"topic":"cluster-switch-topic","partition":2,"replicas":[5]},{"topic":"cluster-switch-topic","partition":0,"replicas":[5]},{"topic":"cluster-switch-topic","partition":6,"replicas":[5]},{"topic":"cluster-switch-topic","partition":8,"replicas":[5]}]}
3、执行:
Java代码
- root@localhost:$ bin/kafka-reassign-partitions.sh --zookeeper 192.168.2.225:2183 --reassignment-json-file migration-topic-cluster-switch-topic.json --execute
执行后会生成一个json格式文件expand-cluster-reassignment.json
4、查询执行状态:
Java代码
- bin/kafka-reassign-partitions.sh --zookeeper 192.168.2.225:2183 --reassignment-json-file expand-cluster-reassignment.json --verify
正常执行后会返回当前数据迁移的不用partion的,信息状态类似下面
Java代码
- Reassignment of partition [push-token-topic,0] completed successfully //移动成功
- Reassignment of partition [push-token-topic,1] is in progress //这行代表数据在移动中
- Reassignment of partition [push-token-topic,2] is in progress
- Reassignment of partition [push-token-topic,1] completed successfully
- Reassignment of partition [push-token-topic,2] completed successfully
这样做不会影响原来集群上的topic业务
2.topic修改(replicats-factor)副本个数
假如初始时push-token-topic为一个副本,为了提高可用性,需要改为2副本模式。
脚本replicas-update-push-token-topic.json文件内容如下:
{
"partitions":
[
{
"topic": "log.mobile_nginx",
"partition": 0,
"replicas": [101,102,104]
},
{
"topic": "log.mobile_nginx",
"partition": 1,
"replicas": [102,103,106]
}
],
"version":1
}
2、执行:
Java代码
- root@localhost:$ ./bin/kafka-reassign-partitions.sh --zookeeper 192.168.2.225:2183 --reassignment-json-file replicas-update-push-token-topic.json --execute
执行后会列出当前的partition和修改后的patition
3、verify
Java代码
- bin/kafka-reassign-partitions.sh --zookeeper 192.168.2.225:2181 --reassignment-json-file replicas-update-push-token-topic.json --verify
如下:
Status of partition reassignment: Reassignment of partition [log.mobile_nginx,0] completed successfully Reassignment of partition [log.mobile_nginx,1] completed successfully
3.自定义分区和迁移
1、The first step is to hand craft the custom reassignment plan in a json file-
> cat custom-reassignment.json {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
2、Then, use the json file with the --execute option to start the reassignment process-
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]}, {"topic":"foo2","partition":1,"replicas":[3,4]}] } Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] }
3、The --verify option can be used with the tool to check the status of the partition reassignment. Note that the same expand-cluster-reassignment.json (used with the --execute option) should be used with the --verify option
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully Reassignment of partition [foo2,1] completed successfully
4.topic的分区扩容用法
a.先扩容分区数量,脚本如下:
例如:push-token-topic初始分区数量为12,目前到增加到15个
[email protected]:$ ./bin/kafka-topics.sh --zookeeper 192.168.2.225:2183 --alter --partitions 15 --topic push-token-topic
b.设置topic分区副本
[email protected]:$ ./bin/kafka-reassign-partitions.sh --zookeeper 192.168.2.225:2183
--reassignment-json-file partitions-extension-push-token-topic.json --execute
脚本partitions-extension-push-token-topic.json文件内容如下:
{
"partitions":
[
{
"topic": "push-token-topic",
"partition": 12,
"replicas": [101,102]
},
{
"topic": "push-token-topic",
"partition": 13,
"replicas": [103,104]
},
{
"topic": "push-token-topic",
"partition": 14,
"replicas": [105,106]
}
],
"version":1
}