【kafka】安装部署kafka集群(kafka版本:kafka_2.12-2.3.0)

2.1 下载kafka并安装
kafka_2.12-2.3.0.tgz

tar -zxvf kafka_2.12-2.3.0.tgz

2.2 配置kafka集群

在config/server.properties中修改参数:
[[email protected] kafka_2.12-2.3.0]$ cd config

[[email protected] config]$ gedit server.properties
参数1:添加host.name=hadoop01
############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
host.name=hadoop01
参数2:修改log.dirs
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/home/hadoop/kafka_2.12-2.3.0/logs
参数3:添加zookeeper节点地址和端口
############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181

2.3 拷贝至各节点
scp -r ~/kafka_2.12-2.3.0 hadoop02:~/
scp -r ~/kafka_2.12-2.3.0 hadoop03:~/

2.4 到各节点修改server.properties里面的broker.id参数,其余参数不变

hadoop02节点:
############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
host.name=hadoop02
hadoop03节点:
############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
host.name=hadoop03

2.5启动kafka(需要先启动zookeeper)

2.5.1 启动kafka之前必须先启动zookeeper

[[email protected] ~]$ cd bin
[[email protected] bin]$ sh zookeeperstart.sh --这里是一键启动zookeeper脚本 ,具体见https://www.cnblogs.com/CQ-LQJ/p/11605603.html
INFO:starting zookeeper on 01
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
INFO:starting zookeeper on 02
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
INFO:starting zookeeper on 03
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
INFO:starting zookeeper on 01
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
INFO:starting zookeeper on 02
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
INFO:starting zookeeper on 03
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /home/hadoop/apache-zookeeper-3.5.5/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

2.5.2 zookeeper启动后执行启动kafka命令

[[email protected] bin]$ cd ~/kafka_2.12-2.3.0
[[email protected] kafka_2.12-2.3.0]$ bin/kafka-server-start.sh  config/server.properties --此命令正确,但不能关闭窗口,卡在进程界面

2.5.2.1.1 第一次启动报错(解决办法:升级到jdk8 131或者151均可):
[2019-09-30 00:28:56,482] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-09-30 00:28:56,875] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.VerifyError: Uninitialized object exists on backward branch 209
Exception Details:
  Location:
    scala/collection/immutable/HashMap$HashTrieMap.split()Lscala/collection/immutable/Seq; @249: goto
  Reason:
    Error exists in the bytecode
  Bytecode:
    0000000: 2ab6 0064 04a0 001e b200 c1b2 00c6 04bd
    0000010: 0002 5903 2a53 c000 c8b6 00cc b600 d0c0
    0000020: 00d2 b02a b600 38b8 0042 3c1b 04a4 0156
    0000030: 1b05 6c3d 2a1b 056c 2ab6 0038 b700 d43e
    0000040: 2ab6 0038 021d 787e 3604 2ab6 0038 0210
    0000050: 201d 647c 7e36 05bb 0019 59b2 00c6 2ab6
    0000060: 003a c000 c8b6 00d8 b700 db1c b600 df3a
    0000070: 0619 06c6 001a 1906 b600 e3c0 008b 3a07
    0000080: 1906 b600 e6c0 008b 3a08 a700 0dbb 00e8
    0000090: 5919 06b7 00eb bf19 073a 0919 083a 0abb
    00000a0: 0002 5915 0419 09bb 0019 59b2 00c6 1909
    00000b0: c000 c8b6 00d8 b700 db03 b800 f13a 0e3a
    00000c0: 0d03 190d b900 f501 0019 0e3a 1136 1036
    00000d0: 0f15 0f15 109f 0027 150f 0460 1510 190d
    00000e0: 150f b900 f802 00c0 0005 3a17 1911 1917
    00000f0: b800 fc3a 1136 1036 0fa7 ffd8 1911 b801
    0000100: 00b7 0069 3a0b bb00 0259 1505 190a bb00
    0000110: 1959 b200 c619 0ac0 00c8 b600 d8b7 00db
    0000120: 03b8 00f1 3a13 3a12 0319 12b9 00f5 0100
    0000130: 1913 3a16 3615 3614 1514 1515 9f00 2715
    0000140: 1404 6015 1519 1215 14b9 00f8 0200 c000
    0000150: 053a 1819 1619 18b8 0103 3a16 3615 3614
    0000160: a7ff d819 16b8 0100 b700 693a 0cbb 0105
    0000170: 5919 0bbb 0105 5919 0cb2 010a b701 0db7
    0000180: 010d b02a b600 3a03 32b6 010f b0       
  Stackmap Table:
    same_frame(@35)
    full_frame(@141,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118]},{})
    append_frame(@151,Object[#139],Object[#139])
    full_frame(@209,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118],Object[#139],Object[#139],Object[#139],Object[#139],Top,Top,Object[#25],Object[#62],Integer,Integer,Object[#116]},{Uninitialized[#159],Uninitialized[#159],Integer,Object[#139]})
    full_frame(@252,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118],Object[#139],Object[#139],Object[#139],Object[#139],Top,Top,Object[#25],Object[#62],Integer,Integer,Object[#116]},{Uninitialized[#159],Uninitialized[#159],Integer,Object[#139]})
    full_frame(@312,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118],Object[#139],Object[#139],Object[#139],Object[#139],Object[#2],Top,Object[#25],Object[#62],Integer,Integer,Object[#116],Object[#25],Object[#62],Integer,Integer,Object[#116]},{Uninitialized[#262],Uninitialized[#262],Integer,Object[#139]})
    full_frame(@355,{Object[#2],Integer,Integer,Integer,Integer,Integer,Object[#118],Object[#139],Object[#139],Object[#139],Object[#139],Object[#2],Top,Object[#25],Object[#62],Integer,Integer,Object[#116],Object[#25],Object[#62],Integer,Integer,Object[#116]},{Uninitialized[#262],Uninitialized[#262],Integer,Object[#139]})
    full_frame(@387,{Object[#2],Integer},{})

at scala.collection.immutable.HashMap$.scala$collection$immutable$HashMap$$makeHashTrieMap(HashMap.scala:185)
    at scala.collection.immutable.HashMap$HashMap1.updated0(HashMap.scala:220)
    at scala.collection.immutable.HashMap.updated(HashMap.scala:62)
    at scala.collection.immutable.Map$Map4.updated(Map.scala:227)
    at scala.collection.immutable.Map$Map4.$plus(Map.scala:228)
    at scala.collection.immutable.Map$Map4.$plus(Map.scala:200)
    at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:32)
    at scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28)
    at scala.collection.TraversableOnce.$anonfun$toMap$1(TraversableOnce.scala:320)
    at scala.collection.TraversableOnce$$Lambda$10/838411509.apply(Unknown Source)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableOnce.toMap(TraversableOnce.scala:319)
    at scala.collection.TraversableOnce.toMap$(TraversableOnce.scala:317)
    at scala.collection.AbstractTraversable.toMap(Traversable.scala:108)
    at kafka.api.ApiVersion$.<init>(ApiVersion.scala:98)
    at kafka.api.ApiVersion$.<clinit>(ApiVersion.scala)
    at kafka.server.Defaults$.<init>(KafkaConfig.scala:146)
    at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
    at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:854)
    at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
    at kafka.metrics.KafkaMetricsConfig.<init>(KafkaMetricsConfig.scala:32)
    at kafka.metrics.KafkaMetricsReporter$.startReporters(KafkaMetricsReporter.scala:62)
    at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:27)
    at kafka.Kafka$.main(Kafka.scala:68)
    at kafka.Kafka.main(Kafka.scala)
现在jdk版本:
[[email protected] kafka_2.12-2.3.0]$ java -version
java version "1.8.0_11"
Java(TM) SE Runtime Environment (build 1.8.0_11-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.11-b03, mixed mode)

2.5.2.1.2 kafka无法启动或卡死
<可能原因:虚拟机内存不足

解决方法:修改启动脚本的初始内存1G -> 200m

1、打开脚本 vim bin/kafka-server-start.sh

2、找到:export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

3、修改为:export KAFKA_HEAP_OPTS="-Xmx200m -Xms200m"

4、重启kafka:bin/kafka-server-start.sh config/server.properties>

2.5.2.2 kafka启动方式:
加daemon和&,用守护进程的启动方式。(直接在控制台打印输出,不能关闭窗口 命令:bin/kafka-server-start.sh config/server.properties)

[[email protected] bin]$ cd ~/kafka_2.12-2.3.0
[[email protected] kafka_2.12-2.3.0]# bin/kafka-server-start.sh -daemon config/server.properties & 

2.5.2.3 用jps命令查看kafka是否启动成功

[[email protected] kafka_2.12-2.3.0]# jps
8736 NodeManager
8593 ResourceManager
8083 DataNode
7942 NameNode
8330 SecondaryNameNode
12700 Jps
11981 Kafka

注:启动关闭kafka和zookeeper的顺序,先启动zookeeper再启动kafka,先停止kafka再停止zookeeper

2.5.2.4 在其他节点利用同样方式启动kafka

说明事项:在hadoop集群应用中,只要启动任何一个组件自带的zookeeper,或者是独立的zookeeper,就可以为其他任何需要zookeeper服务的组件提供支持,并不需要单独启动自带的zookeeper

2.6 测试kafka

2.6.1 创建topics(消息主题:相当于文件系统目录,用于保存消息内容)

[[email protected] kafka_2.12-2.3.0]$ bin/kafka-topics.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --create --topic testr --replication-factor 3 --partitions 3
Created topic testr.

2.6.2 查看topic 和 topic详情

[[email protected] kafka_2.12-2.3.0]$ bin/kafka-topics.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --list
testr

2.6.3 开启Kafka producer生产者(在hadoop01和hadoop02和hadoop03都可以),模拟producer发送消息,用命令行的方式手动的往kafka的topic里面发送消息:

[[email protected] kafka_2.12-2.3.0]$ bin/kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testr
>THI^H^H\^H^H^H   this is message from terminal
>hello
>my^H^H  hahha
>
>
>11111
>
>11111111111111111111
> 

注:作为producer,上面启动的终端一直处于发送消息的状态,就是等待用户输入命令,并保存到topic。这个时候,我们需要在另开一个终端创建consumer消费者,才能接受这些消息

2.6.4 开启Kafka consumer消费者(在hadoop1和hadoop2和hadoop3都可以)

[[email protected] kafka_2.12-2.3.0]$ bin/kafka-console-consumer.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic testr --from-beginning (kafka版本0.9以前用此命令)
[[email protected] kafka_2.12-2.3.0]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic testr --from-beginning (kafka版本0.9以后用此命令)
hello

  hahha
11111
this is message from terminal

11111111111111111111

说明事项: producer 和 consumer 通过kafka中间件联系起来,这种应用模式在很多数据处理系统中都可以发挥积极的作用。例如:在一些实时的大数据应用中,kafka可以保存从数据源产生的数据,consumer可以采用自己的数据率保存数据,因此kafka起到一个缓冲的作用

原文地址:https://www.cnblogs.com/CQ-LQJ/p/11617126.html

时间: 2024-10-14 01:54:26

【kafka】安装部署kafka集群(kafka版本:kafka_2.12-2.3.0)的相关文章

使用docker安装部署Spark集群来训练CNN(含Python实例)

使用docker安装部署Spark集群来训练CNN(含Python实例) 本博客仅为作者记录笔记之用,不免有很多细节不对之处. 还望各位看官能够见谅,欢迎批评指正. 博客虽水,然亦博主之苦劳也. 如需转载,请附上本文链接,不甚感激! http://blog.csdn.net/cyh_24/article/details/49683221 实验室有4台神服务器,每台有8个tesla-GPU,然而平时做实验都只使用了其中的一个GPU,实在暴遣天物! 于是想用spark来把这些GPU都利用起来.听闻d

kubernetes系列03—kubeadm安装部署K8S集群

1.kubernetes安装介绍 1.1 K8S架构图 1.2 K8S搭建安装示意图 1.3 安装kubernetes方法 1.3.1 方法1:使用kubeadm 安装kubernetes(本文演示的就是此方法) 优点:你只要安装kubeadm即可:kubeadm会帮你自动部署安装K8S集群:如:初始化K8S集群.配置各个插件的证书认证.部署集群网络等.安装简易. 缺点:不是自己一步一步安装,可能对K8S的理解不会那么深:并且有那一部分有问题,自己不好修正. 1.3.2 方法2:二进制安装部署k

CentOS7.X环境下基于docker安装部署RabbitMQ集群

1.IP地址规划(将信息配置到/etc/hosts中)主机名 IP地址RabbitMQ01 192.168.8.131RabbitMQ02 192.168.8.132RabbitMQ03 192.168.8.133RabbitMQ04 192.168.8.1342.RabbitMQ集群安装(1)四个节点同时运行,下载RabbitMQ镜像[[email protected]~]# docker pull rabbitmq:3-management(2)四个节点分别运行,启动RabbitMQ容器 [

安装部署Kubernetes集群实战

kubernetes概述: Kubernetes是一个开源的,用于管理云平台中多个主机上的容器化的应用,Kubernetes的目标是让部署容器化的应用简单并且高效(powerful),Kubernetes提供了应用部署,规划,更新,维护的一种机制.Kubernetes是Google 2014年创建管理的,是Google 10多年大规模容器管理技术Borg的开源版本. 通过kubernetes可以实现的功能: 快速部署应用 快速扩展应用 无缝对接新的应用功能 节省资源,优化硬件资源的使用 我们的目

kubernetes 基础-安装部署etcd集群

HDSS7-200上: ~]# wget https://pkg.cfssl.org/R1.2/cfssl_linux-amd64 -O /usr/bin/cfssl ~]# wget https://pkg.cfssl.org/R1.2/cfssljson_linux-amd64 -O /usr/bin/cfssl-json ~]# wget https://pkg.cfssl.org/R1.2/cfssl-certinfo_linux-amd64 -O /usr/bin/cfssl-cert

K8S简介+CentOS7 部署K8S集群

一.前言 Kubernetes(简称K8S)是开源的容器集群管理系统,可以实现容器集群的自动化部署.自动扩缩容.维护等功能.它既是一款容器编排工具,也是全新的基于容器技术的分布式架构领先方案.在Docker技术的基础上,为容器化的应用提供部署运行.资源调度.服务发现和动态伸缩等功能,提高了大规模容器集群管理的便捷性.[Kubernetes是容器集群管理工具] 二.Kubernetes的架构图 三.重要概念 3.1.cluster cluster是 计算.存储和网络资源的集合,k8s利用这些资源运

kafka linux下的集群安装

第一步.kafka 集群安装环境准备 环境:CentOS6.5 集群环境: 192.168.139.130 master 192.168.139.131 node1 192.168.139.132 node2 zookeeper 版本:zookeeper-3.3.6.tar.gz kafka 版本:kafka_2.11-0.9.0.0.tar.gz 第二步.安装zoookeeper集群 注意kafka有自己自带的zookeeper,我这里没用kafka自带的zookeeper集群,而是自己安装的

zookeeper与kafka安装部署及java环境搭建

1. ZooKeeper安装部署 本文在一台机器上模拟3个zk server的集群安装. 1.1. 创建目录.解压 cd /usr/ #创建项目目录 mkdir zookeeper cd zookeeper mkdir tmp mkdir zookeeper-1 mkdir zookeeper-2 mkdir zookeeper-3 cd tmp mkdir zk1 mkdir zk2 mkdir zk3 cd zk1 mkdir data mkdir log cd zk2 mkdir data

阿里云ECS服务器部署HADOOP集群(四):Hive本地模式的安装

本篇将在阿里云ECS服务器部署HADOOP集群(一):Hadoop完全分布式集群环境搭建的基础上搭建. 本地模式需要采用MySQL数据库存储数据. 1 环境介绍 一台阿里云ECS服务器:master 操作系统:CentOS 7.3 Hadoop:hadoop-2.7.3.tar.gz Java: jdk-8u77-linux-x64.tar.gz Hive:apache-hive-2.3.6-bin.tar.gz Mysql: Mysql 5.7 MySQL Connector-J:mysql-

在Docker中安装和部署MongoDB集群

在Docker中安装mongodb 采用的mongodb镜像:https://registry.hub.docker.com/u/tutum/mongodb/ 以该镜像启动一个容器(注意此时mongodb是standalone模式): docker run -d --name=mongodb -p 27017:27017 -p 28017:28017 tutum/mongodb:3.0 docker logs mongodb 输出信息: =============================