Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有自己独特的设计
- Kafka将消息以topic为单位进行归纳。
- 将向Kafka topic发布消息的程序成为producers.
- 将预订topics并消费消息的程序成为consumer.
- Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.
下面来看下如何简单的使用:
首先,去官网下载kakfa的安装包 http://kafka.apache.org/downloads.html
下载完了自后,tar -zxvf *.tar.gz 解压,目录如下:
先来看下bin目录下:
从这些脚本可以看出kafka本身是结合了zookeeper来使用的
Zookeeper 协调控制
1. 管理broker与consumer的动态加入与离开。
2. 触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一
个consumer group内的多个consumer的订阅负载平衡。
3. 维护消费关系及每个partion的消费信息。
Zookeeper上的细节:
1. 每个broker启动后会在zookeeper上注册一个临时的broker registry,包含broker的ip地址和端口号,所存储的topics和partitions信息。
2. 每个consumer启动后会在zookeeper上注册一个临时的consumer registry:包含consumer所属的consumer group以及订阅的topics。
3. 每个consumer group关联一个临时的owner registry和一个持久的offset registry。对于被订阅的每个partition包含一个owner registry,内容为订阅这个partition的consumer id;同时包含一个offset registry,内容为上一次订阅的offset。
对于入门,我们就暂且使用kafka安装包中自带的zookeeper程序,这个程序在libs库中
可以看到这里有zkclient和zookeeper的依赖包,所以当我们使用该kafka程序的时候,得先启动zookeeper
再来看下config下面是什么东东:
这里就是一些具体的配置信息了
我们来看一个producer.properties
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see kafka.producer.ProducerConfig for more details ############################# Producer Basics ############################# # list of brokers used for bootstrapping knowledge about the rest of the cluster # format: host1:port1,host2:port2 ... metadata.broker.list=localhost:9092 # name of the partitioner class for partitioning events; default partition spreads data randomly #partitioner.class= # specifies whether the messages are sent asynchronously (async) or synchronously (sync) producer.type=sync # specify the compression codec for all data generated: none , gzip, snappy. # the old config values work as well: 0, 1, 2 for none, gzip, snappy, respectivally compression.codec=none # message encoder serializer.class=kafka.serializer.DefaultEncoder # allow topic level compression #compressed.topics= ############################# Async Producer ############################# # maximum time, in milliseconds, for buffering data on the producer queue #queue.buffering.max.ms= # the maximum size of the blocking queue for buffering on the producer #queue.buffering.max.messages= # Timeout for event enqueue: # 0: events will be enqueued immediately or dropped if the queue is full # -ve: enqueue will block indefinitely if the queue is full # +ve: enqueue will block up to this many milliseconds if the queue is full #queue.enqueue.timeout.ms= # the number of messages batched at the producer #batch.num.messages=
这里配置了服务地址,消息发送的类型,同步还是异步,是否压缩,消息编码等信息
下面我们就来用一下,展示一下:
1、启动zookeeper
2、启动kafka
3、都启动成功了,我们就生产消费吧
创建topic
[[email protected] bin]# sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic CMCC Created topic "CMCC".
生产信息
[[email protected] bin]# sh kafka-console-producer.sh --broker-list localhost:9092 --topic CMCC SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Hwl^H^H Hwll Hello Kafka !
消费信息
[[email protected] bin]# sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic CMCC --from-beginning SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Hwl Hwll Hello Kafka !
至此,一个简单的DEMO就演示结束了,下面看看一个简单的集群怎么来玩??
首先,我们要再配置两个服务
第一个
broker.id=1 port=9093 log.dirs=/tmp/kafka-logs-1
第二个
broker.id=2 port=9094 log.dirs=/tmp/kafka-logs-2
可以看到这两个配置和初始的server.properties结构一模一样,只是文件中的属性值变了而已,kafka中通过brokder.id来唯一标识集群中的一个服务,所以我们基本是修改了配置中的broker.id
port log.dirs三个属性
下面将这个两个服务也启动起来
sh bin/kafka-server-start.sh config/server-1.properties &
sh bin/kafka-server-start.sh config/server-2.properties &
下面创建topic
sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic chiwei
看下topic描述
[[email protected] kafka_2.10-0.8.1.1]# sh bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic chiwei Topic:chiwei PartitionCount:1 ReplicationFactor:3 Configs: Topic: chiwei Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 [[email protected] kafka_2.10-0.8.1.1]#
官方描述
- "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
- "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
- "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
还记得我们第一次创建的topic,看看他的描述
[[email protected] kafka_2.10-0.8.1.1]# sh bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic CMCC Topic:CMCC PartitionCount:1 ReplicationFactor:1 Configs: Topic: CMCC Partition: 0 Leader: 0 Replicas: 0 Isr: 0
下面我们开始生产消息
[[email protected] kafka_2.10-0.8.1.1]# sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic chiwei SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Hello Chiwei !
消费信息
[[email protected] kafka_2.10-0.8.1.1]# sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic chiwei SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Hello Chiwei !
成功了!
下面我们来测试一下这个集群的容错能力怎么样?
刚才我们看到主题描述的信息显示brokder.id=2是leader,那么我现在把leader进程给杀了,来看看什么情况啊?
ps -ef | grep server-2
kill -9 9663
[[email protected] kafka_2.10-0.8.1.1]# sh bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic chiwei Topic:chiwei PartitionCount:1 ReplicationFactor:3 Configs: Topic: chiwei Partition: 0 Leader: 1 Replicas: 2,1,0 Isr: 1,0 [[email protected] kafka_2.10-0.8.1.1]#
leader已经被换了,同时我们注意到replicas已然显示有2,而isr没有2了,因为isr显示的是“in-sync”的服务id,而2已经不再同步服务了;而replicas为什么还显示2呢
- "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
这句话就说明原因了“even if they are currently alive”
这时候我们再去消费刚才主题的消息看看
[[email protected] kafka_2.10-0.8.1.1]# sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic chiwei SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Hello Chiwei !
已然存在!说明消息同步到了集群的每个服务节点上。