kafka 基本操作命令

  • 查看kafka版本

     进入kafka安装目录 ... kafka/libs,看到类似kafka_2.12-2.0.0.jar这样的文件,2.12为scala版本,2.0.0是kafka版本(kafka使用了Scala进行开发).

  • zookeeper-server-start.sh

   参照 kafka环境搭建

  • zookeeper-server-stop.sh

   停止kafka bin/kafka-server-stop.sh

  • kafka-server-start.sh

  参照 kafka环境搭建

  • kafka-server-stop.sh

停止zookeeper  bin/kafka-server-stop.sh

  • kafka-topics.sh

   参照  kafka环境搭建

创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

展示topic
bin/kafka-topics.sh --list --zookeeper localhost:2181

描述topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

删除topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topicname

删除topic中存储的内容
在config/server.properties中找到如下的位置log.dirs=路径,删除log.dirs指定的文件目录,然后重新启动就可以了
  • kafka-consumer-groups.sh
-- 获取group列表[email protected]:/usr/local/kafka/kafka_2.12-2.0.0$ bin/kafka-consumer-groups.sh --bootstrap-server 192.168.101.75:6667 --list
console-consumer-61307
-- 查看具体的消费者group信息。其中CURRENT-OFFSET是该分区当前消费到的offset;LOG-END-OFFSET是该分区当前最新的offset;LAG是消费滞后区间[email protected]:/usr/local/kafka/kafka_2.12-2.0.0$ bin/kafka-consumer-groups.sh --bootstrap-server 192.168.101.75:6667 --group console-consumer-61307 --describe
Consumer group ‘console-consumer-61307‘ has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test            0          3               3               0               -               -               -

--重置消费者offset:指定GROUP_NAME和topic的offset修改到NEW_OFFSET的位置,重启消费者后,消费中将从指定的offset处消费。注意这里只能NEW_OFFSET只能设置一个值,也就是说,所有的分区都将使用这个值,如果分区消息负载不均衡,需要考虑是否适用
  bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME --reset-offsets --execute --to-offset NEW_OFFSET --topic TOPIC_NAME
--重置消费者offset:将指定GROUP_NAME和topic的offset修改到earliest或者latest位置,使得消费者从头或者从尾部消费
  bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME --reset-offsets --execute --to-earliest/--to-latest --topic TOPIC_NAME

  • kafka-console-consumer.sh

  参照  kafka环境搭建

-- 从头消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning
-- 指定offset消费,并限制消费数量(partition必须指定,除非‘--offset‘是明确的)
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --offset latest --partition 0 --max-messages 2
  • kafka-run-class.sh

  从kafka jar包中执行指定的工具类.

[email protected]:/usr/local/kafka/kafka_2.12-2.0.0# bin/kafka-run-class.sh kafka.tools.GetOffsetShell
An interactive shell for getting topic offsets.
Option                                 Description
------                                 -----------
--broker-list <String: hostname:       REQUIRED: The list of hostname and
  port,...,hostname:port>                port of the server to connect to.
--max-wait-ms <Integer: ms>            DEPRECATED AND IGNORED: The max amount
                                         of time each fetch request waits.
                                         (default: 1000)
--offsets <Integer: count>             DEPRECATED AND IGNORED: number of
                                         offsets returned (default: 1)
--partitions <String: partition ids>   comma separated list of partition ids.
                                         If not specified, it will find
                                         offsets for all partitions (default:
                                         )
--time <Long: timestamp/-1(latest)/-2  timestamp of the offsets before that
  (earliest)>                            (default: -1)
--topic <String: topic>                REQUIRED: The topic to get offset from.

[email protected]:/usr/local/kafka/kafka_2.12-2.0.0# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.101.75:6667  --topic test
test:0:3
[email protected]:/usr/local/kafka/kafka_2.12-2.0.0# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.101.75:6667  --topic test --time -1
test:0:3
[email protected]:/usr/local/kafka/kafka_2.12-2.0.0# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.101.75:6667  --topic test --time -2
test:0:0

原文地址:https://www.cnblogs.com/ryjJava/p/12545617.html

时间: 2024-11-09 02:10:51

kafka 基本操作命令的相关文章

kafka集群安装配置

1.下载安装包 2.解压安装包 3.进入到kafka的config目录修改server.properties文件 进入后显示如下: 修改log.dirs,基本上大部分都是默认配置 kafka依赖zookeeper保存一些meta信息,所以这些需要配置 分发安装包到其他节点上 scp -r .... 再次修改配置文件(重要) 依次修改各服务器上配置文件的的broker.id,分别是0,1,2不得重复. 启动集群 依次在各节点上启动kafka bin/kafka-server-start.sh  c

大数据高可用集群环境安装与配置(10)——安装Kafka高可用集群

1. 获取安装包下载链接 访问https://kafka.apache.org/downloads 找到kafka对应版本 需要与服务器安装的scala版本一致(运行spark-shell可以看到当前安装的scala版本) 2. 执行命令下载并安装 cd /usr/local/src/ wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.2.2/kafka_2.11-2.2.2.tgz tar -zxvf kafka_2.11-2.2

Kafka安装和常用操作命令

Kafka安装: 下载kafka_2.10-0.8.2.1 1.关闭防火墙 2.修改配置文件  server.properties broker.id=1log.dirs= /usr/kafka_2.10-0.8.2.1/data  //最后不要写logzookeeper.connect=master:2181,slave01:2181,slave02:2181delete.topic.enable = true //删除话题的时候需要设置其为truenum.partitions=3//建议默认

kafka常用的操作命令

1.kafka启动命令 nohup bin/kafka-server-start.sh config/server.properties & 2.创建topic bin/kafka-topics.sh --create --zookeeper 10.0.178.210:2181 --replication-factor 1 --partitions 1 --topic yddpi_dlcd 3.列出所有Topic bin/kafka-topics.sh --list --zookeeper 10

Kafka(华为FusionInsight )操作命令

华为大数据kafka操作web界面创建角色.用户.用户管理角色进入服务器环境,进入客户端目录/opt/hadoopclient,导入环境变量source bigdata_env.切换用户kinit kafka用户(kafka_test) 查看当前集群Topic列表. bin/kafka-topics.sh --list --zookeeper <ZooKeeper集群IP:24002/kafka> 查看单个Topic详细信息. bin/kafka-topics.sh --describe --

kafka常规及几个重要的操作命令

1. 查看所有topic kafka-topics.sh --zookeeper hadoop3 --list 2. 创建tooic及topic的partitioner ./kafka-topics.sh --zookeeper hadoop3:2181,hadoop4:2181,hadoop5:2181,hadoop6:2181,hadoop7:2181,hadoop8:2181,hadoop9:2181 --create --topic check-data --partitions 21

Kafka常用topic操作命令汇总

offset topic consumer-group consumer producer producer-golang topic 工具 https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools offset相关 # 最大offset bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test

Kafka 如何读取指定topic中的offset -------------用来验证分区是不是均衡!!!(__consumer_offsets)(注,本文尚在测试验证阶段,,,后续一俩天会追加修正)

我现在使用的是librdkafka 的C/C++ 的客户端来生产消息,用flume来辅助处理异常的数据,,, 但是在前段时间,单独使用flume测试的时候发现,flume不能对分区进行负载均衡!同一个集群中,一个broker的一个分区已经有10亿条数据,另外一台的另一个分区只有8亿条数据: 因此,我对flume参照别人的做法,增加了拦截器: 即在flume配置文件中 增加以下字段: ----- stage_nginx.sources.tailSource.interceptors = i2sta

阿里云构建Kafka单机集群环境

简介 在一台ECS阿里云服务器上构建Kafa单个集群环境需要如下的几个步骤: 服务器环境 JDK的安装 ZooKeeper的安装 Kafka的安装 1. 服务器环境 CPU: 1核 内存: 2048 MB (I/O优化) 1Mbps 操作系统 ubuntu14.04 64位 感觉服务器性能还是很好的,当然不是给阿里打广告,汗. 随便向kafka里面发了点数据,性能图如下所示:  2. 安装JDK 想要跑Java程序,就必须安装JDK.JDK版本,本人用的是JDK1.7. 基本操作如下: 从JDK