系统:centos 7.4
要求:jdk :1.8.x
kafka_2.11-1.1.0
1、绑定/etc/hosts
10.10.10.xxx online-ops-xxx-01
10.10.10.xxx online-ops-xxx-02
10.10.10.xxx online-ops-xxx-03
2、下载软件包
kafka_2.11-1.1.0
3、配置文件kafka
mkdir /data/kafka
#vim /usr/local/kafka/config/server.properties
broker.id=1 #每台机器不一样
listeners=PLAINTEXT://10.30.30.253:9092 #本机ip
host.name=online-ops-elk-01 #本机主机名
num.network.threads=18
num.io.threads=24
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka #本机路径,需要创建
num.partitions=3
num.recovery.threads.per.data.dir=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=online-ops-xxx-01:2181,online-ops-xxx-02:2181,online-ops-xxx-03:2181
zookeeper.connection.timeout.ms=6000
default.replication.factor = 2
delete.topic.enable=true
unclean.leader.election.enable=false
min.insync.replicas=2
4、配置文件zookeeper
mkdir -p /data/zookeeper /data/logs/zookeeper
#vim /usr/local/kafka/config/zookeeper.properties
maxClientCnxns=100
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/data/zookeeper #本机路径,需要创建
# the port at which the clients will connect
clientPort=2181
# the directory where the transaction logs are stored.
dataLogDir=/data/logs/zookeeper #本机路径,需要创建
autopurge.purgeInterval=6
autopurge.snapRetainCount=20
server.1=online-ops-xxx-01:2888:3888 #服务器主机名
server.2=online-ops-xxx-02:2888:3888 #服务器主机名
server.3=online-ops-xxx-03:2888:3888 #服务器主机名
5、写入id
echo 1 >/data/zookeeper/myid #和配置文件id对应 ,每台机器 都 不一样,集群内唯一id
三台都要配置:
6、启动
kafka:
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
zookeeper:
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
7、supervisord管理启动:
安装见其它的文档:ansible-安装supervisor
#cat /etc/supervisord.d/kafka.ini
[supervisord]
minfds=65536
minprocs=32768
[program:kafka]
command=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
user=root
process_name=kafka
numprocs=1
stopsignal=INT
redirect_stderr=true
stdout_logfile=/data/logs/kafka/kafka.log
stdout_logfile_maxbytes=300MB
stdout_logfile_backups=10
stopasgroup=true
killasgroup=true
# cat /etc/supervisord.d/zookeeper.ini
[supervisord]
minfds=65536
minprocs=32768
[program:zookeeper]
command=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
user=root
process_name=zookeeper
numprocs=1
stopsignal=INT
redirect_stderr=true
stdout_logfile=/data/logs/zookeeper/zookeeper.log
stdout_logfile_maxbytes=300MB
stdout_logfile_backups=10
stopasgroup=true
killasgroup=true
重启supervisord
supervisorctl update zookeeper
supervisorctl restart zookeeper
维护:
新建topic:
bin/kafka-topics.sh --create --zookeeper online-ops-xxx-01:2181,online-ops-xxx-02:2181,online-ops-xxx-03:2181 --replication-factor 2 --partitions 6 --topic test
删除topic
/usr/local/kafka/bin/kafka-topics.sh --zookeeper online-ops-xxx-01:2181,online-ops-xxx-02:2181,online-ops-xxx-03:2181 --delete --topic "online-algo-kafka"
列出哪些topic:
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
测试kafka集群
(此处使用的zookeeper ip不同上所示,改成自己创建的即可)
1-进入kafka根目录,创建topic--test
bin/kafka-topics.sh --create --zookeeper online-ops-xxx-01:2181,online-ops-xxx-02:2181,online-ops-xxx-03:2181 --replication-factor 1 --partitions 1 --topic test
2-列出已创建的topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
3-模拟客户端去发送消息
bin/kafka-console-producer.sh --broker-list online-ops-xxx-01:9092,online-ops-xxx-02:9092,online-ops-xxx-03:9092 --topic test
4-模拟客户端去接受消息
bin/kafka-console-consumer.sh --zookeeper online-ops-xxx-01:2181,online-ops-xxx-02:2181,online-ops-xxx-03:2181 --from-beginning --topic test
原文地址:https://www.cnblogs.com/Qing-840/p/9595673.html