A Detailed Introduction to the Kafka Command Line

The most commonly used commands are:

kafka-server-start.sh
kafka-console-consumer.sh
kafka-console-producer.sh
kafka-topics.sh
Of these, the first is used only to start Kafka, and the two console commands are mostly used for testing; the most broadly useful is the last one, so the focus below is on kafka-topics.sh.

kafka-server-start.sh
Usage: > bin/kafka-server-start.sh [-daemon] server.properties [--override property=value]*

The command takes several arguments. The first, -daemon, is optional and runs Kafka as a background service. The next argument must be a Kafka configuration file. These may be followed by any number of --override arguments, where property can be any of the settings documented under Broker Configs; each one overrides the corresponding value in the configuration file.

For example, the following starts multiple brokers from the same configuration file by overriding parameters on the command line:

> bin/kafka-server-start.sh -daemon config/server.properties --override broker.id=0 --override log.dirs=/tmp/kafka-logs-1 --override listeners=PLAINTEXT://:9092 --override advertised.listeners=PLAINTEXT://192.168.16.150:9092

> bin/kafka-server-start.sh -daemon config/server.properties --override broker.id=1 --override log.dirs=/tmp/kafka-logs-2 --override listeners=PLAINTEXT://:9093 --override advertised.listeners=PLAINTEXT://192.168.16.150:9093

This usage is only for demonstration; to run multiple brokers for real, create a dedicated server.properties file for each broker.
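
As a minimal sketch of that approach, each broker gets its own properties file in which only a few keys differ (the broker id, port, log directory, and the 192.168.16.150 address below are illustrative values carried over from the example above):

# config/server-1.properties (hypothetical second broker)
broker.id=1
listeners=PLAINTEXT://:9093
advertised.listeners=PLAINTEXT://192.168.16.150:9093
log.dirs=/tmp/kafka-logs-2

> bin/kafka-server-start.sh -daemon config/server-1.properties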

kafka-console-consumer.sh
This command simply reads messages and writes them to standard output. It supports the following options.

Option                                   Description
------                                   -----------
--blacklist <String: blacklist>          Blacklist of topics to exclude from
                                           consumption.
--bootstrap-server <String: server to    REQUIRED (unless old consumer is
  connect to>                              used): The server to connect to.
--consumer-property <String:             A mechanism to pass user-defined
  consumer_prop>                           properties in the form key=value to
                                           the consumer.
--consumer.config <String: config file>  Consumer config properties file. Note
                                           that [consumer-property] takes
                                           precedence over this config.
--csv-reporter-enabled                   If set, the CSV metrics reporter will
                                           be enabled
--delete-consumer-offsets                If specified, the consumer path in
                                           zookeeper is deleted when starting up
--enable-systest-events                  Log lifecycle events of the consumer
                                           in addition to logging consumed
                                           messages. (This is specific for
                                           system tests.)
--formatter <String: class>              The name of a class to use for
                                           formatting kafka messages for
                                           display. (default: kafka.tools.
                                           DefaultMessageFormatter)
--from-beginning                         If the consumer does not already have
                                           an established offset to consume
                                           from, start with the earliest
                                           message present in the log rather
                                           than the latest message.
--key-deserializer <String:
  deserializer for key>
--max-messages <Integer: num_messages>   The maximum number of messages to
                                           consume before exiting. If not set,
                                           consumption is continual.
--metrics-dir <String: metrics           If csv-reporter-enabled is set, and
  directory>                               this parameter is set, the csv
                                           metrics will be output here
--new-consumer                           Use the new consumer implementation.
                                           This is the default.
--offset <String: consume offset>        The offset id to consume from (a non-
                                           negative number), or 'earliest'
                                           which means from beginning, or
                                           'latest' which means from end
                                           (default: latest)
--partition <Integer: partition>         The partition to consume from.
--property <String: prop>                The properties to initialize the
                                           message formatter.
--skip-message-on-error                  If there is an error when processing a
                                           message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>       If specified, exit if no message is
                                           available for consumption for the
                                           specified interval.
--topic <String: topic>                  The topic id to consume on.
--value-deserializer <String:
  deserializer for values>
--whitelist <String: whitelist>          Whitelist of topics to include for
                                           consumption.
--zookeeper <String: urls>               REQUIRED (only when using old
                                           consumer): The connection string for
                                           the zookeeper connection in the form
                                           host:port. Multiple URLS can be
                                           given to allow fail-over. 

--bootstrap-server must be provided, and you usually also provide --topic for the topic to read. To view messages from the beginning, add the --from-beginning flag. A typical command looks like this:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

You can also read from a single partition with the following command:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --partition 0
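
The --property flag passes options to the message formatter. As a hedged example, the default kafka.tools.DefaultMessageFormatter understands print.key and key.separator (verify the exact property names against your Kafka version), which makes the consumer print each message's key in front of its value:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --property print.key=true --property key.separator=: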

kafka-console-producer.sh
This command sends the contents of a file, or of standard input, to a Kafka cluster. It supports the following options.

Option                                   Description
------                                   -----------
--batch-size <Integer: size>             Number of messages to send in a single
                                           batch if they are not being sent
                                           synchronously. (default: 200)
--broker-list <String: broker-list>      REQUIRED: The broker list string in
                                           the form HOST1:PORT1,HOST2:PORT2.
--compression-codec [String:             The compression codec: either 'none',
  compression-codec]                       'gzip', 'snappy', or 'lz4'. If
                                           specified without value, then it
                                           defaults to 'gzip'
--key-serializer <String:                The class name of the message encoder
  encoder_class>                           implementation to use for
                                           serializing keys. (default: kafka.
                                           serializer.DefaultEncoder)
--line-reader <String: reader_class>     The class name of the class to use for
                                           reading lines from standard in. By
                                           default each line is read as a
                                           separate message. (default: kafka.
                                           tools.
                                           ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on       The max time that the producer will
  send>                                    block for during a send request
                                           (default: 60000)
--max-memory-bytes <Long: total memory   The total memory used by the producer
  in bytes>                                to buffer records waiting to be sent
                                           to the server. (default: 33554432)
--max-partition-memory-bytes <Long:      The buffer size allocated for a
  memory in bytes per partition>           partition. When records are received
                                           which are smaller than this size the
                                           producer will attempt to
                                           optimistically group them together
                                           until this size is reached.
                                           (default: 16384)
--message-send-max-retries <Integer>     Brokers can fail receiving the message
                                           for multiple reasons, and being
                                           unavailable transiently is just one
                                           of them. This property specifies the
                                           number of retries before the
                                           producer gives up and drops this
                                           message. (default: 3)
--metadata-expiry-ms <Long: metadata     The period of time in milliseconds
  expiration interval>                     after which we force a refresh of
                                           metadata even if we haven't seen any
                                           leadership changes. (default: 300000)
--old-producer                           Use the old producer implementation.
--producer-property <String:             A mechanism to pass user-defined
  producer_prop>                           properties in the form key=value to
                                           the producer.
--producer.config <String: config file>  Producer config properties file. Note
                                           that [producer-property] takes
                                           precedence over this config.
--property <String: prop>                A mechanism to pass user-defined
                                           properties in the form key=value to
                                           the message reader. This allows
                                           custom configuration for a user-
                                           defined message reader.
--queue-enqueuetimeout-ms <Integer:      Timeout for event enqueue (default:
  queue enqueuetimeout ms>                 2147483647)
--queue-size <Integer: queue_size>       If set and the producer is running in
                                           asynchronous mode, this gives the
                                           maximum number of messages that will
                                           queue awaiting sufficient batch
                                           size. (default: 10000)
--request-required-acks <String:         The required acks of the producer
  request required acks>                   requests (default: 1)
--request-timeout-ms <Integer: request   The ack timeout of the producer
  timeout ms>                              requests. Value must be non-negative
                                           and non-zero (default: 1500)
--retry-backoff-ms <Integer>             Before each retry, the producer
                                           refreshes the metadata of relevant
                                           topics. Since leader election takes
                                           a bit of time, this property
                                           specifies the amount of time that
                                           the producer waits before refreshing
                                           the metadata. (default: 100)
--socket-buffer-size <Integer: size>     The size of the TCP receive buffer.
                                           (default: 102400)
--sync                                   If set, message send requests to the
                                           brokers are sent synchronously, one
                                           at a time as they arrive.
--timeout <Integer: timeout_ms>          If set and the producer is running in
                                           asynchronous mode, this gives the
                                           maximum amount of time a message
                                           will queue awaiting sufficient batch
                                           size. The value is given in ms.
                                           (default: 1000)
--topic <String: topic>                  REQUIRED: The topic id to produce
                                           messages to.
--value-serializer <String:              The class name of the message encoder
  encoder_class>                           implementation to use for
                                           serializing values. (default: kafka.
                                           serializer.DefaultEncoder)   

--broker-list and --topic are the two required options.

Typical usage is shown below.

Reading from standard input (each line you type is sent as a separate message; exit with Ctrl+C):

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Reading from a file:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test < file-input.txt
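
If the input lines carry keys, the default line reader can split them. A sketch, assuming the parse.key and key.separator reader properties of kafka.tools.ConsoleProducer$LineMessageReader (check your version): with the settings below, a line such as user1:hello is sent with key user1 and value hello.

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=: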

kafka-topics.sh
Compared with the occasionally used commands above, kafka-topics.sh is rather more important. It supports the following options.

Create, delete, describe, or change a topic.
Option                                   Description
------                                   -----------
--alter                                  Alter the number of partitions,
                                           replica assignment, and/or
                                           configuration for the topic.
--config <String: name=value>            A topic configuration override for the
                                           topic being created or altered. The
                                           following is a list of valid
                                           configurations:
                                            cleanup.policy
                                            compression.type
                                            delete.retention.ms
                                            file.delete.delay.ms
                                            flush.messages
                                            flush.ms
                                            follower.replication.throttled.
                                           replicas
                                            index.interval.bytes
                                            leader.replication.throttled.replicas
                                            max.message.bytes
                                            message.format.version
                                            message.timestamp.difference.max.ms
                                            message.timestamp.type
                                            min.cleanable.dirty.ratio
                                            min.compaction.lag.ms
                                            min.insync.replicas
                                            preallocate
                                            retention.bytes
                                            retention.ms
                                            segment.bytes
                                            segment.index.bytes
                                            segment.jitter.ms
                                            segment.ms
                                            unclean.leader.election.enable
                                         See the Kafka documentation for full
                                           details on the topic configs.
--create                                 Create a new topic.
--delete                                 Delete a topic
--delete-config <String: name>           A topic configuration override to be
                                           removed for an existing topic (see
                                           the list of configurations under the
                                           --config option).
--describe                               List details for the given topics.
--disable-rack-aware                     Disable rack aware replica assignment
--force                                  Suppress console prompts
--help                                   Print usage information.
--if-exists                              if set when altering or deleting
                                           topics, the action will only execute
                                           if the topic exists
--if-not-exists                          if set when creating topics, the
                                           action will only execute if the
                                           topic does not already exist
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic
                                           being created or altered (WARNING:
                                           if partitions are increased for a
                                           topic that has a key, the partition
                                           logic or ordering of the messages
                                           will be affected).
--replica-assignment <String:            A list of manual partition-to-broker
  broker_id_for_part1_replica1 :           assignments for the topic being
  broker_id_for_part1_replica2 ,           created or altered.
  broker_id_for_part2_replica1 :
  broker_id_for_part2_replica2 , ...>
--replication-factor <Integer:           The replication factor for each
  replication factor>                      partition in the topic being created.
--topic <String: topic>                  The topic to be created, altered or
                                           described. Can also accept a regular
                                           expression except for --create option
--topics-with-overrides                  if set when describing topics, only
                                           show topics that have overridden
                                           configs
--unavailable-partitions                 if set when describing topics, only
                                           show partitions whose leader is not
                                           available
--under-replicated-partitions            if set when describing topics, only
                                           show under replicated partitions
--zookeeper <String: urls>               REQUIRED: The connection string for
                                           the zookeeper connection in the form
                                           host:port. Multiple URLS can be
                                           given to allow fail-over. 

Below are a few common topic commands.

Describe a topic's configuration
bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name test_topic

Set the retention time
# Deprecated way
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test_topic --config retention.ms=1000

# Modern way
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test_topic --add-config retention.ms=1000

If you need to delete all messages in a topic, you can take advantage of the retention time: first set the retention time very low (1000 ms), wait a few seconds, then restore it to its previous value.

Note: the default retention time is 168 hours (7 days), set by log.retention.hours in the broker configuration.
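
A sketch of that purge trick, using the modern kafka-configs.sh form shown above (test_topic is an assumed name; if the topic had an explicit retention.ms override before, restore that value instead of deleting the override):

# Shrink retention so existing messages become eligible for deletion
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test_topic --add-config retention.ms=1000

# Wait a few seconds for the cleaner to run, then drop the override so the broker default applies again
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test_topic --delete-config retention.ms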

Delete a topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test_topic

Note: to delete a topic, delete.topic.enable=true must be set in the broker configuration file server.properties.

Show topic details
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic

Add partitions
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic test_topic --partitions 3

Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test_topic

List topics
bin/kafka-topics.sh --list --zookeeper localhost:2181

The topic commands above come from: http://ronnieroller.com/kafka/cheat-sheet

With so many commands, how do you remember them all?
Kafka's command-line tools print very detailed usage hints, so you only need to remember the rough usages above and know roughly what to type. When you need a particular command, let the tool's own prompts guide you through it.

For example, how do you use kafka-configs.sh to view a topic's configuration?

First, run bin/kafka-configs.sh on the command line with no arguments; it prints the usage information below.

Add/Remove entity config for a topic, client, user or broker
Option                      Description
------                      -----------
--add-config <String>       Key Value pairs of configs to add. Square brackets
                              can be used to group values which contain commas:
                              'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a
                              list of valid configurations: For entity_type
                              'topics':
                                cleanup.policy
                                compression.type
                                delete.retention.ms
                                file.delete.delay.ms
                                flush.messages
                                flush.ms
                                follower.replication.throttled.replicas
                                index.interval.bytes
                                leader.replication.throttled.replicas
                                max.message.bytes
                                message.format.version
                                message.timestamp.difference.max.ms
                                message.timestamp.type
                                min.cleanable.dirty.ratio
                                min.compaction.lag.ms
                                min.insync.replicas
                                preallocate
                                retention.bytes
                                retention.ms
                                segment.bytes
                                segment.index.bytes
                                segment.jitter.ms
                                segment.ms
                                unclean.leader.election.enable
                            For entity_type 'brokers':
                                follower.replication.throttled.rate
                                leader.replication.throttled.rate
                            For entity_type 'users':
                                producer_byte_rate
                                SCRAM-SHA-256
                                SCRAM-SHA-512
                                consumer_byte_rate
                            For entity_type 'clients':
                                producer_byte_rate
                                consumer_byte_rate
                            Entity types 'users' and 'clients' may be specified
                              together to update config for clients of a
                              specific user.
--alter                     Alter the configuration for the entity.
--delete-config <String>    config keys to remove 'k1,k2'
--describe                  List configs for the given entity.
--entity-default            Default entity name for clients/users (applies to
                              corresponding entity type in command line)
--entity-name <String>      Name of entity (topic name/client id/user principal
                              name/broker id)
--entity-type <String>      Type of entity (topics/clients/users/brokers)
--force                     Suppress console prompts
--help                      Print usage information.
--zookeeper <String: urls>  REQUIRED: The connection string for the zookeeper
                              connection in the form host:port. Multiple URLS
                              can be given to allow fail-over.

The first line shows that this command adds or removes configuration for a topic, client, user, or broker.

To configure a topic, set entity-type to topics and run:

> bin/kafka-configs.sh --entity-type topics
Command must include exactly one action: --describe, --alter

The tool says exactly one action must be specified (and there are actually more actions than the two listed in the message). Try adding --describe:

> bin/kafka-configs.sh --entity-type topics --describe
Missing required argument "[zookeeper]"

Keep going and add --zookeeper:

> bin/kafka-configs.sh --entity-type topics --describe --zookeeper localhost:2181
Configs for topic '__consumer_offsets' are segment.bytes=104857600,cleanup.policy=compact,compression.type=producer

Since no topic name was specified, this happens to show the configuration of __consumer_offsets. Now try specifying a topic:

> bin/kafka-configs.sh --entity-type topics --describe --zookeeper localhost:2181 --entity-name test
Configs for topic 'test' are

Now the configuration of the test topic is shown; here it is empty.
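
From here, altering a configuration follows the same pattern, using the --alter, --add-config, and --delete-config options from the help output above. A sketch (max.message.bytes and its value are illustrative; any key from the topics list in the help works):

> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name test --alter --add-config max.message.bytes=128000

> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name test --alter --delete-config max.message.bytes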

Because Kafka's command-line hints are so thorough, you can easily work out the next step from the prompt alone; once you are familiar with this, you can quickly put together exactly the command you want.

Reposted from: https://blog.csdn.net/isea533/article/details/73720066

Original post: https://www.cnblogs.com/JThinking/p/9447789.html
