[bigdata] kafka基本命令 -- 迁移topic partition到指定的broker

版本 0.9.2

创建topic

bin/kafka-topics.sh --create --topic topic_name --partition 6 --replication-factor 1 -zookeeper 10.27.100.207:2181,10.27.100.144:2181,10.27.100.145:2181

开启console consumer查看消息

bin/kafka-console-consumer.sh --topic rt_live_pcweb -zookeeper 10.27.100.207:2181,10.27.100.144:2181,10.27.100.145:2181 --from-begin

改变topic的partition数, kafka topic的partition数只能增加,不能减少。

[[email protected] kafka]# bin/kafka-topics.sh --zookeeper 10.27.100.207:2181,10.27.100.144:2181,10.27.100.145:2181 --alter --topic rt_live_pcweb --partitions 8

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected

Error while executing topic command The number of partitions for a topic can only be increased

kafka.admin.AdminOperationException: The number of partitions for a topic can only be increased

at kafka.admin.AdminUtils$.addPartitions(AdminUtils.scala:98)

at kafka.admin.TopicCommand$$anonfun$alterTopic$1.apply(TopicCommand.scala:109)

at kafka.admin.TopicCommand$$anonfun$alterTopic$1.apply(TopicCommand.scala:93)

at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:93)

at kafka.admin.TopicCommand$.main(TopicCommand.scala:52)

at kafka.admin.TopicCommand.main(TopicCommand.scala)

迁移topic的partition到指定的broker上

1. 将需要迁移的topic按如下格式写成json文件

[[email protected] kafka]# cat topics-to-move.json

{"topics":

[{"topic": "rt_live_mobile_new"}],

"version":1

}

2. 执行如下命令,其中 0,1,2,3 为要迁移到的broker id

[[email protected] kafka]# bin/kafka-reassign-partitions.sh --zookeeper 10.27.100.207:2181,10.27.100.144:2181,10.27.100.145:2181 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate

Current partition replica assignment

{"version":1,"partitions":[{"topic":"rt_live_mobile_new","partition":2,"replicas":[4]},{"topic":"rt_live_mobile_new","partition":1,"replicas":[2]},{"topic":"rt_live_mobile_new","partition":6,"replicas":[0]},{"topic":"rt_live_mobile_new","partition":4,"replicas":[6]},{"topic":"rt_live_mobile_new","partition":8,"replicas":[2]},{"topic":"rt_live_mobile_new","partition":7,"replicas":[1]},{"topic":"rt_live_mobile_new","partition":11,"replicas":[5]},{"topic":"rt_live_mobile_new","partition":3,"replicas":[5]},{"topic":"rt_live_mobile_new","partition":0,"replicas":[1]},{"topic":"rt_live_mobile_new","partition":10,"replicas":[4]},{"topic":"rt_live_mobile_new","partition":5,"replicas":[7]},{"topic":"rt_live_mobile_new","partition":9,"replicas":[3]}]}

Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"rt_live_mobile_new","partition":2,"replicas":[0]},{"topic":"rt_live_mobile_new","partition":6,"replicas":[0]},{"topic":"rt_live_mobile_new","partition":1,"replicas":[3]},{"topic":"rt_live_mobile_new","partition":8,"replicas":[2]},{"topic":"rt_live_mobile_new","partition":4,"replicas":[2]},{"topic":"rt_live_mobile_new","partition":7,"replicas":[1]},{"topic":"rt_live_mobile_new","partition":11,"replicas":[1]},{"topic":"rt_live_mobile_new","partition":10,"replicas":[0]},{"topic":"rt_live_mobile_new","partition":0,"replicas":[2]},{"topic":"rt_live_mobile_new","partition":3,"replicas":[1]},{"topic":"rt_live_mobile_new","partition":9,"replicas":[3]},{"topic":"rt_live_mobile_new","partition":5,"replicas":[3]}]}

3. 将上面命令的输出(红色部门)存入json文件reassignment.json中

4. 执行如下命令,即将partition都迁移至broker 0,1,2,3 服务器上。

[[email protected] kafka]# bin/kafka-reassign-partitions.sh --zookeeper 10.27.100.207:2181,10.27.100.144:2181,10.27.100.145:2181 --reassignment-json-file reassignment.json --execute

Current partition replica assignment

{"version":1,"partitions":[{"topic":"rt_live_mobile_new","partition":2,"replicas":[4]},{"topic":"rt_live_mobile_new","partition":1,"replicas":[2]},{"topic":"rt_live_mobile_new","partition":6,"replicas":[0]},{"topic":"rt_live_mobile_new","partition":4,"replicas":[6]},{"topic":"rt_live_mobile_new","partition":8,"replicas":[2]},{"topic":"rt_live_mobile_new","partition":7,"replicas":[1]},{"topic":"rt_live_mobile_new","partition":11,"replicas":[5]},{"topic":"rt_live_mobile_new","partition":3,"replicas":[5]},{"topic":"rt_live_mobile_new","partition":0,"replicas":[1]},{"topic":"rt_live_mobile_new","partition":10,"replicas":[4]},{"topic":"rt_live_mobile_new","partition":5,"replicas":[7]},{"topic":"rt_live_mobile_new","partition":9,"replicas":[3]}]}

Save this to use as the --reassignment-json-file option during rollback

Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"rt_live_mobile_new","partition":2,"replicas":[0]},{"topic":"rt_live_mobile_new","partition":4,"replicas":[2]},{"topic":"rt_live_mobile_new","partition":8,"replicas":[2]},{"topic":"rt_live_mobile_new","partition":1,"replicas":[3]},{"topic":"rt_live_mobile_new","partition":6,"replicas":[0]},{"topic":"rt_live_mobile_new","partition":7,"replicas":[1]},{"topic":"rt_live_mobile_new","partition":11,"replicas":[1]},{"topic":"rt_live_mobile_new","partition":3,"replicas":[1]},{"topic":"rt_live_mobile_new","partition":0,"replicas":[2]},{"topic":"rt_live_mobile_new","partition":10,"replicas":[0]},{"topic":"rt_live_mobile_new","partition":5,"replicas":[3]},{"topic":"rt_live_mobile_new","partition":9,"replicas":[3]}]}

5.隔一会之后(partition迁移需要一段时间),执行describe查看,partition已经迁移成功

[[email protected] kafka]# bin/kafka-topics.sh --topic rt_live_mobile_new --describe -zookeeper 10.27.100.207:2181,10.27.100.144:2181,10.27.100.145:2181

Topic:rt_live_mobile_new PartitionCount:12 ReplicationFactor:1 Configs:

Topic: rt_live_mobile_new Partition: 0 Leader: 2 Replicas: 2 Isr: 2

Topic: rt_live_mobile_new Partition: 1 Leader: 3 Replicas: 3 Isr: 3

Topic: rt_live_mobile_new Partition: 2 Leader: 0 Replicas: 0 Isr: 0

Topic: rt_live_mobile_new Partition: 3 Leader: 1 Replicas: 1 Isr: 1

Topic: rt_live_mobile_new Partition: 4 Leader: 2 Replicas: 2 Isr: 2

Topic: rt_live_mobile_new Partition: 5 Leader: 3 Replicas: 3 Isr: 3

Topic: rt_live_mobile_new Partition: 6 Leader: 0 Replicas: 0 Isr: 0

Topic: rt_live_mobile_new Partition: 7 Leader: 1 Replicas: 1 Isr: 1

Topic: rt_live_mobile_new Partition: 8 Leader: 2 Replicas: 2 Isr: 2

Topic: rt_live_mobile_new Partition: 9 Leader: 3 Replicas: 3 Isr: 3

Topic: rt_live_mobile_new Partition: 10 Leader: 0 Replicas: 0 Isr: 0

Topic: rt_live_mobile_new Partition: 11 Leader: 1 Replicas: 1 Isr: 1

时间: 2024-10-09 21:13:50

[bigdata] kafka基本命令 -- 迁移topic partition到指定的broker的相关文章

kafka学习(四)-Topic & Partition

topic中partition存储分布 Topic在逻辑上可以被认为是一个queue.每条消费都必须指定它的topic,可以简单理解为必须指明把这条消息放进哪个queue里.为了使得 Kafka的吞吐率可以水平扩展,物理上把topic分成一个或多个partition,每个partition在物理上对应一个文件夹,该文件夹下存储 这个partition的所有消息和索引文件.partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减

Kafka Topic Partition Replica Assignment实现原理及资源隔离方案

本文共分为三个部分: Kafka Topic创建方式 Kafka Topic Partitions Assignment实现原理 Kafka资源隔离方案 1. Kafka Topic创建方式 Kafka Topic创建方式有以下两种表现形式: (1)创建Topic时直接指定Topic Partition Replica与Kafka Broker之间的存储映射关系 /usr/lib/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper ZooKeep

kafka的迁移干货

随着业务的发展, 服务器所在网段/机群不允许kafka继续保留在那, 需要移动到先机器上. 哎呀上面是废话,总的说就是: 2台老kafka不要了,数据要迁移到新的2台kafka上面.要求数据不丢失 通过查询官网,并无直接切换的命令,当前版本是kafka 0.8.1,  说是0.8.2才提供老机器的退役功能. 不过没关系, 我们有一个变通的方法: kafka提供了修改复制因子的方法, 我们可以将她的复制目标机器改成新的节点.  这样所有发往老节点的数据都会被转移到新节点去. 等你将发送者的API修

kafka数据迁移

kafka数据迁移操作流程: 此次数据迁移主要是针对Isr自动同步异常,而进行手动干预的操作. 具体的故障截图如下: 一.确认同步异常的topic和Isr数目 ./kafka-topics.sh --under-replicated-partitions --describe --zookeeper IP:2181 二.创建文件/tmp/topics-to-move.json vim /tmp/topics-to-move.json 复制这些topic,并写成如下格式的文件, 命名为 topic

使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)

使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题将使用默认值,先改变需要使用命令bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000显示的修改,我们也希望将此过程在Producer调用之前通

Kafka分区分配策略(Partition Assignment Strategy

问题 用过 Kafka 的同学用过都知道,每个 Topic 一般会有很多个 partitions.为了使得我们能够及时消费消息,我们也可能会启动多个 Consumer 去消费,而每个 Consumer 又会启动一个或多个streams去分别消费 Topic 里面的数据.我们又知道,Kafka 存在 Consumer Group 的概念,也就是 group.id 一样的 Consumer,这些 Consumer 属于同一个Consumer Group,组内的所有消费者协调在一起来消费订阅主题(su

如何彻底删除Kafka中的topic (marked for deletion)

工作中因为各种原因, 例如topic中消息堆积的太多,或者kafka所在磁盘空间满了等等,可能需要彻底清理一下kafka topic,那么如何彻底删除topic?方法一(配置delete.topic.enable=true)  修改kafaka配置文件server.properties, 添加delete.topic.enable=true,重启kafka,之后通过kafka命令行就可以直接删除topic  通过命令行删除topic:     ./bin/kafka-topics.sh --de

kafka基本命令和实践

Kafka基本命令 #启动server ./bin/kafka-server-start.sh config/server.properties #创建topic(主题)test ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 -partitions 1 --topic test #删除主题 ./bin/kafka-topics.sh --zookeeper localhost:21

Kafka如何删除topic?

Kafka如何删除topic? 今天为大家带来"Kafka删除topic原理解析",希望可以帮到那些苦于无法删除topic的朋友们. 前提条件: 在启动broker时候开启删除topic的开关,即在server.properties中添加:  delete.topic.enable=true 命令: bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name 这条命令其实就是在zo