Kafka安装部署文档 |
|||
■ 文档版本 |
V1.0 |
■ 操作系统 |
CentOS Linux release 7.3.1611 |
■ 编写人员 |
闫立雄 |
■ 文档日期 |
2019-01-06 |
一. 概述
该文档详细描述了在Linux环境下安装Kafka和ZooKeeper的全过程,文档中以kafka_2.11-1.1.0.tgz和zookeeper-3.4.12.tar.gz为例。
二. 安装ZooKeeper
2.1 下载ZooKeeper
方式1 http://mirrors.shu.edu.cn/apache/zookeeper/网站上下载最新版ZooKeeper
方式2 去我的百度网盘去下载
zookeeper-3.4.12.tar.gz
链接:https://pan.baidu.com/s/1SBK-fN8-yP2qYnzQzfBreA
提取码:2dux
然后把zookeeper-3.4.12.tar.gz文件上传到服务器/usr/local/目录下(集群的话各上传到服务器目录下)。
2.2 安装ZooKeeper
# tar –zxf zookeeper-3.4.11.tar.gz
# mv zookeeper-3.4.12 /usr/local/zookeeper
# mkdir –p /var/lib/zookeeper
# mv /usr/local/zookeeper/conf/zoo_sample /usr/local/zookeeper/conf/zoo.cfg
# vi /usr/local/zookeeper/conf/zoo.cfg(server.1表示集群的服务信息,有几台集群就配置几个server。建议配置奇数的集群,如:3、5、7个)。
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20
syncLimit=5
server.1=192.168.1.10:2888:3888
server.2=192.168.1.11:2888:3888
server.3=192.168.1.12:2888:3888
# export JAVA_HOME=/usr/java/jdk1.8.0_121
# vi /var/lib/zookeeper/myid(配置所在服务器集群的id,如:在192.168.1.10服务器就配置为1)
1
# vi /usr/local/kafka/config/zookeeper.properties(注释掉Kafka默认ZooKeeper相关参数dataDir、clientPort、maxClientCnxns)
2.3 使用Kafka默认安装ZooKeeper
# mkdir –p /var/lib/zookeeper
# vi /usr/local/kafka/config/zookeeper.properties(server.1表示集群的服务信息,有几台集群就配置几个server。建议配置奇数的集群,如:3、5、7个)
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20
syncLimit=5
server.1=192.168.1.10:2888:3888
server.2=192.168.1.11:2888:3888
server.3=192.168.1.12:2888:3888
# vi /var/lib/zookeeper/myid(配置所在服务器集群的id,如:在192.168.1.10服务器就配置为1)
1
三. 安装Kafka
3.1 下载Kafka
方式1 去apache网站下载(http://kafka.apache.org/downloads)最新版的Kafka,JDK推荐使用JDK1.8版本。
方式2 去我的百度网盘去下载
kafka_2.11-1.1.0.tgz
链接:https://pan.baidu.com/s/15aEYWPPaBPr1fTJ-Z6qJbg
提取码:3x98
然后把kafka_2.11-1.1.0.tgz文件上传到服务器/usr/local/目录下(集群的话各上传到服务器目录下)。
3.2 安装Kafka
# tar -zxf kafka_2.11-1.1.0.tgz
# mv kafka_2.11-1.1.0 /usr/local/kafka
# mkdir /home/kafka-logs
# export JAVA_HOME=/usr/java/jdk1.8.0_121
# vi /usr/local/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://192.168.1.10:9092
advertised.listeners=PLAINTEXT:// 192.168.1.10:9092
log.dirs=/home/kafka-logs
zookeeper.connect=192.168.1.10:2181,192.168.1.11:2181,192.168.1.12:2181
zookeeper.connection.timeout.ms=6000
num.recovery.threads.per.data.dir=10
delete.topic.enable=true
注意:如果服务器分内外网IP,advertised.listeners要配置外网IP,也就是程序连接的IP地址。
# vi /usr/local/kafka/bin/kafka-server-start.sh (根据机器内存来设置内存大小,可以设置总内存大小的50%)
Export KAFKA_HEAP_OPTS=”-Xmx8G –Xms8G”
# hostname(查看本机host)
# vi /etc/hosts (把本机host配置到hosts文件中,其它集群机器hosts也配置)
添加一行。
192.168.1.10 hostname
四. Zookeeper、kafka常用命令
1.启动zookeeper
sh /usr/local/zookeeper/bin/zkServer.sh start 单独的zookeeper启动
/usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties kafka自带的zookeeper启动
2.启动Kafka
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
3.创建一个主题
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.1.10:2181 --replication-factor 1 --partitions 2 --topic test (replication-factor 副本因子 partitions分区)
4.发送消息
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.1.10:9092 --topic test
5.消费消息
/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 192.168.1.10:2181 --topic test --from-beginning (from-beginning 从最开始消费)
6.查看已创建人topic列表
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.1.10:2181
7.查看topic属性
/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.1.10:2181 --topic test
8.删除topic
/usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.1.10:2181 --delete --topic test
(1)登录zookeeper客户端:命令:./zookeeper-shell.sh 192.168.1.10:2181
(2)找到topic所在的目录:ls /brokers/topics
(3)找到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。
9.查看分组消费情况
/usr/local/kafka/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.1.10:9092 --describe --group test-consumer-group
10.查看所有分组列表
/usr/local/kafka/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 192.168.1.10:9092 --list
五. 常见问题
1、用命令发送消息时报Connection to node -1 could not be established. Broker may not be available.
解决:如果请求的地址是localhost的话,改成实际的IP地址。
2、JAVA代码消费不了Kafka的消息,在ConsumerRecords<String, String> records = consumer.poll(100);方法中一直循环。
解决:是因为部署Kafka的机器之前有安装过Kafka,产生了脏数据。
1)进入zookeeper 运行zkCli.sh 。(Kafka默认ZooKeeper运行./zookeeper-shell.sh 192.168.1.10:2181)
2)运行ls /brokers/topics 查看主题
3)然后运行 rmr /brokers/topics/__consumer_offsets 删除__consumer_offsets_主题
4)然后重启kafka集群。
3、发送消息时不断的提示 Error while
fetching metadata with correlation id 1。
解决:每台服务器的/etc/hosts需要配置127.0.0.1 本机hosts,注意不要写访问的IP地址,要写127.0.0.1
六. java调用工具类
本人在springboot微服务中用到的kafka工具类。
ylx_java_utility class.zip
链接:https://pan.baidu.com/s/1gwmnSL-N2KPAtzh_kzM1og
提取码:pg9o
以上是本人自己总结,并且在项目总实际运用操作的。新手一枚,不喜勿喷!
原文地址:https://www.cnblogs.com/yanlixiong/p/10229890.html