安装和测试Kafka

本文主要介绍如何在单节点上安装 Kafka 并测试 broker、producer 和 consumer 功能。

下载

进入下载页面:http://kafka.apache.org/downloads.html ,选择 Binary downloads下载 (Source download需要编译才能使用),这里我下载 kafka_2.11-0.8.2.1,其对应的 Scala 版本为 2.11

$ wget http://apache.fayea.com/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz

解压并进入目录:

$ tar -xzvf kafka_2.11-0.8.2.1.tgz
$ cd kafka_2.11-0.8.2.1

查看目录结构:

tree -L 2
.
├── bin
│   ├── kafka-console-consumer.sh
│   ├── kafka-console-producer.sh
│   ├── kafka-consumer-offset-checker.sh
│   ├── kafka-consumer-perf-test.sh
│   ├── kafka-mirror-maker.sh
│   ├── kafka-preferred-replica-election.sh
│   ├── kafka-producer-perf-test.sh
│   ├── kafka-reassign-partitions.sh
│   ├── kafka-replay-log-producer.sh
│   ├── kafka-replica-verification.sh
│   ├── kafka-run-class.sh
│   ├── kafka-server-start.sh
│   ├── kafka-server-stop.sh
│   ├── kafka-simple-consumer-shell.sh
│   ├── kafka-topics.sh
│   ├── windows
│   ├── zookeeper-server-start.sh
│   ├── zookeeper-server-stop.sh
│   └── zookeeper-shell.sh
├── config
│   ├── consumer.properties
│   ├── log4j.properties
│   ├── producer.properties
│   ├── server.properties
│   ├── test-log4j.properties
│   ├── tools-log4j.properties
│   └── zookeeper.properties
├── libs
│   ├── jopt-simple-3.2.jar
│   ├── kafka_2.11-0.8.2.1.jar
│   ├── kafka_2.11-0.8.2.1-javadoc.jar
│   ├── kafka_2.11-0.8.2.1-scaladoc.jar
│   ├── kafka_2.11-0.8.2.1-sources.jar
│   ├── kafka_2.11-0.8.2.1-test.jar
│   ├── kafka-clients-0.8.2.1.jar
│   ├── log4j-1.2.16.jar
│   ├── lz4-1.2.0.jar
│   ├── metrics-core-2.2.0.jar
│   ├── scala-library-2.11.5.jar
│   ├── scala-parser-combinators_2.11-1.0.2.jar
│   ├── scala-xml_2.11-1.0.2.jar
│   ├── slf4j-api-1.7.6.jar
│   ├── slf4j-log4j12-1.6.1.jar
│   ├── snappy-java-1.1.1.6.jar
│   ├── zkclient-0.3.jar
│   └── zookeeper-3.4.6.jar
├── LICENSE
└── NOTICE

4 directories, 45 files

启动和停止

运行 kafka ,需要依赖 zookeeper,你可以使用已有的 zookeeper 集群或者利用 kafka 提供的脚本启动一个 zookeeper 实例:

$ bin/zookeeper-server-start.sh config/zookeeper.properties &

默认的,zookeeper 会监听在 *:2181/tcp

停止刚才启动的 zookeeper 实例:

$ bin/zookeeper-server-stop.sh

启动Kafka server:

$ bin/kafka-server-start.sh config/server.properties &

config/server.properties 中有一些默认的配置参数,这里仅仅列出参数,不做解释:

broker.id=0

port=9092

#host.name=localhost

#advertised.host.name=<hostname routable by clients>

#advertised.port=<port accessible by clients>

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/tmp/kafka-logs

num.partitions=1

num.recovery.threads.per.data.dir=1

#log.flush.interval.messages=10000

#log.flush.interval.ms=1000

log.retention.hours=168

#log.retention.bytes=1073741824

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

log.cleaner.enable=false

zookeeper.connect=localhost:2181

zookeeper.connection.timeout.ms=6000

如果你像我一样是在虚拟机中测试 kafka,那么你需要修改 kafka 启动参数中 JVM 内存大小。查看 kafka-server-start.sh 脚本,修改 KAFKA_HEAP_OPTS 处 -Xmx 和 -Xms 的值。

启动成功之后,会看到如下日志:

[2015-03-17 11:19:30,528] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2015-03-17 11:19:30,604] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2015-03-17 11:19:30,605] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[2015-03-17 11:19:30,687] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2015-03-17 11:19:30,756] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2015-03-17 11:19:30,887] INFO Registered broker 0 at path  /brokers/ids/0 with address cdh1:9092. (kafka.utils.ZkUtils$)
[2015-03-17 11:19:30,928] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2015-03-17 11:19:31,048] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

从日志可以看到:

  • log flusher 有一个默认的周期值
  • kafka server 监听在9092端口
  • 在 cdh1:9092 上注册了一个 broker 0 ,路径为 /brokers/ids/0

停止 Kafka server :

$ bin/kafka-server-stop.sh

单 broker 测试

在启动 kafka-server 之后启动,运行producer:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

在另一个终端运行 consumer:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

在 producer 端输入字符串并回车,查看 consumer 端是否显示。

多 broker 测试

配置和启动 Kafka broker

接下来参考 Running a Multi-Broker Apache Kafka 0.8 Cluster on a Single Node 这篇文章,基于 config/server.properties 配置文件创建多个 broker 的 kafka 集群。

创建第一个 broker:

$ cp config/server.properties config/server1.properties

编写 config/server1.properties 并修改下面配置:

broker.id=1
port=9092
log.dir=/tmp/kafka-logs-1

创建第二个 broker:

$ cp config/server.properties config/server2.properties

编写 config/server2.properties 并修改下面配置:

broker.id=2
port=9093
log.dir=/tmp/kafka-logs-2

创建第三个 broker:

$ cp config/server.properties config/server3.properties

编写 config/server3.properties 并修改下面配置:

broker.id=3
port=9094
log.dir=/tmp/kafka-logs-3

接下来分别启动这三个 broker:

$ JMX_PORT=9999  ; nohup bin/kafka-server-start.sh config/server1.properties &
$ JMX_PORT=10000  ; nohup bin/kafka-server-start.sh config/server2.properties &
$ JMX_PORT=10001  ; nohup bin/kafka-server-start.sh config/server3.properties &

下面是三个 broker 监听的网络接口和端口列表:

        Broker 1     Broker 2      Broker 3
----------------------------------------------
Kafka   *:9092/tcp   *:9093/tcp    *:9094/tcp
JMX     *:9999/tcp   *:10000/tcp   *:10001/tcp

创建 Kafka topic

在 Kafka 0.8 中有两种方式创建一个新的 topic:

  • 在 broker 上开启 auto.create.topics.enable 参数,当 broker 接收到一个新的 topic 上的消息时候,会通过 num.partitions 和 default.replication.factor 两个参数自动创建 topic。
  • 使用 bin/kafka-topics.sh 命令

创建一个名称为 zerg.hydra 的 topic:

$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic zerg.hydra --partitions 3 --replication-factor 2

使用下面查看创建的 topic:

$ bin/kafka-topics.sh --zookeeper localhost:2181 --list
test
zerg.hydra

还可以查看更详细的信息:

$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic zerg.hydra
Topic:zerg.hydra    PartitionCount:3    ReplicationFactor:2 Configs:
    Topic: zerg.hydra   Partition: 0    Leader: 2   Replicas: 2,3   Isr: 2,3
    Topic: zerg.hydra   Partition: 1    Leader: 3   Replicas: 3,0   Isr: 3,0
    Topic: zerg.hydra   Partition: 2    Leader: 0   Replicas: 0,2   Isr: 0,2

默认的,Kafka 持久化 topic 到 log.dir 参数定义的目录。

$ tree /tmp/kafka-logs-{1,2,3}
/tmp/kafka-logs-1                   # first broker (broker.id = 1)
├── zerg.hydra-0                    # replica of partition 0 of topic "zerg.hydra" (this broker is leader)
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
├── zerg.hydra-2                    # replica of partition 2 of topic "zerg.hydra"
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
└── replication-offset-checkpoint

/tmp/kafka-logs-2                   # second broker (broker.id = 2)
├── zerg.hydra-0                    # replica of partition 0 of topic "zerg.hydra"
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
├── zerg.hydra-1                    # replica of partition 1 of topic "zerg.hydra" (this broker is leader)
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
└── replication-offset-checkpoint

/tmp/kafka-logs-3                   # third broker (broker.id = 3)
├── zerg.hydra-1                    # replica of partition 1 of topic "zerg.hydra"
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
├── zerg.hydra-2                    # replica of partition 2 of topic "zerg.hydra" (this broker is leader)
│   ├── 00000000000000000000.index
│   └── 00000000000000000000.log
└── replication-offset-checkpoint

6 directories, 15 files

启动一个 producer

以 sync 模式启动一个 producer:

$ bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --sync --topic zerg.hydra

然后,输入以下内容:

Hello, world!
Rock: Nerf Paper. Scissors is fine.

启动一个 consumer

在另一个终端运行:

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic zerg.hydra --from-beginning

注意,生产环境通常不会添加 --from-beginning 参数。

观察输出,你会看到下面内容:

Hello, world!
Rock: Nerf Paper. Scissors is fine.

把 consumer 停掉再启动,你还会看到相同的输出结果。

将日志推送到 kafka

例如,将 apache 或者 nginx 或者 tomcat 等产生的日志 push 到 kafka,只需要执行下面代码即可:

$ tail -n 0 -f  /var/log/nginx/access.log | bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --sync --topic zerg.hydra
时间: 2024-08-06 07:50:36

安装和测试Kafka的相关文章

Centos下 安装和测试kafka

系统Centos6.5 工具SecureCRT 1.首先下载kafka压缩包 kafka_2.9.2-0.8.1.1.tgz 解压 tar -zxvf kafka_2.9.2-0.8.1.1.tgz 2.修改配置文件 首先要有zookeeper ,安装zookeeper 步骤在另外一随笔里 http://www.cnblogs.com/yovela/p/5178210.html 学到一个新命令:cd XXXX && ls就能进入同时查看文件目录 2.1.修改zookeeper.proper

Kafka的安装及测试

1.zokeeper的安装及配置 在zokeeper的conf目录下新建zoo.cfg文件,在里面配置如下内容 # The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending

0030-如何在CDH中安装Kudu&amp;Spark2&amp;Kafka

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看. 1.概述 在CDH的默认安装包中,是不包含Kafka,Kudu和Spark2的,需要单独下载特定的Parcel包才能安装相应服务.本文档主要描述在离线环境下,在CentOS6.5操作系统上基于CDH5.12.1集群,使用Cloudera Manager通过Parcel包方式安装Kudu.Spark2和Kafka的过程. 内容概括 Kudu安装 Spark2安装 Kafka安装 服务验证 测试环境 操作系统版本:CentOS6.5 C

mosquitto在Linux环境下的部署/安装/使用/测试

mosquitto在Linux环境下的部署 看了有三四天的的源码,(当然没怎么好好看了),突然发现对mosquitto的源码有了一点点感觉,于是在第五天决定在Linux环境下部署mosquitto. 使用传统源码安装步骤: 步骤1:http://mosquitto.org/files/source/官网下载源码,放到Linux环境中.解压后,找到主要配置文件config.mk,其中包含mosquitto的安装选项,需要注意的是,默认情况下mosquitto的安装需要OpenSSL(一个强大的安全

Etcd学习(一)安装和测试

Etcd是一个比较新的分布式协调框架,现在才只到0.4.6版本,还没发布1.0版本 从网上搜etcd关键字,基本上就只能看到"开源中国"的介绍: etcd 是一个高可用的 Key/Value 存储系统,主要用于分享配置和服务发现.etcd 的灵感来自于 ZooKeeper 和 Doozer,侧重于: 简单:支持 curl 方式的用户 API (HTTP+JSON) 安全:可选 SSL 客户端证书认证 快速:单实例可达每秒 1000 次写操作 可靠:使用 Raft 实现分布式 Etcd

【MYSQL系列】【基础版】第一章 MYSQL的安装以及测试

1. MYSQL的安装以及测试     1.1 什么是数据库,有什么作用,以及有哪些特点         1.1.1 Database,DB,是一个数据的仓库:         1.1.2 用于保存.管理数据         1.1.3 特点:             1.1.3.1 一致性.完整性             1.1.3.2 降低冗余(重复)             1.1.3.3 应用的共享,以及有组织         1.1.4 数据仓库: 偏向于数据分析,是数据挖掘的一种  

Hadoop2.2.0多节点分布式安装及测试

众所周知,hadoop在10月底release了最新版2.2.很多国内的技术同仁都马上在网络上推出了自己对新版hadoop的配置心得.这其中主要分为两类: 1.单节点配置 这个太简单了,简单到只要懂点英语,照着网上说的做就ok了.我这里不谈这个,有兴趣的童鞋可以自己去问度娘和谷哥- 2.多节点配置 这个就是我要重点说明的,老实说网络上说的的确是多节点,但不是真正的分布式部署- 我们为什么要用hadoop?因为hadoop是一个分布式系统基础架构,我们可以在不了解分布式底层细节的情况下,开发分布式

OpenCV安装和测试

参考链接:http://blog.csdn.net/bruce_zeng/article/details/7961153 OpenCv下载链接:http://sourceforge.net/projects/opencvlibrary/files/opencv-win/2.2/ 下载的是 OpenCV-2.2.0-win32-vs2010.exe,傻瓜式安装.安装过程中勾选"Add OpenCV to system PATH for all users"选项.或者安装完后自己添加环境变

mycat安装和测试

mycat安装和测试 一.  环境准备 本机环境是三台centos6.5 IP 主机名 数据库名 安装软件 192.168.17.4 master db1 mycat,mysql 192.168.17.5 slave1 db2 mysql 192.168.17.6 slave2 db3 mysql 二.  安装mysql 1.安装mysql软件 linux下安装mysql有两种方式:一种是通过下载源码编译安装,一种是通过rpm包安装,如果配置了yum直接用yum安装会更快 编译安装步骤: ./c