Kafka 操作

基本操作

本节将列一下在Kafka集群上的最常用的操作。所有在本节被提到的工作都可以$KAFKA_HOME/bin目录下找到;并且每个工具都会较详细的打印命令的所有可能的选项。

添加和移除Topic

你可以手动创建一个Topic也可以在第一次向某个Topic发送消息的时候由Kafka自动创建。如果Topic是被自动创建的话,你可以需要手动对一些配置项进行调优。

添加和修改Topic工具:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
       --partitions 20 --replication-factor 3 --config x=y

replication factor控制有多少个server将会复制各条被写入Topic的消息。如果该值为3,那么可以有2台server停止工作的情况下,消费端以访问到消息。我们建议你设置该值为2或者3,这样就可以在重启服务时而不影响消费端消费数据。

partition count控制该Topic会有多少个分片。首先每个parition应该在单独的一台server上。所以如果你有20个partions,那么这个Topic的整个数据集针会被不多于20个server去处理(不考虑数据备份)。最终partition count会影响消费端的最大并行数。将会在后台详细讨论这个参数。

跟在命令行后面的参数将会覆盖掉Kafka配置的默认参数。

修改Topic

你可以通过同样的命令修改Topic的配置的partition。

添加partition命令如下:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name
       --partitions 40

请注意当按一定的语义来区分partion的情况,添加partion并不会修改当前partition中已经存在的值,这可能使依赖某个partion进行消费消息的消费端。也就是说如果数据被通过 hash(key)%number_of_partitions 进行分区的话,那么在添加partion后,数据将会被按key进行重新放入不同的partion,不过Kafka将不会尝试去自动重新分布已经存在的数据。

添加配置项:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y

删除配置项:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x

删除Topic
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

默认情况下是不能删除Topic,如果需要删除Topic的话,请在配置文件里设置如下属性

delete.topic.enable=true

Kafka当前不支持减少partition和修改replication factor的操作

时间: 2024-10-19 09:47:03

Kafka 操作的相关文章

kafka操作

kafka 操作: source /etc/profile 查看消费: /usr/local/platform/kafka_2.11-1.1.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic all.log_impbus_impressions_deferred_pricing.0.proto --from-beginning 添加生产: /usr/local/platform/kafka_2.11-1.1.0/

kafka操作清单

1. 查看topic个数 ./kafka-topics.sh --list --zookeeper dwtest-data2:2181,dwtest-data3:2181,dwtest-data4:2181 2. 查看topic的生产者与消费者消息 ./kafka-topics.sh --describe --zookeeper dwtest-data2:2181,dwtest-data3:2181,dwtest-data4:2181 --topic test leader:负责处理消息的读和写

CDH搭建和集成spark、kafka操作

包下载: 由于是离线部署,因此需要预先下载好需要的文件. 需要准备的文件有: Cloudera Manager 5 文件名: cloudera-manager-centos7-cm5.14.0_x86_64.tar.gz 下载地址: https://archive.cloudera.com/cm5/cm/5/ CDH安装包(Parecls包) 版本号必须与Cloudera Manager相对应 下载地址: https://archive.cloudera.com/cdh5/parcels/5.1

Docker部署Kafka以及Spring Kafka操作

从https://hub.docker.com/ 查找kafka 第三个活跃并stars数量多 进去看看使用 我们使用docker-compose来构建镜像 查看使用文档中的docker-compose.yml 因为kafka要搭配zookeeper一起使用,所以文档中包含了zookeeper 我修改了一下版本号 以及变量参数 这两个参数好像是可以暴露给外网访问的(从其他博客找到的答案,不设置spring kafka 会连接不上) 运行命令docker-compose up -d  ,就会开启2

CDH集群集成kafka

搭建要求: 1.CDH环境已经搭建成功,在CDH上搭建kafka,要求用CDH上zookeeper管理kafka而不用kafka自带的zookeeper 2.kafka_2.11-0.8.2.1.tgz已经上传到kafka集群环境中 搭建步骤 1. 主机操作 修改hosts 10.10.0.11 s1-1 10.10.0.12 s1-2 10.10.0.13 s1-3 10.10.0.14 s1-4 10.10.0.15 s2-1 10.10.0.16 s2-2 10.10.0.17 s2-3

Zookeeper 集群+kafka集群+kafka manager搭建

软件需求,软件包都上传到 /usr/local/src目录: jdk-8u101-linux-x64.tar.gz kafka.2.11-0.8.22.tar.gz zookeeper-3.4.9.tar.gz kafka-manager-1.3.0.7.zip * kafka-manager是通过scala打包获取一个编译完的项目,需要提前编译好,参考 https://github.com/yahoo/kafka-manager 硬件需求,四个主机: 192.168.100.100 : kaf

kafka维护问题总结

1 Kafka操作日志的删除方法 Kafka0.8版本长时间运行过程中,在kafka_2.8.0-0.8.0/logs目录下产生了大量的kafka-request.log.*和server.log.*文件,其中*代表日期和时间,比如kafka-request.log.2014-12-08-03和server.log.2014-12-06-03,这些文件对磁盘空间的消耗非常大,需要定期备份或者清理.目前没有发现kafka自身提供了这些操作日志备份或者清理的方法,需要用户自己实现. 备份操作日志的方

Kafka笔记整理(一)

[TOC] Kafka笔记整理(一) Kafka简介 消息队列(Message Queue) 消息 Message 网络中的两台计算机或者两个通讯设备之间传递的数据.例如说:文本.音乐.视频等内容. 队列 Queue 一种特殊的线性表(数据元素首尾相接),特殊之处在于只允许在首部删除元素和在尾部追加元素.入队.出队. 消息队列 MQ 消息+队列,保存消息的队列.消息的传输过程中的容器:主要提供生产.消费接口供外部调用做数据的存储和获取. MQ分类 MQ主要分为两类:点对点(p2p).发布订阅(P

Kafka(华为FusionInsight )操作命令

华为大数据kafka操作web界面创建角色.用户.用户管理角色进入服务器环境,进入客户端目录/opt/hadoopclient,导入环境变量source bigdata_env.切换用户kinit kafka用户(kafka_test) 查看当前集群Topic列表. bin/kafka-topics.sh --list --zookeeper <ZooKeeper集群IP:24002/kafka> 查看单个Topic详细信息. bin/kafka-topics.sh --describe --