Kafka(六)Kafka基本客户端命令操作

主题管理

创建主题

如果配置了auto.create.topics.enable=true(这也是默认值)这样当生产者向一个没有创建的主题发送消息就会自动创建,其分区数量和副本数量也是有默认配置来控制的。

# 我们这里创建一个3个分区每个分区有2个副本的主题
kafka-topics.sh --create --zookeeper 172.16.48.171:2181/kafka --replication-factor 2 --partitions 3 --topic KafkaTest
--create 表示建立
--zookeeper
表示ZK地址,可以传递多个,用逗号分隔

--zookeeper IP:PORT,IP:PORT,IP:PORT/kafka

--replication-factor 表示副本数量,这里的数量是包含Leader副本和Follower副本,副本数量不能超过代理数量
--partitions 表示主题的分区数量,必须传递该参数。Kafka的生产者和消费者采用多线程并行对主题的消息进行处理,每个线程处理一个分区,分区越多吞吐量就会越大,但是分区越多也意味着需要打开更多的文件句柄数量,这样也会带来一些开销。
--topic 表示主题名称

在Zookeeper中可以看到如下信息

删除主题

删除有两种方式手动和自动

  • 手动方式需要删除各个节点日志路径下的该主题所有分区,并且删除zookeeper上/brokers/topics和/config/topics下的对应主题节点
  • 自动删除就是通过脚本来完成,同时需要配置服务器配置文件中的delete.topic.enable=true,默认为false也就是说通过命令删除主题只会删除ZK中的节点,日志文件不会删除需要手动清理,如果配置为true,则会自动删除日志文件。
kafka-topics.sh --delete --zookeeper 172.16.48.171:2181/kafka --topic KafkaTest

下面的两句话就是说该主题标记为删除/admin/delete_topics节点下。实际数据没有影响因为该参数没有设置为true。

查看主题

# 列出所有主题
kafka-topics.sh --list --zookeeper 172.16.48.171:2181/kafka

下面是从ZK中看到的所有主题

# 查看所有主题信息
kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka

# 查看特定主题信息
kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topic BBB

Replicas:是AR列表,表示副本分布在哪些代理上,且该列表第一个元素就是Leader副本所在代理

ISR:该列表是显示已经同步的副本集合,这个列表的副本都是存活的

# 通过--describe 和 --under-replicated-partitions 可以查看正在同步的主题或者同步可能发生异常,
# 也就是ISR列表长度小于AR列表,如果一切正常则不会返回任何东西,也可以通过 --tipic 指定具体主题
kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --under-replicated-partitions
# 查看哪些主题建立时使用了单独的配置 
kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topics-with-overrides

这里只有一个内部主题__comsumer_offsets使用了非配置文件中的设置

配置管理

所谓配置就是参数,比如修改主题的默认参数。

主题级别的

# 查看配置
kafka-configs.sh --describe --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BB

这里显示 Configs for topic 'BBB' are 表示它的配置有哪些,这里没有表示没有为该主题单独设置配置,都是使用的默认配置。

# 增加一个配置
kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --add-config flush.messages=2

如果修改的话还是相同的命令,只是把值修改一下

# 删除配置
kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --delete-config flush.messages

客户端级别

这个主要是设置流控

# 设置指定消费者的流控 --entity-name 是客户端在创建生产者或者消费者时是指定的client.id名称
kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=1024' --entity-type clients --entity-name COMSUMER_NAME

下图为ZK中对应的信息

分区管理

分区平衡

Leader副本在集群中应该是均衡分布,因为Leader副本对外提供读写服务,尽可能不让同一个主题的多个Leader副本在同一个代理上,但是随着时间推移比如故障转移等情况发送,Leader副本可能不均衡。有两种方式设置自动平衡,自动和手动。

自动就是在配置文件中增加 auto.leader.rebalance.enable true 如果该项为false,当某个节点故障恢复并重新上线后,它原来的Leader副本也不会转移回来,只是一个Follower副本。

手动就是通过命令来执行

kafka-preferred-replica-election.sh --zookeeper 172.16.48.171:2181/kafka

分区迁移

当下线一个节点需要将该节点上的分区副本迁移到其他可用节点上,Kafka并不会自动进行分区迁移,如果不迁移就会导致某些主题数据丢失和不可用的情况。当增加新节点时,只有新创建的主题才会分配到新节点上,之前的主题分区不会自动分配到新节点上,因为老的分区在创建时AR列表中没有这个新节点。

上面2个主题,每个主题3个分区,每个分区3个副本,我们假设现在代理2要下线,所以我们要把代理2上的这两个主题的分区数据迁移出来。

# 1. 在KAFKA目录的config目录中建立topics-to-move.json文件
{
    "topics":[
        {
            "topic":"AAA"
        },
        {
            "topic":"BBB"
        }
    ],
    "version":1
}
# 2. 生成分区分配方案,只是生成一个方案信息然后输出
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "1,2" --generate

这个命令的原理是从zookeeper中读取主题元数据信息及制定的有效代理,根据分区副本分配算法重新计算指定主题的分区副本分配方案。把【Proposed partition reassignment configuration】下面的分区方案保存到一个JSON文件中,partitions-reassignment.json 文件名无所谓。

# 3. 执行方案
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute

# 4. 查看进度
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --verify

查看结果,这里已经没有代理0了。

集群扩容

上面演示了节点下线的数据迁移,这里演示一下集群扩容的数据迁移。我们还是用上面两个主题,假设代理0又重新上线了。其实扩容就是上面的反向操作

# 1. 建立JSON文件
# 该文件和之前的相同
# 2. 生成方案并保存到一个JSON文件中
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "0,1,2" --generate

# 3. 数据迁移,这里通过--throttle做一个限流操作,如果数据过大会把网络堵塞。
kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute --throttle 1024

查看进度和结果

增加副本

原文地址:http://blog.51cto.com/littledevil/2147297

时间: 2024-10-15 06:35:28

Kafka(六)Kafka基本客户端命令操作的相关文章

Redis客户端、服务端的安装以及命令操作

目的: redis简介 redis服务端安装 redis客户端安装 redis相关命令操作 redis简介 官网下载(https://redis.io/) Redis 是完全开源免费的,遵守BSD协议,是一个高性能的key-value数据库. Redis 与其他 key - value 缓存产品有以下三个特点: Redis支持数据的持久化,可以将内存中的数据保存在磁盘中,重启的时候可以再次加载进行使用. Redis不仅仅支持简单的key-value类型的数据,同时还提供list,set,zset

ZooKeeper客户端命令行操作

ZooKeeper客户端命令行操作 启动服务端 [[email protected] zookeeper-3.4.10]$ bin/zkServer.sh start 查看状态信息 Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg Starting zookeeper ... STARTED [[email protected] zookeeper-3.4.10]$ bin/zkServer.sh status ZooK

最好用的 Kafka Json Logger Java客户端,赶紧尝试一下

最好用的 Kafka Json Logger Java客户端. slf4j4json 最好用的 Kafka Json Logger 库:不尝试一下可惜了! Description 一款为 Kafka 提供的 json logger 客户端,支持将 json 格式的 log 输出到 kafka.文件.控制台. 支持 slf4j 的全部功能. 比 KafkaLog4jAppender 更好用,可配置性更好. 支持 close logger, 在程序关闭之前 flush log to kafka. 支

kafka启动和es启动命令

**6.启动zookeeper** ```/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties ``` **7.启动kafka** ```/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties ``` .查看主题: 如果我们运行list topic命令,我们现在可以看到该主题 /opt/kafka/bi

Apache Kafka安全| Kafka的需求和组成部分

1.目标 - 卡夫卡安全 今天,在这个Kafka教程中,我们将看到Apache Kafka Security 的概念  .Kafka Security教程包括我们需要安全性的原因,详细介绍加密.有了这个,我们将讨论Kafka Security可以轻松解决的问题列表.此外,我们将看到Kafka身份验证和授权.此外,我们将看看ZooKeeper身份验证.那么,让我们开始Apache Kafka Security. Apache Kafka安全| Kafka的需求和组成部分 2.什么是Apache K

二. 第六单元.shell脚本命令

###########################二. 第六单元.shell脚本命令############################# 1.diff diff        file file1            ##比较两个文件的不同        -c                        ##显示周围的行        -u                        ##按照一定格式统一输出生成补丁        -r                      

Redis的一些常用命令操作

五种 基本数据 类型 以及操作命令操作命令的网址:http://doc.redisfans.com/ 一.在可视化界面上打开命令窗口 二.打开后就是这样子 三.命令操作---查询.删除.字符串 1.keys *   查询所有的key 2.keys key*   查询所有key模糊匹配的key 3.del key1     删除key 4.flushall 清空所有数据库的所有key    (注意:轻易不要使用这个命令,不可恢复性) 5.set shanghai "11111"  创建一

Git的纯命令操作,Install,Clone , Commit,Push,Pull,版本回退,撤销更新,分支的创建/切换/更新/提交/合并,代码冲突

Git的纯命令操作,Install,Clone , Commit,Push,Pull,版本回退,撤销更新,分支的创建/切换/更新/提交/合并,代码冲突 这篇是接着上篇分布式版本库--Windows下Git的环境部署以及在GitHub上开源自己的项目讲的,上篇主要是说用GUI来图形化界面操作,但是一般我们程序员也不会这么干,用命令又轻松又愉悦,所以,这里我就再开了一篇来专门说一下纯命令是怎么去操作的,但是要注意哦,其实廖雪峰老师的网站就是非常赞的学习资源哦! 廖雪峰老师:http://www.li

ssh服务优化与客户端命令使用

上一节我们讲解了vim的一套使用方法,今天我们开始我们的第一个网络服务ssh.SSH的英文全称是Secure SHell.通过使用SSH,我们可以把所有传输的数据进行加密,这样"中间人"这种攻击方式就不可能实现了,而且也能够防止DNS和IP欺骗.说白了他就是我们本地电脑用secureCRT远程登录服务器的所要开启的服务.如果没有开启,是登录不了我们的服务器.我们的服务器操作系统默认已经安装了服务.有童鞋好奇问,前面我们不是已经讲过此服务了,怎么在这里又重复讲呢?前面我们只是简单的使用这