Kakfa分布式集群搭建

本位以最新版本kafka_2.11-0.10.1.0版本讲述分布式kafka集群环境的搭建过程。服务器列表:

172.31.10.1
172.31.10.2
172.31.10.3

1.下载kafka安装包

登录kafka官网http://kafka.apache.org/,

  • 单击左侧“Download”按钮
  • 选择对应的版本,版本2.11代表scala版本(kafka是由scala编写的),0.10.1.0代表kafka的版本
  • 在弹出的窗口中选择下载链接即可

2.下载zookeeper安装包

kafka整体架构如下:

而kafka集群通常会依赖zookeeper的命名服务,单机版的可以直接用kafka安装包的zookeeper,而通常生产环境为保证命名服务的可用性,一般会单独搭建zookeeper集群。服务器不足可以直接和kafka broker共用服务器,zookeeper命名服务队资源要求不高。

登录zookeeper官网http://www.apache.org/dyn/closer.cgi/zookeeper/,一路选择download下载即可,本文选择稳定版zookeeper-3.4.8

3.安装zookeeper集群

将安装包zookeeper-3.4.8.tar上传至服务器172.31.10.1,

  • 解压,目录/opt/zookeeper/zookeeper-3.4.8

    tar -zxvf zookeeper-3.4.8.tar
    
  • 配置,切换到conf目录,并更改dataDir和server.x

    cd /opt/zookeeper/zookeeper-3.4.8/conf
    mv zoo_sample.cfg zoo.cfg
    

    更改后的zoo.cfg配置如下:

    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just
    # example sakes.
    dataDir=/var/logs/data/zookeeper
    # the port at which the clients will connect
    clientPort=2181
    server.1=172.31.10.1:2888:3888
    server.2=172.31.10.2:2888:3888
    server.3=172.31.10.3:2888:3888
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    #
    # Be sure to read the maintenance section of the
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1
    

    其中dataDir为zookeeper目录,server.x为zookeeper服务器列表的地址和通信端口

  • 远程复制到其他两台服务器,并在dataDir目录下创建myid文件,内容为server.x中的数字。本文设置如下:
#172.31.10.1执行
cd /var/logs/data/zookeeper
echo "1" >  /var/logs/data/zookeeper/myid

#172.31.10.2执行
cd /var/logs/data/zookeeper
echo "2" >  /var/logs/data/zookeeper/myid

#172.31.10.3执行
cd /var/logs/data/zookeeper
echo "3" >  /var/logs/data/zookeeper/myid
  • 启动zookeeper集群和验证
#在每台服务器上启动zookeeper
cd /opt/zookeeper/zookeeper-3.4.8/bin
/opt/zookeeper/zookeeper-3.4.8/bin/zkServer.sh start

#查看服务器上zookeeper节点角色
cd /opt/zookeeper/zookeeper-3.4.8/bin
/opt/zookeeper/zookeeper-3.4.8/bin/zkServer.sh status

4.安装kafka集群

  • 解压,到/opt/kafka/kafka_2.11-0.10.1.0
tar -zxvf kafka_2.11-0.10.1.0.tgz
cd /opt/kafka/kafka_2.11-0.10.1.0
  • 更改conf/server.properties配置,主要是更改如下几项:
  • broker.id=1
    host.name=172.31.10.1
    log.dirs=/var/logs/data/kafka
    zookeeper.connect=172.31.10.1:2181,172.31.10.2:2181,172.31.10.2:2181/kafka
    

  注意每台服务器上的broker.id均不同,需要保证整个集群中唯一性

  更改后的server.properties如下:

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

# The port the socket server listens on
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=172.31.10.1

# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/var/logs/data/kafka

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don‘t drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=172.31.10.1:2181,172.31.10.2:2181,172.31.10.2:2181/kafka

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
  • 同步到其他服务器,更改broker.id
  • kafka启动和验证

    cd /opt/kafka/kafka_2.11-0.10.1.0/bin
    nohup /opt/kafka/kafka_2.11-0.10.1.0/bin/kafka-server-start.sh config/server.properties &
    

    创建topic,如能成功创建topic则表示集群安装完成,也可以用jps命令查看kafka进程是否存在。

    /opt/kafka/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --create --zookeeper 172.31.10.1:2181,172.31.10.2:2181,172.31.10.2:2181/kafka --replication-factor 3 --partitions 1 --topic test
    

    至此,kafka分布式集群安装完成,后续将深入讲解kafka其他内容。

时间: 2024-08-27 14:36:30

Kakfa分布式集群搭建的相关文章

基于Hadoop的数据分析综合管理平台之Hadoop、HBase完全分布式集群搭建

能够将热爱的技术应用于实际生活生产中,是做技术人员向往和乐之不疲的事. 现将前期手里面的一个项目做一个大致的总结,与大家一起分享.交流.进步.项目现在正在线上运行,项目名--基于Hadoop的数据分析综合管理平台. 项目流程整体比较清晰,爬取数据(txt文本)-->数据清洗-->文本模型训练-->文本分类-->热点话题发现-->报表"实时"展示,使用到的技术也是当今互联网公司常用的技术:Hadoop.Mahout.HBase.Spring Data Had

分布式实时日志系统(四) 环境搭建之centos 6.4下hbase 1.0.1 分布式集群搭建

一.hbase简介 HBase是一个开源的非关系型分布式数据库(NoSQL),它参考了谷歌的BigTable建模,实现的编程语言为 Java.它是Apache软件基金会的Hadoop项目的一部分,运行于HDFS文件系统之上,为 Hadoop 提供类似于BigTable 规模的服务.因此,它可以容错地存储海量稀疏的数据.HBase在列上实现了BigTable论文提到的压缩算法.内存操作和布隆过滤器.HBase的表能够作为MapReduce任务的输入和输出,可以通过Java API来存取数据,也可以

Storm分布式集群搭建

Storm分布式集群搭建 1.解压Storm压缩文件 [[email protected] software]# tar -zxf apache-storm-0.10.0.tar.gz -C /opt/modules [[email protected] software]# cd /opt/modules [[email protected] modules]# mv apache-storm-0.10.0 storm-0.10.0 2.配置Storm的配置文件 部署依赖环境 Java 6+

kafka系列二:多节点分布式集群搭建

上一篇分享了单节点伪分布式集群搭建方法,本篇来分享一下多节点分布式集群搭建方法.多节点分布式集群结构如下图所示: 为了方便查阅,本篇将和上一篇一样从零开始一步一步进行集群搭建. 一.安装Jdk 具体安装步骤可参考 linux安装jdk. 二.安装与配置zookeeper 下载地址:https://www-us.apache.org/dist/zookeeper/stable/ 下载二进制压缩包 zookeeper-3.4.14.tar.gz,然后上传到linux服务器指定目录下,本次上传目录为 

Hadoop 分布式集群搭建 & 配置

一. 安装Java Java下载 官网下载合适的jdk,本人使用的是jdk-7u79-linux-x64.tar.gz,接下来就以该版本的jdk为例,进行Java环境变量配置 创建Java目录 在/usr/local目录下创建java目录,用于存放解压的jdk cd /usr/local mkdir java 解压jdk 进入java目录 cd java tar zxvf jdk-7u79-linux-x64.tar.gz 配置环境变量 编辑profile文件 cd /etc vim profi

Hadoop伪分布式集群搭建总结

Hadoop伪分布式集群搭建总结 一.所需软件VMware15!CentOS6.5JDK1.8Hadoop2.7.3二.安装注意:对文件进行编辑:输入a,表示对该文件进行编辑,最后保存该文件,操作为:点击键盘上的Esc按钮,然后输入英文的:字符,再输入wq,点击回车,完成文件的保存.1.关闭防火墙和禁用SELINUX(1).永久关闭防火墙,重启Linux系统(2) .禁用SELINUX:修改文件参数 重启Linux使其生效(3).检查防火墙是否运行,显示下图即为关闭2.配置hostname与IP

Hadoop全分布式集群搭建(详细)

一.准备物理集群.1.物理集群搭建方式.采用搭建3台虚拟机的方式来部署3个节点的物理集群.2.虚拟机准备.准备一个已近建好的虚拟机进行克隆.(建议为没进行过任何操作的)在要选择克隆的虚拟机上右击鼠标,管理,克隆.在弹出对话框中进行以下操作.(1).下一步.(2).选择虚拟机中的当前状态,下一步. (3).选择创建完整克隆,下一步.(4).输入虚拟机名称,下一步.(5).克隆完成.(6).按照上述步骤再创建一个虚拟机名称为slave02的.3.虚拟机网络配置.由于slave01和slave02虚拟

MinIO 分布式集群搭建

MinIO 分布式集群搭建 分布式 Minio 可以让你将多块硬盘(甚至在不同的机器上)组成一个对象存储服务.由于硬盘分布在不同的节点上,分布式 Minio 避免了单点故障. Minio 分布式模式可以搭建一个高可用的对象存储服务,你可以使用这些存储设备,而不用考虑其真实物理位置. (1)数据保护 分布式 Minio 采用纠删码(erasure code)来防范多个节点宕机和位衰减(bit rot). 分布式 Minio 至少需要 4 个节点,使用分布式 Minio 就自动引入了纠删码功能. 纠

阿里云ECS服务器部署HADOOP集群(三):ZooKeeper 完全分布式集群搭建

本篇将在阿里云ECS服务器部署HADOOP集群(一):Hadoop完全分布式集群环境搭建的基础上搭建,多添加了一个 datanode 节点 . 1 节点环境介绍: 1.1 环境介绍: 服务器:三台阿里云ECS服务器:master, slave1, slave2 操作系统:CentOS 7.3 Hadoop:hadoop-2.7.3.tar.gz Java: jdk-8u77-linux-x64.tar.gz ZooKeeper: zookeeper-3.4.14.tar.gz 1.2 各节点角色