一、环境操作系统和软件版本介绍
1.环境操作系统为CentOS Linux release 7.2.1511 (Core)
可用cat /etc/redhat-release查询
2.软件版本
Kafka版本为:0.10.0.0
二、软件的基础准备
由于Kafka集群需要依赖ZooKeeper集群来协同管理,所以需要事先搭建好ZK集群。此文主要介绍搭建Kafka集群环境。
三、详细安装搭建步骤
1.下载压缩包kafka_2.10-0.10.0.0.tgz到/data/soft目录
2.将kafka_2.10-0.10.0.0.tgz解压到/data/app/Kafka目录
tar –xzf kafka_2.10-0.10.0.0.tgz –C /data/app/kafkacluster
把文件夹重命名为19092,进入config目录,修改server.properties文件
3.用vi命令打开server.properties
1 [[email protected] config]# vi server.properties
4.修改如下:
1 broker.id=0 2 port=19092 3 log.dirs=/data/app/kafkacluster/19093/bin/kafka-logs19092 4 zookeeper.connect=192.168.1.18:3001,192.168.1.18:3002,192.168.1.18:3003
其他两台服务器上的kafka同上,先修改文件夹名称(在此文另外两个文件夹名称为19093和19094)
再进入config目录,分别改server.properties名称为server1.properties和server2.properties
server1.properties中的配置需要改:
1 broker.id=1 2 port=19093 3 log.dirs=/data/app/kafkacluster/19093/bin/kafka-logs19093 4 zookeeper.connect=192.168.1.18:3001,192.168.1.18:3002,192.168.1.18:3003
server2.properties中的配置需要改:
1 broker.id=2 2 port=19094 3 log.dirs=/data/app/kafkacluster/19094/bin/kafka-logs19094 4 zookeeper.connect=192.168.1.18:3001,192.168.1.18:3002,192.168.1.18:3003
四、启动kafka&测试验证
1.首先启动独立的ZK集群,三台都要启动(./zkServer.sh start)
2.进入到kafka的bin目录,然后启动服务./kafka-server-start.sh ../config/server.properties (三台服务器都要启动)
1 ./kafka-server-start.sh ../config/server1.properties 2 ./kafka-server-start.sh ../config/server2.properties
另外,启动其他节点的时候,在最先开始启动的节点会显示其它节点加入的信息记录,如下所示:
1 [2017-01-18 14:44:24,352] INFO Partition [aaa,0] on broker 0: Expanding ISR for partition [aaa,0] from 0 to 0,1 (kafka.cluster.Partition) 2 [2017-01-18 14:44:37,065] INFO Partition [aaa,0] on broker 0: Expanding ISR for partition [aaa,0] from 0,1 to 0,1,2 (kafka.cluster.Partition)
3.验证启动进程
1 [[email protected] bin]# jps 2 25778 Kafka 3 26132 Jps 4 25285 Kafka 5 25014 QuorumPeerMain 6 25064 QuorumPeerMain 7 25531 Kafka 8 25116 QuorumPeerMain
4.使用客户端进入zk
1 [[email protected] bin]# ./zkCli.sh -server 192.168.1.18:3001 2 Connecting to 192.168.1.18:3001
5.查看目录情况
1 [zk: 192.168.1.18:3001(CONNECTED) 0] ls / 2 [controller_epoch, controller, brokers, zookeeper, test, admin, isr_change_notification, consumers, config] 3 [zk: 192.168.1.18:3001(CONNECTED) 1]
上面的显示结果中:只有zookeeper是zookeeper原生的,其他都是Kafka创建的
6. 创建一个topic:
1 [[email protected] bin]# ./kafka-topics.sh --create --zookeeper 192.168.1.18:3001,192.168.1.18:3002,192.168.1.18:3003 --replication-factor 3 --partitions 1 --topic test666 2 Created topic "test666".
7. 查看topic状态:
1 [[email protected] bin]# ./kafka-topics.sh --describe --zookeeper 192.168.1.18:3001,192.168.1.18:3002,192.168.1.18:3003 --topic test666 2 Topic:test666 PartitionCount:1 ReplicationFactor:3 Configs: 3 Topic: test666 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
输出参数解释:
第一行是对所有分区的一个描述,然后每个分区都会对应一行,因为我们只有一个分区所以下面就只加了一行。
Leader:负责处理消息的读和写,Leader是从所有节点中随机选择的。
Replicas:列出了所有的副本节点,不管节点是否在服务中
Isr:是正在服务中的节点
由上可见,此时的leader是0
下文会kill 0,看leader是否更改
8.往test666中发送消息:
1 [[email protected] bin]# ./kafka-console-producer.sh --broker-list localhost:19092,localhost:19093,localhost:19094 --topic test666 2 hello kafka! 3 hello littleMonster! 4 hello world!
9.接收消息:
1 [[email protected] bin]# ./kafka-console-consumer.sh --zookeeper 192.168.1.18:3001,192.168.1.18:3002,192.168.1.18:3003 --topic test666 --from-beginning 2 hello kafka! 3 hello littleMonster! 4 hello world!
消息接收成功。
10.找到为0的leader的进程,并杀死
1 [[email protected] /]# ps -ef | grep ka
1 [[email protected] /]# kill -9 25285
11.再次查看topic状态:
1 [[email protected] bin]# ./kafka-topics.sh --describe --zookeeper 192.168.1.18:3001,192.168.1.18:3002,192.168.1.18:3003 --topic test666 2 Topic:test666 PartitionCount:1 ReplicationFactor:3 Configs: 3 Topic: test666 Partition: 0 Leader: 2 Replicas: 0,2,1 Isr: 2,1
由此可见,在Isr(正在服务中的节点)项,0已消失,新选举出的leader是2。
12.再次发送消息
接收消息
消息正常接收,该测试通过。