kafka系列二:多节点分布式集群搭建

上一篇分享了单节点伪分布式集群搭建方法,本篇来分享一下多节点分布式集群搭建方法。多节点分布式集群结构如下图所示:

  为了方便查阅,本篇将和上一篇一样从零开始一步一步进行集群搭建。

一、安装Jdk

  具体安装步骤可参考 linux安装jdk

二、安装与配置zookeeper

  下载地址:https://www-us.apache.org/dist/zookeeper/stable/

  下载二进制压缩包 zookeeper-3.4.14.tar.gz,然后上传到linux服务器指定目录下,本次上传目录为 /software ,然后执行如下命令安装:

cd /softwaretar -zxvf zookeeper-3.4.14.tar.gzmv zookeeper-3.4.14 /usr/local/zookeepercd /usr/local/zookeeper/confmv zoo_sample.cfg zoo1.cfg

  编辑 zoo1.cfg ,配置相关参数如下:

tickTime=2000
initLimit=5
syncLimit=2
dataDir=/usr/local/zookeeper/data/zookeeper1
clientPort=2181
server.1=192.168.184.128:2888:3888
server.2=192.168.184.128:2889:3889
server.3=192.168.184.128:2890:3890

  其中:

   tickTime :Zookeeper最小的时间单位,用于丈量心跳和超时时间,一般设置默认值2秒;

   initLimit :指定follower节点初始时连接leader节点的最大tick此处,设置为5,表示follower必须在  5xtickTime  即10秒内连接上leader,否则视为超时;

  syncLimit :设定follower节点与leader节点进行同步的最大时间,设置为2,表示最大时间为  2xtickTime 即4秒时间;

   dataDir :Zookeeper会在内存中保存系统快照,并定期写入该路径指定的文件夹中,生产环境需要特别注意该文件夹的磁盘占用情况; 

   clientPort :Zookeeper监听客户端连接的端口号,默认为2181,同一服务器上不同实例之间应该有所区别;

   server.X=host:port1:port2  :此处X的取值范围在1~255之间,必须是全局唯一的且和myid文件中的数字对应(myid文件后面说明),host是各个节点的主机名,port1通常是2888,用于使follower节点连接leader节点,port2通常是3888,用于leader选举,zookeeper在不同服务器上的时候,不同zookeeper服务器的端口号可以重复,在同一台服务器上的时候需要有所区别。

1.配置zoo.cfg文件

  单节点安装zookeeper的时候,仅有一份zoo.cfg文件,多节点安装的时候,每个zookeeper服务器就应该有一个zoo.cfg配置文件。如果在一台服务器安装zookeeper多实例集群,则需要在conf目录下分别配置每个实例的zoo.cfg,同时创建每个zookeeper实例自己的数据存储目录。本次在一台服务器上配置多个zookeeper实例,执行如下命令创建数据存储目录并复制配置文件:

mkdir -p /usr/local/zookeeper/data/zookeeper1
mkdir -p /usr/local/zookeeper/data/zookeeper2
mkdir -p /usr/local/zookeeper/data/zookeeper3
cd /usr/local/zookeeper/conf/
cp zoo1.cfg zoo2.cfg
cp zoo1.cfg zoo3.cfg

  复制后分别修改 zoo2.cfg , zoo3.cfg 中的配置,修改后的配置如下:

  zoo1.cfg的配置如下:

  

  zoo2.cfg的配置如下:

  

  zoo3.cfg中的配置如下:

  

2.myid文件创建与配置

  前面提到zoo.cfg文件中的server.X中的X应该与myid中的数字相对应。除此之外,myid文件必须存放在每个zookeeper实例的data目录下,对应本次安装应该位于 /usr/local/zookeeper/data/zookeeper1,2,3 目录下,执行如下命令进行配置:

echo ‘1‘ > /usr/local/zookeeper/data/zookeeper1/myidecho ‘2‘ > /usr/local/zookeeper/data/zookeeper2/myidecho ‘3‘ > /usr/local/zookeeper/data/zookeeper3/myid

3.启动zookeeper服务器

  使用如下命令启动zookeeper集群:

cd /usr/local/zookeeper/bin/./zkServer.sh start ../conf/zoo1.cfg./zkServer.sh start ../conf/zoo2.cfg./zkServer.sh start ../conf/zoo3.cfg

  启动后,使用如下命令查看集群状态:

cd /usr/local/zookeeper/bin/./zkServer.sh status ../conf/zoo1.cfg
./zkServer.sh status ../conf/zoo2.cfg
./zkServer.sh status ../conf/zoo3.cfg

  回显信息如下:

  

  可以看到有两个follower节点,一个leader节点。

 三、安装与配置kafka集群

  下载地址:http://kafka.apache.org/downloads.html

1.数据目录和配置文件创建

  目前最新版本是2.2.0,本次下载2.1.1版本的安装包,然后上传压缩包到服务器指定目录,本次上传目录为  /software ,然后执行以下命令进行安装:

tar -zxvf kafka_2.12-2.1.1.tgzmv kafka_2.12-2.1.1 /usr/local/kafkamkdir -p /usr/local/kafka/logs/kafka1mkdir -p /usr/local/kafka/logs/kafka2mkdir -p /usr/local/kafka/logs/kafka3cd /usr/local/kafka/config/mv server.properties server1.propertiescp server1.properties server2.propertiescp server1.properties server3.properties

  通过执行上面的命令,我们在 /usr/local/kafka/logs 文件夹中创建了 kafka1,kafka2,kafka3 三个文件夹用于存放三个kafka实例的数据,同时在 /usr/local/kafka/config/ 文件夹下复制了三份 server.properties 文件分别用于配置各个kafka实例。

2.配置属性文件

  接下来分别配置三个 server.properties 文件,主要配置参数如下:

   broker.id=1 :设置kafka broker的id,本次分别为1,2,3;

   delete.topic.enable=true :开启删除topic的开关;

   listeners=PLAINTEXT://192.168.184.128:9092 :设置kafka的监听地址和端口号,本次分别设置为9092,9093,9094;

   log.dirs=/usr/local/kafka/logs/kafka1 :设置kafka日志数据存储路径;

   zookeeper.connect=192.168.184.128:2181,192.168.184.128:2182,192.168.184.128:2183 :设置kafka连接的zookeeper访问地址,集群环境需要配置所有zookeeper的访问地址;

   unclean.leader.election.enable=false :为true则代表允许选用非isr列表的副本作为leader,那么此时就意味着数据可能丢失,为false的话,则表示不允许,直接抛出NoReplicaOnlineException异常,造成leader副本选举失败。

   zookeeper.connection.timeout.ms=6000 :设置连接zookeeper服务器超时时间为6秒。

  配置完成后,各个配置文件中配置如下:

  server1.properties配置:

  

  server2.properties配置:

  

  server3.properties配置:

  

3.启动kafka

  通过如下命令启动kafka集群:

cd /usr/local/kafka/bin/./kafka-server-start.sh -daemon ../config/server1.properties./kafka-server-start.sh -daemon ../config/server2.properties./kafka-server-start.sh -daemon ../config/server3.properties

  使用 java的命令jps来查看kafka进程:jps |grep -i kafka

  

  说明kafak启动正常,至此kafka集群搭建完成。本次使用一台服务器作为演示,如果需要在多个服务器上配置集群,配置方法和以上类似,只是不需要像上面那样配置多个数据目录和配置文件,每台服务器的配置保持相同,并且注意在防火墙配置端口号即可。

四、测试

1.topic创建与删除

  首先创建一个测试topic,名为testTopic,为了充分利用3个实例(服务器节点),创建3个分区,每个分区都分配3个副本,命令如下:

cd /usr/local/kafka/bin/./kafka-topics.sh --zookeeper 192.168.184.128:2181 192.168.184.128:2182 192.168.184.128:2183 --create --topic testTopic --partitions 3 --replication-factor 3

  回显 Created topic "testTopic". 则表明testTopic创建成功。执行如下命令进行验证并查看testTopic的信息:

./kafka-topics.sh --zookeeper 192.168.184.128:2181 192.168.184.128:2182 192.168.184.128:2183 --list testTopic./kafka-topics.sh --zookeeper 192.168.184.128:2181 192.168.184.128:2182 192.168.184.128:2183 --describe --topic testTopic

  以上几条命令回显信息如下:

  接下来测试topic删除,使用如下命令进行删除:

./kafka-topics.sh --zookeeper 192.168.184.128:2181 192.168.184.128:2182 192.168.184.128:2183 --delete --topic testTopic

  执行该条命令后,回显信息如下:

  可以看到,testTopic已经被标记为删除,同时第二行提示表明当配置了 delete.topic.enable 属性为 true 的时候topic才会删除,否则将不会被删除,本次安装的时候该属性设置的值为 true 。

2.测试消息发送与消费

  首先使用第一步topic创建命令,先创建testTopic这个topic,然后进行消息发送与消费测试。

  控制台测试消息发送与消费需要使用kafka的安装目录 /usr/local/kafka/bin 下的 kafka-console-producer.sh 来发送消息,使用 kafka-console-consumer.sh 来消费消息。因此本次打开两个控制台,一个用于执行 kafka-console-producer.sh来发送消息,另一个用于执行 kafka-console-consumer.sh 来消费消息。

  消息发送端命令:

cd /usr/local/kafka/bin./kafka-console-producer.sh --broker-list 192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 --topic testTopic

  消息接收端命令:

cd /usr/local/kafka/bin./kafka-console-consumer.sh --bootstrap-server 192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 --topic testTopic --from-beginning

  当发送端和接收端都登录后,在发送端输入需要发送的消息并回车,在接收端可以看到刚才发送的消息:

  发送端:

  

  接收端:

  

  以上就是简单地生产消息与消费消息的测试,在测试消费消息的时候时候,命令里边加了个参数 --from-beginning 表示接收该topic从创建开始的所有消息。

3.生产者吞吐量测试

  对于任何一个消息引擎而言,吞吐量是一个至关重要的性能指标。对于Kafka而言,它的吞吐量指每秒能够处理的消息数或者字节数。kafka为了提高吞吐量,采用追加写入方式将消息写入操作系统的页缓存,读取的时候从页缓存读取,因此它不直接参与物理I/O操作,同时使用以sendfile为代表的零拷贝技术进行数据传输提高效率。

  kafka提供了 kafka-producer-perf-test.sh 脚本用于测试生产者吞吐量,使用如下命令启动测试:

cd /usr/local/kafka/bin./kafka-producer-perf-test.sh --topic testTopic --num-records 50000 --record-size 200 --throughput -1 --producer-props bootstrap.servers=192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 acks=-1

  以上回显信息表明这台服务器上每个producer每秒能发送6518个消息,平均吞吐量是1.24MB/s,平均延迟2.035秒,最大延迟3.205秒,平均有50%的消息发送需要2.257秒,95%的消息发送需要3.076秒,99%的消息发送需要3.171秒,99.9%的消息发送需要3.205秒。

4.消费者吞吐量测试

  与生产者吞吐量测试类似,kafka提供了 kafka-consumer-perf-test.sh 脚本用于消费者吞吐量测试,可以执行以下命令进行测试:

cd /usr/local/kafka/bin./kafka-consumer-perf-test.sh --broker-list 192.168.184.128:9092,192.168.184.128:9093,192.168.184.128:9094 --messages 50000 --topic testTopic

  以上是测试50万条消息的consumer吞吐量,结果表明该consumer在1秒总共消费了9.5366MB消息。

  以上就是kafka集群的搭建以及测试,如有错误之处,烦请指正。

参考资料:《Apache kafka实战》

 

  

原文地址:https://www.cnblogs.com/fengweiweicoder/p/10841637.html

时间: 2024-10-06 11:21:05

kafka系列二:多节点分布式集群搭建的相关文章

基于Hadoop的数据分析综合管理平台之Hadoop、HBase完全分布式集群搭建

能够将热爱的技术应用于实际生活生产中,是做技术人员向往和乐之不疲的事. 现将前期手里面的一个项目做一个大致的总结,与大家一起分享.交流.进步.项目现在正在线上运行,项目名--基于Hadoop的数据分析综合管理平台. 项目流程整体比较清晰,爬取数据(txt文本)-->数据清洗-->文本模型训练-->文本分类-->热点话题发现-->报表"实时"展示,使用到的技术也是当今互联网公司常用的技术:Hadoop.Mahout.HBase.Spring Data Had

分布式实时日志系统(四) 环境搭建之centos 6.4下hbase 1.0.1 分布式集群搭建

一.hbase简介 HBase是一个开源的非关系型分布式数据库(NoSQL),它参考了谷歌的BigTable建模,实现的编程语言为 Java.它是Apache软件基金会的Hadoop项目的一部分,运行于HDFS文件系统之上,为 Hadoop 提供类似于BigTable 规模的服务.因此,它可以容错地存储海量稀疏的数据.HBase在列上实现了BigTable论文提到的压缩算法.内存操作和布隆过滤器.HBase的表能够作为MapReduce任务的输入和输出,可以通过Java API来存取数据,也可以

Hadoop全分布式集群搭建(详细)

一.准备物理集群.1.物理集群搭建方式.采用搭建3台虚拟机的方式来部署3个节点的物理集群.2.虚拟机准备.准备一个已近建好的虚拟机进行克隆.(建议为没进行过任何操作的)在要选择克隆的虚拟机上右击鼠标,管理,克隆.在弹出对话框中进行以下操作.(1).下一步.(2).选择虚拟机中的当前状态,下一步. (3).选择创建完整克隆,下一步.(4).输入虚拟机名称,下一步.(5).克隆完成.(6).按照上述步骤再创建一个虚拟机名称为slave02的.3.虚拟机网络配置.由于slave01和slave02虚拟

阿里云ECS服务器部署HADOOP集群(三):ZooKeeper 完全分布式集群搭建

本篇将在阿里云ECS服务器部署HADOOP集群(一):Hadoop完全分布式集群环境搭建的基础上搭建,多添加了一个 datanode 节点 . 1 节点环境介绍: 1.1 环境介绍: 服务器:三台阿里云ECS服务器:master, slave1, slave2 操作系统:CentOS 7.3 Hadoop:hadoop-2.7.3.tar.gz Java: jdk-8u77-linux-x64.tar.gz ZooKeeper: zookeeper-3.4.14.tar.gz 1.2 各节点角色

Hadoop 分布式集群搭建 & 配置

一. 安装Java Java下载 官网下载合适的jdk,本人使用的是jdk-7u79-linux-x64.tar.gz,接下来就以该版本的jdk为例,进行Java环境变量配置 创建Java目录 在/usr/local目录下创建java目录,用于存放解压的jdk cd /usr/local mkdir java 解压jdk 进入java目录 cd java tar zxvf jdk-7u79-linux-x64.tar.gz 配置环境变量 编辑profile文件 cd /etc vim profi

Hadoop伪分布式集群搭建总结

Hadoop伪分布式集群搭建总结 一.所需软件VMware15!CentOS6.5JDK1.8Hadoop2.7.3二.安装注意:对文件进行编辑:输入a,表示对该文件进行编辑,最后保存该文件,操作为:点击键盘上的Esc按钮,然后输入英文的:字符,再输入wq,点击回车,完成文件的保存.1.关闭防火墙和禁用SELINUX(1).永久关闭防火墙,重启Linux系统(2) .禁用SELINUX:修改文件参数 重启Linux使其生效(3).检查防火墙是否运行,显示下图即为关闭2.配置hostname与IP

MinIO 分布式集群搭建

MinIO 分布式集群搭建 分布式 Minio 可以让你将多块硬盘(甚至在不同的机器上)组成一个对象存储服务.由于硬盘分布在不同的节点上,分布式 Minio 避免了单点故障. Minio 分布式模式可以搭建一个高可用的对象存储服务,你可以使用这些存储设备,而不用考虑其真实物理位置. (1)数据保护 分布式 Minio 采用纠删码(erasure code)来防范多个节点宕机和位衰减(bit rot). 分布式 Minio 至少需要 4 个节点,使用分布式 Minio 就自动引入了纠删码功能. 纠

Hadoop完全分布式集群搭建

Hadoop的运行模式 Hadoop一般有三种运行模式,分别是: 单机模式(Standalone Mode),默认情况下,Hadoop即处于该模式,使用本地文件系统,而不是分布式文件系统.,用于开发和调试. 伪分布式模式(Pseudo Distrubuted Mode),使用的是分布式文件系统,守护进程运行在本机机器,模拟一个小规模的集群,在一台主机模拟多主机,适合模拟集群学习. 完全分布式集群模式(Full Distributed Mode),Hadoop的守护进程运行在由多台主机搭建的集群上

Storm分布式集群搭建

Storm分布式集群搭建 1.解压Storm压缩文件 [[email protected] software]# tar -zxf apache-storm-0.10.0.tar.gz -C /opt/modules [[email protected] software]# cd /opt/modules [[email protected] modules]# mv apache-storm-0.10.0 storm-0.10.0 2.配置Storm的配置文件 部署依赖环境 Java 6+