Kafka集群部署及測试

题记

眼下我们对大数据进行研究方向以Spark为主,当中Spark Streaming是能够接收动态数据流并进行处理。那么Spark Streaming支持多源的数据发送端,比如TCP、ZeroMQ、自然也包含Kafka,并且Kafka+SparkStreaming的技术融合也比較经常使用并且成熟,所以我们须要搭建一个Kafka集群进行流数据的測试。

--------------------------------------------------------------------------------------

Blog:    http://blog.csdn.net/chinagissoft

QQ群:

idkey=db34317167632c01ab4750de87c000ae63cf173cf6fcbbd724ae60213272da91">16403743

宗旨:专注于"GIS+"前沿技术的研究与交流,将云计算技术、大数据技术、容器技术、物联网与GIS进行深度融合,探讨"GIS+"技术和行业解决方式

转载说明:文章同意转载。但必须以链接方式注明源地址。否则追究法律责任!

--------------------------------------------------------------------------------------

环境介绍

眼下我们的环境还是原有的Hadoop集群和Spark集群。

三台集群,一台主节点。两台子节点。

  • 192.168.12.210  master
  • 192.168.12.211  slave1
  • 192.168.12.212  slave2

相同,我们的目标在这三台机器部署Kafka集群,另外同一时候包含Zookeeper集群。

眼下安装的kafka版本号为:kafka_2.10-0.8.2.1(当中2.10是支持的Scala版本号,0.8.2.1是kafka版本号)

下载地址: https://kafka.apache.org/downloads.html

安装步骤

眼下对于开源软件的部署,特别是在Linux环境下,基本上为绿色安装,也就是仅仅须要解压缩软件包就可以。

1、在210集群进行操作

解压缩kafka软件包

tar –xvf kafka_2.10-0.8.2.1

2、创建zookeeper的数据文件夹并设定server编号

mkdir zk_dir

然后再zk_dir文件夹下创建一个myid文件,编辑该文件,设置一个server唯一编号就可以。比如在210机器上。myid文件仅仅须要输入1就可以。

3、改动%KAFKA_HOME%/config里面的配置文件,编辑 config/zookeeper.properties 文件,添加下面配置:

dataDir=/home/supermap/zk_dir/
clientPort=2181
initLimit=5
syncLimit=2
server.1=192.168.12.210:2888:3888
server.2=192.168.12.211:2888:3888
server.3=192.168.12.212:2888:3888
  • tickTime:zookeeper server之间的心跳时间间隔。以毫秒为单位。
  • dataDir:zookeeper 的数据保存文件夹。我们也把 zookeeper server的 ID 文件保存到这个文件夹下
  • clientPort:zookeeper server会监听这个port。然后等待客户端连接。

  • initLimit:zookeeper 集群中 follower server和 leader server之间建立初始连接时所能容忍的心跳次数的极限值。

  • syncLimit:zookeeper 集群中 follower server和 leader server之间请求和应答过程中所能容忍的心跳次数的极限值。
  • server.N:N 代表的是 zookeeper 集群server的编号。

    对于配置值,以 192.168.12.210:2888:3888 为例。192.168.12.210 表示该server的 IP 地址,2888 port表示该server与 leader server的数据交换port,3888 表示选举新的 leader server时候用到的通信port。

4、改动%KAFKA_HOME%/config里面的配置文件

a. 编辑 config/server.properties 文件

[email protected]:~/kafka_2.10-0.8.2.1/config$ grep ^[a-z] server.properties
broker.id=1
port=9092
host.name=192.168.12.210
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/supermap/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=192.168.12.210:2181,192.168.12.211:2181,192.168.12.212:2181
zookeeper.connection.timeout.ms=6000
  • broker.id:Kafka broker 的唯一标识。集群中不能反复。

  • port: Broker 的监听端口,用于监听 Producer 或者 Consumer 的连接。

  • host.name:当前 Broker server的 IP 地址或者机器名。
  • zookeeper.contact:Broker 作为 zookeeper 的 client,能够连接的 zookeeper 的地址信息。
  • log.dirs:日志保存文件夹。

注意:改动zookeeper.connect为自己集群的IP及port信息,HostName输入本机的IP。broker.id设置为唯一数字就可以。比如210机器我设置的id=1.

b. 编辑 config/producer.properties 文件

[email protected]:~/kafka_2.10-0.8.2.1/config$ grep ^[a-z] producer.properties
metadata.broker.list=192.168.12.210:9092,192.168.12.211:9092,192.168.12.212:9092
producer.type=async
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder
  • broker.list:集群中 Broker 地址列表。
  • producer.type: Producer 类型,async 异步生产者,sync 同步生产者。

c. 编辑 config/consumer.properties 文件

[email protected]:~/kafka_2.10-0.8.2.1/config$ grep ^[a-z] consumer.properties
zookeeper.connect=192.168.12.210:2181,192.168.12.211:2181,192.168.12.212:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group
  • zookeeper.connect: Consumer 能够连接的 zookeeper server地址列表。

当我们将210机器的配置所有设置完成之后,我们使用scp工具将kafka文件夹和zk_dir文件夹复制到211和212机器上,特别注意须要改动几个文件。

1、config/server.properties 文件的broker.id和hostname

2、zk_dir/myid

简单的能够将两个id相应起来,可是必需要保证集群环境中ID唯一

服务启动

对于kafka集群我们须要启动两个服务。

1、分别在三台不同集群启动zookeeper服务

前台启动:sh /home/supermap/kafka_2.10-0.8.2.1/bin/zookeeper-server-start.sh /home/supermap/kafka_2.10-0.8.2.1/config/zookeeper.properties

后台启动:nohup sh /home/supermap/kafka_2.10-0.8.2.1/bin/zookeeper-server-start.sh /home/supermap/kafka_2.10-0.8.2.1/config/zookeeper.properties &

注意:一開始启动第一台机器,服务信息会提示不能连接其它两台机器的消息输出。

这个是正常的。假设其它两台机器服务都启动了就没有问题了。

[2016-03-21 23:12:04,946] WARN Cannot open channel to 2 at election address /192.168.12.211:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.ConnectException: Connection refused
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:579)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
	at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
	at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)
[2016-03-21 23:12:04,949] WARN Cannot open channel to 3 at election address /192.168.12.212:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.ConnectException: Connection refused
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:579)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
	at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
	at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)

2、分别在三台不同集群启动kafka服务

前台启动:sh /home/supermap/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh /home/supermap/kafka_2.10-0.8.2.1/config/server.properties

后台启动:nohup sh /home/supermap/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh /home/supermap/kafka_2.10-0.8.2.1/config/server.properties &

验证安装

1、创建和查看消息主题

连接zookeeper。创建一个名为user-behavior-topic的topic

[email protected]:~$ sh /home/supermap/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --create > --replication-factor 3 > --partition 3 > --topic user-behavior-topic > --zookeeper 192.168.12.210:2181,192.168.12.211:2181,192.168.12.212:2181
Created topic "user-behavior-topic".

查看此topic属性

[email protected]:~$ sh /home/supermap/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --describe --zookeeper 192.168.12.210:2181,192.168.12.211:2181,192.168.12.212:2181 --topic user-behavior-topic
Topic:user-behavior-topic	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: user-behavior-topic	Partition: 0	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1
	Topic: user-behavior-topic	Partition: 1	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2
	Topic: user-behavior-topic	Partition: 2	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3

查看已经创建的topic列表

[email protected]:~$ sh /home/supermap/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --list --zookeeper 192.168.12.210:2181,192.168.12.211:2181,192.168.12.212:2181 user-behavior-topic
user-behavior-topic

2、创建消息生产者发送消息

sh /home/supermap/kafka_2.10-0.8.2.1/bin/kafka-console-producer.sh --broker-list 192.168.12.210:9092,192.168.12.211:9092,192.168.12.212:9092 --topic user-behavior-topic

3、创建消息消费者接收消息

sh /home/supermap/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper 192.168.12.210:2181,192.168.12.211:2181,192.168.12.212:2181 --from-beginning --topic user-behavior-topic

时间: 2024-08-26 07:09:00

Kafka集群部署及測试的相关文章

Zookeeper+Kafka集群部署

Zookeeper+Kafka集群部署 主机规划: 10.200.3.85  Kafka+ZooKeeper 10.200.3.86  Kafka+ZooKeeper 10.200.3.87  Kafka+ZooKeeper 软件下载地址: #wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz #wget http://mirror.bit.edu.cn/apache/

大数据技术之_10_Kafka学习_Kafka概述+Kafka集群部署+Kafka工作流程分析+Kafka API实战+Kafka Producer拦截器+Kafka Streams

第1章 Kafka概述1.1 消息队列1.2 为什么需要消息队列1.3 什么是Kafka1.4 Kafka架构第2章 Kafka集群部署2.1 环境准备2.1.1 集群规划2.1.2 jar包下载2.2 Kafka集群部署2.3 Kafka命令行操作第3章 Kafka工作流程分析3.1 Kafka 生产过程分析3.1.1 写入方式3.1.2 分区(Partition)3.1.3 副本(Replication)3.1.4 写入流程3.2 Broker 保存消息3.2.1 存储方式3.2.2 存储策

3、Kafka集群部署

Kafka集群部署 1)解压安装包 [ip101]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/app/ 2)修改解压后的文件名称 [ip101]$ mv kafka_2.11-0.11.0.0/ kafka 3)在/opt/app/kafka目录下创建logs文件夹 [ip101]$ mkdir logs 4)修改配置文件 [ip101]$ cd config/ [[email protected] config]$ vi server.propert

Kafka集群部署

一. 关于kafka Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据. 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素. 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决. 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案.Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费. 关于Kafka的

4 kafka集群部署及生产者java客户端编程 + kafka消费者java客户端编程

本博文的主要内容有   kafka的单机模式部署 kafka的分布式模式部署 生产者java客户端编程 消费者java客户端编程 运行kafka ,需要依赖 zookeeper,你可以使用已有的 zookeeper 集群或者利用 kafka自带的zookeeper. 单机模式,用的是kafka自带的zookeeper, 分布式模式,用的是外部安装的zookeeper,即公共的zookeeper. Step 6: Setting up a multi-broker cluster So far w

kafka集群部署步骤

参考: kafka 集群--3个broker 3个zookeeper创建实战 细细品味Kafka_Kafka简介及安装_V1.3http://www.docin.com/p-1291437890.html 一. 准备工作: 1. 准备3台机器,IP地址分别为:192.168.3.230(233,234) 2. 下载kafka稳定版本,我的版本为:Scala 2.11  - kafka_2.11-0.9.0.0.tgz http://kafka.apache.org/downloads.html

kafka集群部署文档(转载)

原文链接:http://www.cnblogs.com/luotianshuai/p/5206662.html Kafka初识 1.Kafka使用背景 在我们大量使用分布式数据库.分布式计算集群的时候,是否会遇到这样的一些问题: 我们想分析下用户行为(pageviews),以便我们设计出更好的广告位 我想对用户的搜索关键词进行统计,分析出当前的流行趋势 有些数据,存储数据库浪费,直接存储硬盘效率又低 这些场景都有一个共同点: 数据是由上游模块产生,上游模块,使用上游模块的数据计算.统计.分析,这

Kafka 集群部署

kafka是一个分布式消息队列,需要依赖ZooKeeper,请先安装好zk集群 kafka安装包解压 $ tar xf kafka_2.10-0.9.0.1.tgz $ mv kafka_2.10-0.9.0.1 /usr/kafka $ cd /usr/kafka 配置文件 server.properties # The id of the broker. This must be set to a unique integer for each broker. broker.id=0 # Z

kafka 集群部署 多机多broker模式

环境IP : 172.16.1.35   zookeeper   kafka 172.16.1.36   zookeeper   kafka 172.16.1.37   zookeeper   kafka 开放端口  2181  2888  3888   9092 编辑  server.properties  文件  (以下为 172.16.1.35 的配置) #在默认的配置上,我只修改了4个地方.broker.id = 三个主机172.16.1.35,172.16.1.36,172.16.1.