Kafka1 利用虚拟机搭建自己的Kafka集群

前言:

      上周末自己学习了一下Kafka,参考网上的文章,学习过程中还是比较顺利的,遇到的一些问题最终也都解决了,现在将学习的过程记录与此,供以后自己查阅,如果能帮助到其他人,自然是更好的。

===============================================================长长的分割线====================================================================

正文:

  关于Kafka的理论介绍,网上可以搜到到很多的资料,大家可以自行搜索,我这里就不在重复赘述。

本文中主要涉及三块内容: 第一,就是搭建Zookeeper环境;第二,搭建Kafka环境,并学习使用基本命令发送接收消息;第三,使用Java API完成操作,以便初步了解在实际项目中的使用方式。

  闲话少说,言归正传,本次的目的是利用VMware搭建一个属于自己的ZooKeeper和Kafka集群。本次我们选择的是VMware10,具体的安装步骤大家可以到网上搜索,资源很多。

  第一步,确定目标:

ZooKeeperOne       192.168.224.170  CentOS

ZooKeeperTwo       192.168.224.171  CentOS

ZooKeeperThree     192.168.224.172  CentOS

KafkaOne                192.168.224.180  CentOS

KafkaTwo                192.168.224.181  CentOS

我们安装的ZooKeeper是3.4.6版本,可以从这里下载zookeeper-3.4.6; Kafka安装的是0.8.1版本,可以从这里下载kafka_2.10-0.8.1.tgz; JDK安装的版本是1.7版本。

另: 我在学习的时候,搭建了两台Kafka服务器,正式环境中我们最好是搭建2n+1台,此处仅作为学些之用,暂不计较。

第二步,搭建Zookeeper集群:

此处大家可以参照我之前写的一篇文章 ZooKeeper1  利用虚拟机搭建自己的ZooKeeper集群 ,我在搭建Kafka的环境的时候就是使用的之前搭建好的Zookeeper集群。

第三步,搭建Kafka集群:

(1). 将第一步中下载的 kafka_2.10-0.8.1.tgz 解压缩后,进入config目录,会看到如下图所示的一些配置文件,我们准备编辑server.properties文件。

(2). 打开 server.properties 文件,需要编辑的属性如下所示:

1 broker.id=0
2 port=9092
3 host.name=192.168.118.80
4
5 log.dirs=/opt/kafka0.8.1/kafka-logs
6
7 zookeeper.connect=192.168.224.170:2181,192.168.224.171:2181,192.168.224.172:2181

注意:

a. broker.id: 每个kafka对应一个唯一的id,自行分配即可

b. port: 默认的端口号是9092,使用默认端口即可

c. host.name: 配置的是当前机器的ip地址

d. log.dirs: 日志目录,此处自定义一个目录路径即可

e. zookeeper.connect: 将我们在第二步搭建的Zookeeper集群的配置全部写上

(3). 上边的配置完毕后,我们需要执行命令 vi /etc/hosts,将相关服务器的host配置如下图,如果没有执行此步,后边我们在执行一些命令的时候,会报无法识别主机的错误。

(4).  经过上述操作,我们已经完成了对Kafka的配置,很简单吧?!但是如果我们执行 bin/kafka-server-start.sh   config/server.properties  & 这个启动命令,可能我们会遇到如下两个问题:

a. 我们在启动的报 Unrecognized VM option ‘+UseCompressedOops‘.Could not create the Java virtual machine. 这个错误。

解决方式:

查看 bin/kafka-run-class.sh

找到下面这段代码,去掉-XX:+UseCompressedOops     

1 if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
2 KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
3 fi

b. 解决了第一个问题,我们还有可能在启动的时候遇到 java.lang.NoClassDefFoundError: org/slf4j/impl/StaticLoggerBinder 这个错误。

解决方式:

从网上的下载 slf4j-nop-1.6.0.jar 这个jar包,然后放到kafka安装目录下的libs目录中即可。注意,基于我目前的kafka版本,我最开始从网上下载的slf4j-nop-1.5.0.jar 这个jar包,但是启动的时候依然会报错,所以一定要注意版本号哦~

      (5). 现在我们执行 bin/kafka-server-start.sh   config/server.properties  & 这个启动命令,应该就可以正常的启动Kafka了。命令最后的 & 符号是为了让启动程序在后台执行。如果不加这个 & 符号,当执行完启动后,我们通常会使用 ctrl + c 退出当前控制台,kafka此时会自动执行shutdown,所以此处最好加上 & 符号。

第三步,使用基本命令创建消息主题,发送和接收主题消息:

(1). 创建、查看消息主题

 1 #连接zookeeper, 创建一个名为myfirsttopic的topic
 2 bin/kafka-topics.sh --create --zookeeper 192.168.224.170:2181 --replication-factor 2 --partitions 1 --topic myfirsttopic
 3
 4 # 查看此topic的属性
 5 bin/kafka-topics.sh --describe --zookeeper 192.168.224.170:2181 --topic myfirsttopic
 6
 7 # 查看已经创建的topic列表
 8 bin/kafka-topics.sh --list --zookeeper 192.168.224.170:2181 myfirsttopic  

上述命令执行完毕后,截图如下:

(2). 创建一个消息的生产者:

1 #启动生产者,发送消息
2 bin/kafka-console-producer.sh --broker-list 192.168.224.180:9092 --topic myfirsttopic
3
4 #启动消费者,接收消息
5 bin/kafka-console-consumer.sh --zookeeper 192.168.224.170:2181 --from-beginning --topic myfirsttopic

上述命令执行完毕后,截图如下:

(3). 按照(1)、(2)这两步,你应该可以利用Kafka感受到了分布式消息系统。这里需要着重的再说一下我在这个过程中发现的一个问题: 大家可以看下上图中的consumer的命令,我选择了zookeeper的其中一台192.168.224.170:2181接收消息是可以正常接收的!不要忘了,我是三台zookeeper的,所以我又尝试了向192.168.224.171:2181和192.168.224.172:2181接收myfirsttopic这个主题的消息。正常情况下,三台访问的结果应该都是可以正常的接收消息,但是当时我的情况在访问了192.168.224.171:2181这台时会报 org.apache.zookeeper.clientcnxn 这个错误!!!

我当时多试了两遍,发现我的三台zookeeper中,谁是leader,concumer连接的时候就会报上面的那个异常。后来定位到了zookeeper的zoo.cfg配置文件中的maxClientCnxns属性,即客户端最大连接数,我当时使用的是默认配置是2。后来我把这个属性的值调大一些,consumer连接zookeeper leader时,就不会报这个错误了。如果你选择将这个属性注释掉(从网上查询到注释掉该属性默认值是10),也不会报这个错误了。其实网上的很多文章也只是说了此属性可以尽量设置的大一些,没有解释其他的。

但我后来还是仔细想了想,当我把maxClientCnxns这个属性设置为2时,如果两台kafka启动时,每个kafka和zookeeper的节点之间建立了一个客户端连接,那么此时zookeeper的每个节点的客户端连接数就已经达到了最大连接数2,那么我创建consumer的时候,应该是三台zookeeper连接都有问题,而不是只有leader会有问题。所以,此处需要各位有见解的再帮忙解释一下!!! 

第四步,使用Java API 操作Kafka:

其实Java API提供的功能基本也是基于上边的客户端命令来实现的,万变不离其宗,我将我整理的网上的例子贴到下面,大家可以在本地Java工程中执行一下,即可了解调用方法。

(1). 我的maven工程中pom.xml的配置

 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 2   <modelVersion>4.0.0</modelVersion>
 3   <groupId>com.ismurf.study</groupId>
 4   <artifactId>com.ismurf.study.kafka</artifactId>
 5   <version>0.0.1-SNAPSHOT</version>
 6   <name>Kafka_Project_0001</name>
 7   <packaging>war</packaging>
 8
 9   <dependencies>
10           <dependency>
11             <groupId>org.apache.kafka</groupId>
12             <artifactId>kafka_2.10</artifactId>
13             <version>0.8.1.1</version>
14         </dependency>
15   </dependencies>
16
17   <build>
18       <plugins>
19           <plugin>
20             <groupId>org.apache.maven.plugins</groupId>
21             <artifactId>maven-war-plugin</artifactId>
22             <version>2.1.1</version>
23             <configuration>
24                 <outputFileNameMapping>@{artifactId}@[email protected]{extension}@</outputFileNameMapping>
25             </configuration>
26         </plugin>
27
28         <!-- Ensures we are compiling at 1.6 level -->
29         <plugin>
30             <groupId>org.apache.maven.plugins</groupId>
31             <artifactId>maven-compiler-plugin</artifactId>
32             <configuration>
33                 <source>1.6</source>
34                 <target>1.6</target>
35             </configuration>
36         </plugin>
37
38         <plugin>
39             <groupId>org.apache.maven.plugins</groupId>
40             <artifactId>maven-surefire-plugin</artifactId>
41             <configuration>
42                 <skipTests>true</skipTests>
43             </configuration>
44         </plugin>
45       </plugins>
46   </build>
47
48 </project>

(2). 实例代码: 大家可以参考这片文章的  http://blog.csdn.net/honglei915/article/details/37563647 中的代码,粘贴到工程后即可使用,上述文章中的代码整理后目录截图如下:

时间: 2024-12-11 12:09:50

Kafka1 利用虚拟机搭建自己的Kafka集群的相关文章

ZooKeeper1 利用虚拟机搭建自己的ZooKeeper集群

前言:       前段时间自己参考网上的文章,梳理了一下基于分布式环境部署的业务系统在解决数据一致性问题上的方案,其中有一个方案是使用ZooKeeper,加之在大数据处理中,ZooKeeper确实起到协调服务的作用,所以利用周末休息时间,自己在虚拟机上简单搭建了一个ZooKeeper集群,学习了解一下. ===============================================================长长的分割线===========================

使用Docker快速搭建Zookeeper和kafka集群

集群搭建 镜像选择 Zookeeper和Kafka集群分别运行在不同的容器中zookeeper官方镜像,版本3.4kafka采用wurstmeister/kafka镜像 集群规划 hostname Ip addr port listener zoo1 172.19.0.11 2184:2181 zoo2 172.19.0.12 2185:2181 zoo3 172.19.0.13 2186:2181 kafka1 172.19.0.14 9092:9092 kafka1 kafka2 172.1

Hadoop4 利用VMware搭建自己的hadoop集群

前言:       前段时间自己学习如何部署伪分布式模式的hadoop环境,之前由于工作比较忙,学习的进度停滞了一段时间,所以今天抽出时间把最近学习的成果和大家分享一下.       本文要介绍的是如何利用VMware搭建自己的hadoop的集群.如果大家想了解伪分布式的大家以及eclipse中的hadoop编程,可以参考我之前的三篇文章. 1.在Linux环境中伪分布式部署hadoop(SSH免登陆),运行WordCount实例成功. http://www.cnblogs.com/Purple

【译】调优Apache Kafka集群

今天带来一篇译文"调优Apache Kafka集群",里面有一些观点并无太多新颖之处,但总结得还算详细.该文从四个不同的目标出发给出了各自不同的参数配置,值得大家一读~ 原文地址请参考:https://www.confluent.io/blog/optimizing-apache-kafka-deployment/ ========================================== Apache Kafka是当前最好的企业级流式处理平台.把你的应用程序链接到Kafka

docker 搭建zookeeper集群和kafka集群

docker 搭建zookeeper集群 安装docker-compose容器编排工具 Compose介绍 Docker Compose 是 Docker 官方编排(Orchestration)项目之一,负责快速在集群中部署分布式应用. Compose 项目是 Docker 官方的开源项目,负责实现对 Docker 容器集群的快速编排.Compose 定位是 「定义和运行多个 Docker 容器的应用(Defining and running multicontainer Docker appl

完全分布式ZooKeeper集群和Kafka集群的搭建和使用

自己使用的版本为zookeeper-3.4.7.tar.gz和kafka_2.10-0.9.0.0.tgz.首先要安装JDK(jdk-7u9-linux-i586.tar.gz)和SSH,IP地址的分配为Kafka1(192.168.56.136),Kafka2(192.168.56.137),Kafka3(192.168.56.138).下面主要介绍SSH的安装,ZooKeeper和Kafka集群的搭建和使用. 一. SSH的安装 (1)apt-get install ssh (2)/etc/

一脸懵逼学习KafKa集群的安装搭建--(一种高吞吐量的分布式发布订阅消息系统)

1:KafKa的官方网址:http://kafka.apache.org/ 开发流程图,如: 2:KafKa的基础知识: 2.1:kafka是一个分布式的消息缓存系统2.2:kafka集群中的服务器都叫做broker2.3:kafka有两类客户端,一类叫producer(消息生产者),一类叫做consumer(消息消费者),客户端和broker服务器之间采用tcp协议连接2.4:kafka中不同业务系统的消息可以通过topic进行区分,而且每一个消息topic都会被分区,以分担消息读写的负载2.

window环境搭建zookeeper,kafka集群

为了演示集群的效果,这里准备一台虚拟机(window 7),在虚拟机中搭建了单IP多节点的zookeeper集群(多IP节点的也是同理的),并且在本机(win 7)和虚拟机中都安装了kafka. 前期准备说明: 1.三台zookeeper服务器,本机安装一个作为server1,虚拟机安装两个(单IP) 2.三台kafka服务器,本机安装一个作为server1,虚拟机安装两个. 备注:当然你可以直接在虚拟机上安装三个服务器分别为server1.server2.server3 . 虚拟机和本机网络环

docker容器搭建kafka集群

Docker搭建kafka集群 ?  需求说明: 公司目前有三个环境,生产环境,测试和演示环境,再包括开发人员还有开发的环境,服务器上造成了一定的资源浪费,因为环境需要依赖zookeeper和kafka,redis这些服务,只要搭一个环境,所有东西都要重新搭一遍,所以搭建kafka集群,让大部分环境都连接一个集群,把单个的服务变成公共的,稳定并易于管理 ?  Kafka集群管理和状态保存是通过zookeeper来实现的,要先部署zk集群 ?  环境说明: centos系统安装docker,通过d