Kafka scripts

For ease of use, Kafka ships with a fairly powerful set of tools. Here is a roundup of the ones needed most often.

Starting and stopping the Kafka server

bin/kafka-server-start.sh config/server.properties
bin/kafka-server-stop.sh
JMX_PORT=9999 nohup bin/kafka-server-start.sh config/server.properties &

Topic operations

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181

Describe the details of a topic

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Modify a topic's partitions (the count can only be increased)

bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 3 --topic test

Topic deletion is only officially supported as of 0.8.2, which is currently a beta release

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
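
A hedged reminder for 0.8.2: as far as I know, the broker must also be configured to allow deletion, otherwise the delete request has no visible effect. A minimal server.properties fragment:

# server.properties -- needed for --delete to actually take effect (0.8.2+)
delete.topic.enable=true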

Check for problematic partitions

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --unavailable-partitions --topic test
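
A related check, assuming the flag is available in your version of kafka-topics.sh: --under-replicated-partitions lists partitions whose replicas have fallen out of sync.

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --under-replicated-partitions --topic test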

Per-topic configuration overrides
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
        --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
    --config max.message.bytes=128000
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
    --deleteConfig max.message.bytes

Cluster expansion

Expanding the cluster is fairly simple from the broker side, but partitions of existing topics are not migrated to the new brokers automatically;
the migration has to be done by hand. Fortunately, Kafka provides a fairly convenient tool for it.

--generate: produce a reference migration plan.
Given a list of topics and a list of brokers, the tool proposes a reassignment.

Move the topics entirely onto the new brokers:

> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
            {"topic": "foo2"}],
 "version":1
}

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,4]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,4]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Proposed partition reassignment configuration

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
               {"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":2,"replicas":[5,6]},
               {"topic":"foo2","partition":0,"replicas":[5,6]},
               {"topic":"foo1","partition":1,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[5,6]}]
}

The tool prints both the current assignment and the proposed migration plan.

It is worth saving both: the plan is what gets executed, while the current assignment can be used for a rollback.

--execute: start the migration

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,4]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,4]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
               {"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":2,"replicas":[5,6]},
               {"topic":"foo2","partition":0,"replicas":[5,6]},
               {"topic":"foo1","partition":1,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[5,6]}]
}

--verify: check the current status of the migration

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully
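
If something goes wrong, the tool's own output above points at the rollback path: keep the "Current partition replica assignment" JSON printed by --execute and feed it back in. A sketch (rollback.json is a hypothetical file holding that saved JSON):

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file rollback.json --execute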

Migrating selected replicas of selected partitions of a topic

This moves partition 0 of topic foo1 to brokers 5,6 and partition 1 of topic foo2 to brokers 2,3:

> cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
               {"topic":"foo2","partition":1,"replicas":[3,4]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Decommissioning brokers

The current version has no support for planning a decommission; that only arrives in 0.8.2. Decommissioning requires draining all replicas off the broker first.
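
Until that support lands, one way to drain a broker by hand (a sketch, assuming broker 7 is being retired and its lone partition should move to brokers 5 and 6; the file name and topic are illustrative) is to write a reassignment JSON that simply omits broker 7 and run it through the same tool:

> cat drain-broker-7.json
{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]}]}

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file drain-broker-7.json --execute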

Increasing the replication factor

Grow partition 0's replica count from 1 to 3. The existing replica lives on broker 5; add replicas on brokers 6 and 7.

> cat increase-replication-factor.json
{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}

Producer console

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

Anything typed after this is sent as a message to the topic on the broker.

Consumer console

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Reading the topic from the beginning like this can replay all of the data, over and over.
I wondered why every run is able to replay; it turns out the console consumer generates a random group id each time:
consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000))

Consumer Offset Checker

This displays a consumer group's offsets. The --group argument is required; if --topic is not given, it defaults to all topics.

Displays the:  Consumer Group, Topic, Partitions, Offset, logSize, Lag, Owner for the specified set of Topics and Consumer Group

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker

required argument: [group] 
Option Description 
------ ----------- 
--broker-info Print broker info 
--group Consumer group. 
--help Print this message. 
--topic Comma-separated list of consumer 
   topics (all topics if absent). 
--zkconnect ZooKeeper connect string. (default: localhost:2181)

Example,

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv

Group           Topic                          Pid Offset          logSize         Lag             Owner 
pv              page_visits                    0   21              21              0               none 
pv              page_visits                    1   19              19              0               none 
pv              page_visits                    2   20              20              0               none

Export Zookeeper Offsets

Dumps the offset information in ZK to a file, in the following format:

A utility that retrieves the offsets of broker partitions in ZK and prints to an output file in the following format:

/consumers/group1/offsets/topic1/1-0:286894308 
/consumers/group1/offsets/topic1/2-0:284803985

bin/kafka-run-class.sh kafka.tools.ExportZkOffsets

required argument: [zkconnect] 
Option Description 
------ ----------- 
--group Consumer group. 
--help Print this message. 
--output-file Output file 
--zkconnect ZooKeeper connect string. (default: localhost:2181)
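
A concrete invocation, reusing the pv group from the earlier example (the output path is arbitrary):

bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect localhost:2181 --group pv --output-file /tmp/pv-offsets.out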

Update Offsets In Zookeeper

This one is quite useful for replays. The Kafka documentation is rather unhelpful here; I only figured out how to use it by reading the source.

A utility that updates the offset of every broker partition to the offset of earliest or latest log segment file, in ZK.

bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK

USAGE: kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic

Example,

bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties  page_visits

Group           Topic                          Pid Offset          logSize         Lag             Owner 
pv              page_visits                    0   0               21              21              none 
pv              page_visits                    1   0               19              19              none 
pv              page_visits                    2   0               20              20              none

You can see the offsets have been reset to 0, so Lag = logSize.

An even more direct approach is to look inside ZooKeeper itself.

Connect with zkCli.sh, then browse with ls:

Broker Node Registry

/brokers/ids/[0...N] --> host:port (ephemeral node)

Broker Topic Registry

/brokers/topics/[topic]/[0...N] --> nPartitions (ephemeral node)

Consumer Id Registry

/consumers/[group_id]/ids/[consumer_id] --> {"topic1": #streams, ..., "topicN": #streams} (ephemeral node)

Consumer Offset Tracking

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id] --> offset_counter_value (persistent node)

Partition Owner registry

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id] --> consumer_node_id (ephemeral node)
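
For example, a short zkCli.sh session against this layout (the pv group and page_visits topic reuse the earlier example; the exact node names depend on your deployment):

ls /brokers/ids
ls /consumers/pv/offsets/page_visits
get /consumers/pv/offsets/page_visits/1-0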