Apache Kafka(三)- Kakfa CLI 使用

1. Topics CLI

1.1  首先启动 zookeeper 与 kafka

> zookeeper-server-start.sh config/zookeeper.properties

INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

INFO Expiring session 0x100ab41939d0000, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)

INFO Processed session termination for sessionid: 0x100ab41939d0000 (org.apache.zookeeper.server.PrepRequestProcessor)

INFO Creating new log file: log.1d (org.apache.zookeeper.server.persistence.FileTxnLog)

> kafka-server-start.sh config/server.properties

Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)

INFO Cluster ID = D69veaGlS5Ce3aHTsxCHkQ (kafka.server.KafkaServer)

INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)

INFO Creating /brokers/ids/0 (is it secure? false) (kafka.zk.KafkaZkClient)

INFO Registered broker 0 at path /brokers/ids/0 with addresses: ArrayBuffer(EndPoint(ip-10-0-2-70.cn-north-1.compute.internal,9092,ListenerName(PLAINTEXT),PLAINTEXT)), czxid (broker epoch): 44 (kafka.zk.KafkaZkClient)

这里我们可以简单的了解到,启动了一个Kafka broker,id为 0,监听的端口为9092。

1.2. 创建一个 topic

这里需要注意的是 --replication-factor参数,例如:

> kafka-topics.sh --zookeeper 10.0.2.70:2181 --topic first_topic --create --partitions 3 --replication-factor 2

此命令会返回一个报错:

ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.

(kafka.admin.TopicCommand$)

此错误表示的是:指定的replication-factor的数量超过了broker的数量。

所以我们使用以下命令创建一个kafka topic:

> kafka-topics.sh --zookeeper 10.0.2.70:2181 --topic first_topic --create --partitions 3 --replication-factor 1

然后列出已创建的kafka topics:

>  kafka-topics.sh --zookeeper 10.0.2.70:2181 --list

first_topic

如果我们需要更多有关一个topic的信息,如partitions,replication-factors 等,使用--descriebe:

> kafka-topics.sh --zookeeper 10.0.2.70:2181 --topic first_topic --describe

Topic:first_topic       PartitionCount:3        ReplicationFactor:1     Configs:

Topic: first_topic      Partition: 0    Leader: 0       Replicas: 0     Isr: 0

Topic: first_topic      Partition: 1    Leader: 0       Replicas: 0     Isr: 0

Topic: first_topic      Partition: 2    Leader: 0       Replicas: 0     Isr: 0

可以看到此topic有3个partition,id分别为0,1,2。每个partition的leader都是broker 0,replicas也是broker 0,Isr也是broker 0(因为replication-replica 为1)

现在我们创建第二个topic:

> kafka-topics.sh --zookeeper 10.0.2.70:2181 --topic second_topic --create --partitions 6 --replication-factor 1

> kafka-topics.sh --zookeeper 10.0.2.70:2181 --list

first_topic

second_topic

1.3. 删除一个topic

> kafka-topics.sh --zookeeper 10.0.2.70:2181 --topic second_topic --delete

Topic second_topic is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

可以看到,second_topic 被标注为deletion。如果delete.topic.enable没有被设置为true,则此topic不会被删除。

> kafka-topics.sh --zookeeper 10.0.2.70:2181 --list

first_topic

根据list的结果,我们可以看到second_topic 被删除,说明delete.topic.enable 默认是true。

2. Produer CLI

根据kafka-console-produer.sh 的使用描述,在使用此脚本时,必须提供的参数是--broker-list与 –topic,现在我们指定这两个参数后执行:

> kafka-console-producer.sh --broker-list 10.0.2.70:9092 --topic first_topic

然后输入messages:

>

>hello world

>are you ok?

>learning kafka

>another message :)

Ctrl + C 退出

在启动一个producer时,也可以指定它的属性,例如:

> kafka-console-producer.sh --broker-list 10.0.2.70:9092 --topic first_topic --producer-property acks=all

>yep is acked

>hello  ack

>are you ok? acked!

>^C

若是我们指定一个不存在的topic的话会怎么样?

> kafka-console-producer.sh --broker-list 10.0.2.70:9092 --topic new_topic

>new topic messages

[2019-08-08 03:37:47,160] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {new_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

>what about now

>it is ok

>^C

可以看到,在指定一个不存在的topic后,在输入消息时,第一次返回了一个WARN,这是由于此topic 没有一个leader。正如之前提到过的,producer有自动recover的机制,所以会尝试找到一个leader去发送消息。我们使用list看一下结果:

> kafka-topics.sh --zookeeper 10.0.2.70 --list

first_topic

new_topic

> kafka-topics.sh --zookeeper 10.0.2.70 --topic new_topic --describe

Topic:new_topic PartitionCount:1        ReplicationFactor:1     Configs:

Topic: new_topic        Partition: 0    Leader: 0       Replicas: 0     Isr: 0

可以看到自动新创建的new_topic,以及创建后的默认配置:partition数目为1,replication-factor数目也为1。此默认设置在 server.properties 里配置,例如:

# The default number of log partitions per topic. More partitions allow greater

# parallelism for consumption, but this will also result in more files across

# the brokers.

num.partitions=1

建议永远都要先创建topic,不要使用默认创建topic

3. Consumer CLI

通过查看kafka-console-consumer.sh脚本,可以看到必须的参数为:--bootstrap-server 与 --topic。按照规则启动一个consumer:

> kafka-console-consumer.sh --bootstrap-server 10.0.2.70:9092 --topic first_topic

但是可以看到的是,此consumer并未读取任何之前producer发送的数据。原因在于:consumer仅会读取在它启动之后的数据。

所以若是我们此时使用producer向first_topic 发送数据,则会在consumer控制台输出接收到的数据。

那如何获取producer之前发送的所有数据?使用 --from-beginning

> kafka-console-consumer.sh --bootstrap-server 10.0.2.70:9092 --topic first_topic --from-beginning

learning kafka

are you ok? acked!

hello world

another message :)

yep is acked

hi

are you ok?

hello  ack

可以看到,以上消息输出的顺序并不为我们输入的顺序。这是因为仅在同一个partition中的消息是有序的,而first_topic 中有3个partitions。若是一个topic中仅有一个partition,则此topic中的全部消息都是有序的。

3. Consumers in Group

3.1. 使用consumer group

> kafka-console-consumer.sh --bootstrap-server 10.0.2.70:9092 --topic first_topic --group my-first-app

使用此方法,可以读取到producer写入的每条消息。

但是如果我们再次启动一个 consumer,使用同样的 --group my-first-app:

最左边的为producer,可以看到的是,第一个consumer先获取一条message,然后第二个consumer获取两条message,然后依次类推。

这是由于:consumer group里当前有两个consumer,而topic有3个partition,所以此时consumer group中的一个consumer会负责2个partition的读,而另一个consumer会负责剩余1个partition的读。

若此时再为同一个consumer group启动一个consumer,则每个partition对应于一个consumer,此时发送3条message,会由3个consumer依次读取。

3.2. 使用--from-beginning

对第二个 consumer group使用 --from-beginning:

> kafka-console-consumer.sh --bootstrap-server 10.0.2.70:9092 --topic first_topic --group my-second-app --from-beginning

learning kafka

are you ok? acked!

可以看到此consumer 列出了所有之前的消息。若是我们再次执行此命令,则会发现不会打印任何消息。

这是因为每个group的offsets都会由Kafka记录下来。所以再次使用此group读数据时,会使用记录的offsets继续读取数据。

4. Consumer Group CLI

查看 kafka-consumer-groups的用途:

This tool helps to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.

必须的参数是 --bootstrap-server

首先列出所有groups:

> kafka-consumer-groups.sh --bootstrap-server 10.0.2.70:9092 --list

my-first-app

my-first-application

my-second-app

查看一个group的详细信息:

> kafka-consumer-groups.sh --bootstrap-server 10.0.2.70:9092 --describe --group my-first-app

这里首先打出的是:consumer group ‘my-first-app’ has no active members。这是因为我们已经停止了这个consumer group 下的所有 consumers,所以此consumer group 下面没有一个active members。

接下打出的信息显示了每个partition,当前的offset;log里最终的 offset;以及 LAG,它表示的是最终还未被消费的message数量(也就是cur-offset与log-end-offset的差)。

我们再往 my-first-app 写入几条数据,然后对consumer group 做describe:

可以看到 LAG 增加。

然后使用consumer-group 读此topic:

> kafka-console-consumer.sh --bootstrap-server 10.0.2.70:9092 --topic first_topic --group my-first-app

help

yep

再 describe:

可以看到LAG为0,且列出了当前consumers 的 id

5. Reset Offset

我们看到 consumer groups 的offset 可以被kafka记录,那如何重置一个consumer group 的offset?使用:

> kafka-consumer-groups.sh --bootstrap-server 10.0.2.70:9092 --reset-offsets --group my-first-app --topic first_topic --to-earliest --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET

my-first-app                   first_topic                    0          0

my-first-app                   first_topic                    2          0

my-first-app                   first_topic                    1          0

使用consumer 检查:

> kafka-console-consumer.sh --bootstrap-server 10.0.2.70:9092 --topic first_topic --group my-first-app

learning kafka

are you ok? acked!

也可以使用--shift-by将offsets做移动,而不是重置:

这里我们用正数做--shift-by 的参数,可以发现 offset是向后移动。所以若是需要向前移动,则需要使用负数,例如:
> kafka-consumer-groups.sh --bootstrap-server 10.0.2.70:9092 --reset-offsets
--group my-first-app --topic first_topic --shift-by -2 --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET

my-first-app                   first_topic                    0          12

my-first-app                   first_topic                    2          13

my-first-app                 
 first_topic                    1          13

然后使用 consumer 验证:

> kafka-console-consumer.sh --bootstrap-server
10.0.2.70:9092 --topic first_topic --group my-first-app

help

yep

6. Kafka UI

以上命令均基于命令行,也可以使用图形化界面配置并访问kafka,如Kafka Tool:

此工具官网地址如下:

http://www.kafkatool.com/

原文地址:https://www.cnblogs.com/zackstang/p/11334479.html

时间: 2024-11-05 21:55:42

Apache Kafka(三)- Kakfa CLI 使用的相关文章

Apache Kafka系列(三) Java API使用

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 摘要: Apache Kafka Java Client API 一.基本概念 Kafka集成了Producer/Consumer连接Broker的客户端工具,但是在消息处理方面,这两者主要用于服务端(Broker)的简单操作,如: 1.创建Topic 2.罗列出已存在的Topic 3.对已有Topic的Produce/Consume测试

Apache Kafka系列(二) 命令行工具(CLI)

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka命令行工具(Command Line Interface,CLI),下文简称CLI. 1. 启动Kafka 启动Kafka需要两步: 1.1. 启动ZooKeeper [[email protected] kafka_2.12-0.11.0.0]# bin/zookeeper-server-start.sh config/zookeeper.properties 1.2.

Apache Kafka系列(五) Kafka Connect及FileConnector示例

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 Apache Kafka系列(四) 多线程Consumer方案 Apache Kafka系列(五) Kafka Connect及FileConnector示例 一. Kafka Connect简介 Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析).为何集成其他系统和解耦应用,经常使用Producer来发送消

Apache Kafka系列(四) 多线程Consumer方案

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 Apache Kafka系列(四) 多线程Consumer方案 本文的图片是通过PPT截图出的,读者如果修改意见请联系我 一.Consumer为何需要实现多线程 假设我们正在开发一个消息通知模块,该模块允许用户订阅其他用户发送的通知/消息.该消息通知模块采用Apache Kafka,那么整个架构应该是消息的发布者通过Producer调用AP

消息订阅发布系统Apache Kafka分布式集群环境搭建和简单测试

一.什么是kafka? kafka是LinkedIn开发并开源的一个分布式MQ系统,现在是Apache的一个孵化项目.在它的主页描述kafka为一个高吞吐量的分布式(能将消息分散到不同的节点上)MQ.Kafka仅仅由7000行Scala编写,据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB) 二.kafka的官方网站在哪里? http://kafka.apache.org/ 三.在哪里下载?需要哪些组件的支持? kafka2.9.2在下面的地址可以下载:

apache kafka总结

apache kafka总结 本文将从多个方面对apache kafka进行总结. 简介 apache kafka是一个分布式消息队列. 这个消息队列在很多场景中被应用, 这一点在kafka apache项目的网站中就有介绍. 相对于其他消息队列而言(比如rabbitMQ等), 在大量数据传输方面性能较好. 当然也有它的不足, 暂时没有足够的中文文档是一方面, 另一方面, kafka在用户制定方面也有所欠缺. 图1-1是apache kafka简略架构图. 图1-1 生产者(producer)与

Apache Kafka分布式消息队列安装配置

一.介绍 Apache Kafka是由Apache软件基金会开发的一个开源消息系统项目,由Scala写成.Kafka最初是由LinkedIn开发,并于2011年初开源.2012年10月从Apache Incubator毕业.该项目的目标是为处理实时数据提供一个统一.高通量.低等待的平台. 二.安装环境kafka server IP: 10.0.0.25 10.0.0.26 操作系统: CentOS 6.5 x86_64 须用到的软件包: jdk-1.7.0_65-fcs.x86_64 java-

[转载] 从Apache Kafka 重温文件高效读写

原文: http://calvin1978.blogcn.com/articles/kafkaio.html 关于文件IO和cache, 让我们对page cache不再陌生. 0. Overview 卡夫卡说:不要害怕文件系统. 它就那么简简单单地用顺序写的普通文件,借力于Linux内核的Page Cache,不(显式)用内存,胜用内存,完全没有别家那样要同时维护内存中数据.持久化数据的烦恼——只要内存足够,生产者与消费者的速度也没有差上太多,读写便都发生在Page Cache中,完全没有同步

apache kafka源码分析走读-Producer分析

apache kafka中国社区QQ群:162272557 producer的发送方式剖析 Kafka提供了Producer类作为java producer的api,该类有sync和async两种发送方式. sync架构图 async架构图 调用流程如下: 代码流程如下: Producer:当new Producer(new ProducerConfig()),其底层实现,实际会产生两个核心类的实例:Producer.DefaultEventHandler.在创建的同时,会默认new一个Prod