Kafka介绍及安装部署

本节内容:

  • 消息中间件
  • 消息中间件特点
  • 消息中间件的传递模型
  • Kafka介绍
  • 安装部署Kafka集群
  • 安装Yahoo kafka manager
  • kafka-manager添加kafka cluster

一、消息中间件

消息中间件是在消息的传输过程中保存消息的容器。消息中间件在将消息从消息生产者到消费者时充当中间人的作用。队列的主要目的是提供路由并保证消息的传送;如果发送消息时接收者不可用,消息对列会保留消息,直到可以成功地传递它为止,当然,消息队列保存消息也是有期限的。

二、消息中间件特点

1. 采用异步处理模式
消息发送者可以发送一个消息而无须等待响应。消息发送者将消息发送到一条虚拟的通道(主题或者队列)上,消息接收者则订阅或者监听该通道。一条消息可能最终转发给一个或多个消息接收者,这些接收者都无需对消息发送者做出同步回应。整个过程是异步的。

  • 比如用户信息注册。注册完成后过段时间发送邮件或者短信。

2. 应用程序和应用程序调用关系为松耦合关系

  • 发送者和接收者不必要了解对方、只需要确认消息
  • 发送者和接收者不必同时在线

比如在线交易系统为了保证数据的最终一致,在支付系统处理完成后会把支付结果放到信息中间件里通知订单系统修改订单支付状态。两个系统通过消息中间件解耦。

三、消息中间件的传递模型

1. 点对点模型(PTP)
点对点模型用于消息生产者和消息消费者之间点对点的通信。消息生产者将消息发送到由某个名字标识的特定消费者。这个名字实际上对应于消费服务中的一个队列(Queue),在消息传递给消费者之前它被存储在这个队列中。队列消息可以放在内存中也可以是持久的,以保证在消息服务出现故障时仍然能够传递消息。
点对点模型特性:

  • 每个消息只有一个消费者
  • 发送者和接受者没有时间依赖
  • 接受者确认消息接受和处理成功

2. 发布—订阅模型(Pub/Sub)

发布者/订阅者模型支持向一个特定的消息主题生产消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。这种模式被概括为:多个消费者可以获得消息。在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便能够让消费者订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在这种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。
其实消息中间件,像MySQL其实也可以作为消息中间件,只要你把消息中间件原理搞清楚,你会发现目前所有的存储,包括NoSQL,只要支持顺序性东西的,就可以作为一个消息中间件。就看你怎么去利用它了。就像redis里面那个队列list,就可以作为一个消息队列。
发布—订阅模型特性:

  • 每个消息可以有多个订阅者
  • 客户端只有订阅后才能接收到消息
  • 持久订阅和非持久订阅

(1) 发布者和订阅者有时间依赖

接收者和发布者只有建立订阅关系才能收到消息。

(2) 持久订阅

订阅关系建立后,消息就不会消失,不管订阅者是否在线。

(3) 非持久订阅

订阅者为了接收消息,必须一直在线

当只有一个订阅者时约等于点对点模式。

大部分情况下会使用持久订阅。常用的消息队列有Kafka、RabbitMQ、ActiveMQ、metaq等。

四、Kafka介绍

Kafka是一种分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础,具有高水平扩展和高吞吐量。

目前越来越多的开源分布式处理系统如Apache flume、Apache Storm、Spark、Elasticsearch都支持与Kafka集成。

五、安装部署Kafka集群

1. 环境信息

主机名 操作系统版本 IP地址 安装软件
log1 CentOS 7.0 114.55.29.86 JDK1.7、kafka_2.11-0.9.0.1
log2 CentOS 7.0 114.55.29.241 JDK1.7、kafka_2.11-0.9.0.1
log3 CentOS 7.0 114.55.253.15 JDK1.7、kafka_2.11-0.9.0.1

2. 安装JDK1.7

3台机器都需要安装JDK1.7。

[[email protected] local]# mkdir /usr/java
[[email protected] local]# tar zxf jdk-7u80-linux-x64.gz -C /usr/java/
[[email protected] local]# vim /etc/profile
export JAVA_HOME=/usr/java/jdk1.7.0_80
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
[[email protected] local]# source /etc/profile

安装JDK7

3. 安装集群

需要先安装好Zookeeper集群,见之前的文章《Zookeeper介绍及安装部署》。

(1)创建消息持久化目录

[[email protected] ~]# mkdir /kafkaLogs

(2)下载解压kafka,版本是kafka_2.11-0.9.0.1

[[email protected] local]# wget http://mirrors.cnnic.cn/apache/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
[[email protected] local]# tar zxf kafka_2.11-0.9.0.1.tgz

(3)修改配置

[[email protected] local]# cd kafka_2.11-0.9.0.1/config/
[[email protected] config]# vim server.properties
  • 修改broker.id

  • 修改kafka监听地址

注意: advertised.host.name参数用来配置返回的host.name值,把这个参数配置为IP地址。这样客户端在使用java.net.InetAddress.getCanonicalHostName()获取时拿到的就是ip地址而不是主机名。

  • 修改消息持久化目录

  • 修改zk地址

  • 添加启用删除topic配置

  • 关闭自动创建topic

是否允许自动创建topic。如果设为true,那么produce,consume或者fetch metadata一个不存在的topic时,就会自动创建一个默认replication factor和partition number的topic。默认是true。

auto.create.topics.enable=false

(4)把log1的配置好的kafka拷贝到log2和log3上

[[email protected] local]# scp -rp kafka_2.11-0.9.0.1 [email protected]114.55.29.241:/usr/local/
[[email protected] local]# scp -rp kafka_2.11-0.9.0.1 [email protected]114.55.253.15:/usr/local/

(5)log2和log3主机上创建消息持久化目录

[[email protected] ~]# mkdir /kafkaLogs
[[email protected] ~]# mkdir /kafkaLogs

(6)修改log2配置文件中的broker.id为1,log3主机的为2

[[email protected] config]# vim server.properties

4. 启动集群

log1主机启动kafka:

[[email protected] ~]# cd /usr/local/kafka_2.11-0.9.0.1/
[[email protected] kafka_2.11-0.9.0.1]# JMX_PORT=9997 bin/kafka-server-start.sh -daemon config/server.properties &

log1主机启动kafka

log2主机启动kafka:

[[email protected] ~]# cd /usr/local/kafka_2.11-0.9.0.1/
[[email protected] kafka_2.11-0.9.0.1]# JMX_PORT=9997 bin/kafka-server-start.sh -daemon config/server.properties &

log2主机启动kafka

log3主机启动kafka:

[[email protected] ~]# cd /usr/local/kafka_2.11-0.9.0.1/
[[email protected] kafka_2.11-0.9.0.1]# JMX_PORT=9997 bin/kafka-server-start.sh -daemon config/server.properties &

log3主机启动kafka

5. 脚本定期清理logs下的日志文件

默认kafka是按天切割日志的,而且不删除:

这里写一个简单的脚本来清理这些日志,主要是清理server.log和controller.log。

[[email protected] ~]# cd /usr/local/kafka_2.11-0.9.0.1/
[[email protected] kafka_2.11-0.9.0.1]# vim clean_kafkalog.sh
#!/bin/bash
###Description:This script is used to clear kafka logs, not message file.
###Written by: jkzhao - [email protected]
###History: 2016-04-18 First release.

# log file dir.
logDir=/usr/local/kafka_2.11-0.9.0.1/logs

# Reserved 7 files.
COUNT=7

ls -t $logDir/server.log* | tail -n +$[$COUNT+1] | xargs rm -f
ls -t $logDir/controller.log* | tail -n +$[$COUNT+1] | xargs rm -f
ls -t $logDir/state-change.log* | tail -n +$[$COUNT+1] | xargs rm -f
ls -t $logDir/log-cleaner.log* | tail -n +$[$COUNT+1] | xargs rm –f

清理kafka日志的脚本

赋予脚本执行权限:

[[email protected] kafka_2.11-0.9.0.1]# chmod +x clean_kafkalog.sh

周期性任务策略:每周日的0点0分去执行这个脚本。

[[email protected] logs]# crontab -e
0 0 * * 0 /usr/local/kafka_2.11-0.9.0.1/clean_kafkalog.sh

把清理日志的脚本拷贝到第二台和第三台主机:

[[email protected] kafka_2.11-0.9.0.1]# scp -p clean_kafkalog.sh [email protected]114.55.29.241:/usr/local/kafka_2.11-0.9.0.1
[[email protected] kafka_2.11-0.9.0.1]# scp -p clean_kafkalog.sh [email protected]114.55.253.15:/usr/local/kafka_2.11-0.9.0.1

6. 停止kafka命令

[[email protected] ~]# /usr/local/kafka_2.11-0.9.0.1/bin/kafka-server-stop.sh

7. 测试集群

(1)log1主机上创建一个名为test的topic

[[email protected] kafka_2.11-0.9.0.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

(2)log2和log3主机上利用命令行工具创建一个consumer程序

[[email protected] kafka_2.11-0.9.0.1]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
[[email protected] kafka_2.11-0.9.0.1]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

(3)log1主机上利用命令行工具创建一个producer程序

[[email protected] kafka_2.11-0.9.0.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

log1主机上终端输入message,然后到log2和log3主机的终端查看:

8. 创建生产环境topic

如果kafka集群是3台,我们创建一个名为business的Topic,如下:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic business

注意:为Topic创建分区时,--partitions(分区数)最好是broker数量的整数倍,这样才能使一个Topic的分区均匀的分布在整个Kafka集群中。

9. Kafka常用命令

(1)启动kafka

nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &

(2)查看topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

(3)控制台消费

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic middleware --from-beginning

(4)删除topic

  • 删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录
  • 如果配置了delete.topic.enable=true直接通过命令删除,如果命令删除不掉,直接通过zookeeper-client 删除掉"/brokers/topics/"目录下相关topic节点。

注意: 如果你要删除一个topic并且重建,那么必须重新启动kafka,否则新建的topic在zookeeper的/brokers/topics/test-topic/目录下没有partitions这个目录,也就是没有分区信息。

六、安装Yahoo kafka manager

1. Yahoo kafka manager介绍

项目地址:https://github.com/yahoo/kafka-manager

Requirements:

  • Kafka 0.8.1.1 or 0.8.2.*
  • sbt 0.13.x
  • Java 8+

Kafka Manager是一个管控台,这款工具主要支持以下几个功能:

  • 管理多个不同的集群;
  • 很容易地检查集群的状态(topics, brokers, 副本的分布, 分区的分布);
  • 选择副本;
  • 产生分区分配(Generate partition assignments)基于集群的当前状态;
  • 重新分配分区。

2. 环境信息

主机名 操作系统版本  IP地址 安装软件
console CentOS 7.0 114.55.29.246 JDK1.8、kafka-manager-1.3.0.6.zip

Kafka Manager可以装在任何一台机器上,我这里部署在一台单独的机器上。

3. 安装jdk1.8

[[email protected] local]# tar zxf jdk-8u73-linux-x64.gz -C /usr/java/
[[email protected] ~]# vim /etc/profile
export JAVA_HOME=/usr/java/jdk1.8.0_73
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
[[email protected] ~]# source /etc/profile

安装JDK8

4. 安装sbt0.13.9

[[email protected] ~]# curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
[[email protected] ~]# yum install -y sbt

安装sbt0.13.9

5. 构建kafka manager包

[[email protected] ~]# git clone https://github.com/yahoo/kafka-manager.git

[[email protected] ~]# unzip -oq kafka-manager-upgrade-to-90.zip
[[email protected] ~]# mv kafka-manager-upgrade-to-90 kafka-manager
[[email protected] ~]# cd kafka-manager
[[email protected] kafka-manager]# sbt clean dist
The command below will create a zip file which can be used to deploy the application. 

使用sbt编译打包的时候时间可能会比较长。

这个需要FQ才能完成。配置代理:

[[email protected] ~]# vim /usr/share/sbt-launcher-packaging/conf/sbtconfig.txt
-Dhttp.proxyHost=proxy
-Dhttp.proxyPort=8080

再次运行这个命令,依然需要等待较长的时间,有可能还会失败。如果失败就多次尝试打包:

打包完成后会创建一个zip压缩包,而这个压缩包可以用来部署该应用。生成的包会在kafka-manager/target/universal 下面。生成的包只需要java环境就可以运行了,在以后部署到其他机器上不需要安装sbt进行打包构建了。

6. 安装kafka manager

[[email protected] kafka-manager]# cp target/universal/kafka-manager-1.3.0.6.zip ~/
[[email protected] kafka-manager]# cd
[[email protected] ~]# unzip -oq kafka-manager-1.3.0.6.zip

安装kafka manager

7. 配置kafka-manager

[[email protected] ~]# cd kafka-manager-1.3.0.6/
[[email protected] kafka-manager-1.3.0.6]# vim conf/application.conf

设置zkhosts:

kafka-manager.zkhosts="114.55.29.246:2181,114.55.29.86:2181,114.55.29.241:2181"

8. 启动kafka-manager

[[email protected] kafka-manager-1.3.0.6]# bin/kafka-manager

默认监听的端口是9000。你也可以在启动时指定配置文件和监听端口:

# bin/kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=8080

启动并置于后台运行:

[[email protected] kafka-manager-1.3.0.6]$ nohup bin/kafka-manager > /dev/null 2>&1 &

七、kafka-manager添加kafka cluster

浏览器输入地址访问:http://114.55.29.246:9000/

注意:安装完成后需要手动添加Cluster。添加Cluster是指添加一个已有的Kafka集群进入监控列表,而非通过Kafka Manager部署一个新的Kafka Cluster,这一点与Cloudera Manager不同。

时间: 2024-08-02 06:55:12

Kafka介绍及安装部署的相关文章

Storm介绍及安装部署

本节内容: Apache Storm是什么 Apache Storm核心概念 Storm原理架构 Storm集群安装部署 启动storm ui.Nimbus和Supervisor 一.Apache Storm是什么 Apache Storm是自由开源的分布式实时计算系统,擅长处理海量数据,适用于数据实时处理而非批处理. 批处理使用的大多是鼎鼎大名的hadoop或者hive,作为一个批处理系统,hadoop以其吞吐量大.自动容错等优点,在海量数据处理上得到了广泛的使用.但是,hadoop不擅长实时

Zabbix介绍及安装部署

.    zabbix是一个基于WEB界面的提供分布式系统监视以及网络监视功能的企业级的开源解决方案,zabbix能监视各种网络参数,保证服务器系统的安全运营:并提供灵活的通知机制以让系统管理员快速定位/解决存在的各种问题. zabbix由2部分构成,zabbix server与可选组件zabbix agent zabbix server可以通过SNMP,zabbix agent,ping,端口监视等方法提供对远程服务器/网络状态的监视,数据收集等功能,它可以运行在Linux, Solaris,

kafka本地单机安装部署

kafka是一种高吞吐量的分布式发布订阅消息系统,这几天要上kafka,只在其中的一个节点使用,结合具体的项目实践在此将kafka的本地安装部署流程记录下来与各位同仁分享交流. 准备工作: 上述的文件除了jdk以外均放在/usr/local/kafka目录下. 1.安装jdk,kafka的使用要用到jdk 首先检查有无jdk:java -version cd /usr/local/hadoop(本例中我是将jdk的安装包放到hadoop文件夹下,各位可以依据自己情况) http://www.or

hue框架介绍和安装部署

大家好,我是来自内蒙古的小哥,我现在在北京学习大数据,我想把学到的东西分享给大家,想和大家一起学习 hue框架介绍和安装部署 hue全称:HUE=Hadoop User Experience 他是cloudera公司提供的一个web框架,和其他大数据框架整合,提供可视化界面 hue的架构 1.hue UI:hue提供一个可视化的web界面 2.hue server:hue的服务器,对外提供一个web的访问 3.hue db:存储整合框架的信息 1.Hue的介绍 HUE=Hadoop User E

Elasticsearch介绍及安装部署

本节内容: Elasticsearch介绍 Elasticsearch集群安装部署 Elasticsearch优化 安装插件:中文分词器ik 一.Elasticsearch介绍 Elasticsearch是一个分布式搜索服务,提供Restful API,底层基于Lucene,采用多shard的方式保证数据安全,并且提供自动resharding的功能,加之github等大型的站点也采用 Elasticsearch作为其搜索服务. 二.Elasticsearch集群安装部署 1. 环境信息 主机名

keepalived的介绍和安装部署

keepalived的介绍 作用 keepalived主要针对LVS群聚应用而设计的,提供故障切换和健康检查功能.在非LVS群集环境中,也可用来实现多机热备功能. 故障切换:ha fallover功能,实现LB Master和Backup主机之间故障转移和自动切换 这是针对有两个负载均衡器Director同时工作而采取的故障转移措施.当主负载均衡器(MASTER)失效或出现故障时候,备份负载均衡器(BACKUP)将自动接管主负载均衡器的所有工作:一旦主负载均衡器故障修复,主负载均衡器又会接管回它

SharePoint Online 开发篇:node.js和npm介绍和安装部署

Blog链接:https://blog.51cto.com/13969817 过去的几年中,出现的最重要的开发平台是Node.js和基于npm的开发,SharePoint Framework是这种开发方法的一个示例,本文我们来了解下Node.js.npm和Node Package Manager 是什么? Node.js,是一个建立在Google Chrome JavaScript的引擎(V8引擎)上的服务器端平台.JavaScript引擎的性能已经提高了很多,取决于你在做什么,事实上,它们的性

3_HA介绍和安装部署

一.hadoop 2.x产生背景 1.hadoop 1.x中hdfs和mr在高可用和扩展性等方面存在问题.2.hdfs存在的问题:NN单点故障,难以应用于在线场景:NN压力过大,内存受限,影响系统扩展性.3.mr存在的问题:1.x难以支持除mr之外的计算框架,如spark和storm(mr一般得到结果时间较长,storm和spark可以很快得到结果). 二.hadoop 1.x与hadoop2.x区别 1.2.x由hdfs.mr和yarn三个分支构成,yarn是分布式的资源管理器(资源包括内存,

hadoop(1)_HDFS介绍及安装部署

一.hadoop简介 1.hadoop的初衷是为了解决Nutch的海量数据爬取和存储的需要,HDFS来源于google的GFS,MapReduce来源于Google的MapReduce,HBase来源于Google的BigTable.hadoop后被引入Apache基金会. 2.hadoop两大核心设计是HDFS和MapReduce,HDFS是分布式存储系统,提供高可靠性.高扩展性.高吞吐率的数据存储服务;MapReduce是分布式计算框架,具有易于编程.高容错性和高扩展性等优点. 3.hado