Notes on Building a Kafka Cluster on Linux

Environment Preparation

  1. ZooKeeper cluster environment
    Kafka is a distributed message queue that depends on ZooKeeper as its registry, so a standalone or clustered ZooKeeper environment must already be running (a quick health check is sketched at the end of this section).
  2. Three servers:
172.16.18.198 k8s-n1
172.16.18.199 k8s-n2
172.16.18.200 k8s-n3
  3. Download the Kafka package

Download it from http://kafka.apache.org/downloads. At the time of writing the latest release is 2.2.0; the package I downloaded earlier is kafka_2.11-2.2.0.tgz.
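Before installing Kafka it is worth confirming that the ZooKeeper ensemble is actually up on all three nodes. A minimal sketch, assuming nc is installed and ZooKeeper's four-letter-word commands (ruok, stat) are not restricted on your build:

for host in k8s-n1 k8s-n2 k8s-n3; do
  echo "== $host =="
  echo ruok | nc $host 2181               # each node should answer: imok
  echo stat | nc $host 2181 | grep Mode   # shows leader or follower
done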

Installing the Kafka Cluster

1. Upload the archive to all three servers and extract it into /opt/:

tar -zxvf kafka_2.11-2.2.0.tgz -C /opt/
ln -s /opt/kafka_2.11-2.2.0 /opt/kafka
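The listeners configured below use the hostnames k8s-n1/k8s-n2/k8s-n3, so every node (and any client) must be able to resolve them. If they are not in DNS, one option is to put the mappings from the preparation step into /etc/hosts on each server (an assumption about your environment; skip this if the names already resolve):

cat >> /etc/hosts <<'EOF'
172.16.18.198 k8s-n1
172.16.18.199 k8s-n2
172.16.18.200 k8s-n3
EOF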

2. Edit config/server.properties (the version shown here is for k8s-n1):

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://k8s-n1:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://k8s-n1:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/var/applog/kafka/

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=5

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability, e.g. 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=24

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=k8s-n1:2181,k8s-n2:2181,k8s-n3:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

delete.topic.enable=true

Copy the file to k8s-n2 and k8s-n3; on those two nodes only broker.id, listeners, and advertised.listeners need to change (a scripted version is sketched after the two excerpts below):

[root@k8s-n2 config]# cat server.properties
broker.id=1
listeners=PLAINTEXT://k8s-n2:9092
advertised.listeners=PLAINTEXT://k8s-n2:9092

[root@k8s-n3 config]# cat server.properties
broker.id=2
listeners=PLAINTEXT://k8s-n3:9092
advertised.listeners=PLAINTEXT://k8s-n3:9092
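Instead of editing the two copies by hand, the per-node changes can be scripted. A sketch, assuming the file sits at /opt/kafka/config/server.properties via the symlink created above (adjust HOST/ID on each node):

# Example for k8s-n2: give it a unique broker.id and point the listeners at this host
HOST=k8s-n2
ID=1
sed -i \
  -e "s/^broker.id=.*/broker.id=${ID}/" \
  -e "s#^listeners=.*#listeners=PLAINTEXT://${HOST}:9092#" \
  -e "s#^advertised.listeners=.*#advertised.listeners=PLAINTEXT://${HOST}:9092#" \
  /opt/kafka/config/server.properties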
3. Add environment variables. Append the following to /etc/profile on all three nodes:
export KAFKA_HOME=/opt/kafka_2.11-2.2.0
export PATH=$PATH:$KAFKA_HOME/bin

Run source /etc/profile to make the change take effect.

4. Start Kafka on each of the three nodes:
kafka-server-start.sh $KAFKA_HOME/config/server.properties &
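Running with a trailing & ties the broker to the current shell; kafka-server-start.sh also accepts a -daemon option that detaches it properly. Once all three brokers are up, a quick sanity check (both tools ship in Kafka's bin directory) is to list the broker IDs registered in ZooKeeper:

# Alternative: start the broker detached from the shell
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

# Each broker registers an ephemeral znode; expect [0, 1, 2] once all three are up
zookeeper-shell.sh k8s-n1:2181 ls /brokers/ids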

Testing the ZooKeeper + Kafka Cluster

1. Create a topic:

kafka-topics.sh --create --zookeeper k8s-n1:2181,k8s-n2:2181,k8s-n3:2181 --replication-factor 3 --partitions 3 --topic test

2. Describe the topic:

kafka-topics.sh --describe --zookeeper k8s-n1:2181,k8s-n2:2181,k8s-n3:2181 --topic test

3. List topics:

kafka-topics.sh --list --zookeeper k8s-n1:2181,k8s-n2:2181,k8s-n3:2181
test

4. Start a console producer and send a message:

kafka-console-producer.sh --broker-list k8s-n1:9092 --topic test
hello

5. Start a console consumer; the message typed into the producer should appear:

kafka-console-consumer.sh --bootstrap-server k8s-n1:9092 --topic test --from-beginning
hello
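Because delete.topic.enable=true was set in server.properties, the test topic can be cleaned up once the smoke test passes. An optional one-liner for that:

kafka-topics.sh --delete --zookeeper k8s-n1:2181,k8s-n2:2181,k8s-n3:2181 --topic test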

With that, the Kafka cluster setup is complete.

Original article: https://www.cnblogs.com/gxyandwmm/p/11294540.html
