Kafka Topic的详细信息 捎带主要的安装步骤

1. 安装步骤

Kafka伪分布式安装的思路跟Zookeeper的伪分布式安装思路完全一样,不过比Zookeeper稍微简单些(不需要创建myid文件),

主要是针对每个Kafka服务器配置一个单独的server.properties,三个服务器分别使用server.properties,server.1.properties, server.2.properties

cp server.properties server.1.properties
cp server.properties server.2.properties

修改server.1.properties和server.2.properties,主要有三个属性需要修改

broker.id=1
port=9093
log.dirs=/tmp/kafka-logs-1  

port指的是Kakfa服务器监听的端口

启动三个Kafka:

bin/kafka-server-start.sh server.properties
bin/kafka-server-start.sh server.1.properties
bin/kafka-server-start.sh server.2.properties

2. Kafka脚本常用配置参数

2.1 kafka-console-consumer.sh

--from-beginning                        If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.

--topic <topic>                           The topic id to consume on

--zookeeper <urls>                    REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.

--group <gid>                            The group id to consume on. (default: console-consumer-37803)

在consumer端,不需要指定broke-list,而是通过zookeeper和topic找到所有的持有topic消息的broker

2.2 kafka-console-producer.sh

--topic <topic>                         REQUIRED: The topic id to produce  messages to.

--broker-list <broker-list>        REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.

2.3 kafka-topic.sh

--create                                Create a new topic.

--describe                              List details for the given topics.

--list                                  List all available topics.

--partitions <Integer: # of partitions> The number of partitions for the topic being created or altered (WARNING:   If partitions are increased for a  topic that has a key, the partition logic or ordering of the messages will be affected)

--replication-factor <Integer: replication factor> The replication factor for each partition in the topic being created

--zookeeper <urls>                    REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.

--topic <topic>                         The topic to be create, alter or describe. Can also accept a regular expression except for --create option

3. 伪机群测试

测试前,先总结有哪些测试点

目前想到的是,Partition有个leader的概念,leader partition是什么意思?干什么用的?

3.1 创建Topic

./kafka-topics.sh --create  --topic  topic_p10_r3 --partitions 10 --replication-factor 3  --zookeeper localhost:2181  

创建一个Topic,10个Partition,副本数为3,也就是说,每个broker上的每个分区,在其它节点都有副本,因为每个节点都有10个节点的数据

3.2 每个broker创建的目录

当创建完Topic后,每个Topic都会在Kakfa的配置目录下(比如/tmp/kafka-logs,建立相应的目录和文件)

topic_p10_r3-0

topic_p10_r3-1

----

topic_p10_r3-9

其中每个目录下面都有两个文件: 00000000000000000000.index  00000000000000000000.log

3.3 Topic的详细信息

./kafka-topics.sh --describe --topic topic_p10_r3  --zookeeper localhost:2181  

得到的结果如下:

Topic:topic_p10_r3    PartitionCount:10    ReplicationFactor:3    Configs:
    Topic: topic_p10_r3    Partition: 0    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: topic_p10_r3    Partition: 1    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: topic_p10_r3    Partition: 2    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: topic_p10_r3    Partition: 3    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: topic_p10_r3    Partition: 4    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: topic_p10_r3    Partition: 5    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
    Topic: topic_p10_r3    Partition: 6    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: topic_p10_r3    Partition: 7    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: topic_p10_r3    Partition: 8    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: topic_p10_r3    Partition: 9    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0

具体的含义是:

Here is an explanation of output. The first line gives a summary of all the partitions, each additional line gives information about one partition

  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.

3.4 问题: 如果副本数为1,是否表示每个partition在集群中只有1份(也就是说每个partition只会存在于一个broker上),那么leader自然就表示这个partition就在leader所指的broker上了?

建立包含10个分区,同时只有一个副本的topic

./kafka-topics.sh --create  --topic  topic_p10_r1 --partitions 10 --replication-factor 1  --zookeeper localhost:2181

  

[[email protected] bin]$ ./kafka-topics.sh --describe --topic topic_p10_r1  --zookeeper localhost:2181
Topic:topic_p10_r1  PartitionCount:10   ReplicationFactor:1 Configs:
    Topic: topic_p10_r1 Partition: 0    Leader: 1   Replicas: 1 Isr: 1
    Topic: topic_p10_r1 Partition: 1    Leader: 2   Replicas: 2 Isr: 2
    Topic: topic_p10_r1 Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: topic_p10_r1 Partition: 3    Leader: 1   Replicas: 1 Isr: 1
    Topic: topic_p10_r1 Partition: 4    Leader: 2   Replicas: 2 Isr: 2
    Topic: topic_p10_r1 Partition: 5    Leader: 0   Replicas: 0 Isr: 0
    Topic: topic_p10_r1 Partition: 6    Leader: 1   Replicas: 1 Isr: 1
    Topic: topic_p10_r1 Partition: 7    Leader: 2   Replicas: 2 Isr: 2
    Topic: topic_p10_r1 Partition: 8    Leader: 0   Replicas: 0 Isr: 0
    Topic: topic_p10_r1 Partition: 9    Leader: 1   Replicas: 1 Isr: 1 

可见理解不错,每个partition有不同的leader,Leader所在的broker同时也是Replicas所在的broker(ID号一样)

因此可以理解,

1. 每个partition副本集都有一个leader

2. leader指的是partition副本集中的leader,它负责读写,然后负责将数据复制到其它的broker上。

3.一个Topic的所有partition会比较均匀的分布到多个broker上

3.5 broker挂了,Kafka的容错机制

在上面已经建立了两个Topic,一个是10个分区3个副本, 一个是10个分区1个副本。此时,假如有一个broker挂了,看看这两个Topic的容错如何?

通过jps命令可以看到有三个Kafka进程。

通过ps -ef|grep server.2.properties可以找到brokerId为2的Kakfa进程,使用kill -9将其干掉。干掉的时候,console开始刷屏,异常信息一样,都是:

[2015-02-23 02:14:00,037] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2015-02-23 02:14:00,039] ERROR [ReplicaFetcherThread-0-2], Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 4325; ClientId: ReplicaFetcherThread-0-2; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [topic_p10_r3,3] -> PartitionFetchInfo(0,1048576),[topic_p10_r3,9] -> PartitionFetchInfo(0,1048576),[topic_p10_r3,6] -> PartitionFetchInfo(0,1048576),[topic_p10_r3,0] -> PartitionFetchInfo(0,1048576) (kafka.server.ReplicaFetcherThread)
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
    at kafka.consumer.SimpleConsumer.reconnect(SimpleConsumer.scala:57)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:79)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2015-02-23 02:14:00,040] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)

3,9,6,0 这个四个分区 正是topic_p10_r3上broker2作为leader的partition,可见Kafka要做Leader移交,看看此时的topic_p10_r3和topic_p10_r1的情况,我们已经把broker2 kill掉了

topic_p10_r3(Partition切换到其它Leader上了。。。Rplicas还有3,。。。)

[[email protected] bin]$ ./kafka-topics.sh --describe --topic topic_p10_r3  --zookeeper localhost:2181
Topic:topic_p10_r3  PartitionCount:10   ReplicationFactor:3 Configs:
    Topic: topic_p10_r3 Partition: 0    Leader: 0   Replicas: 2,0,1 Isr: 0,1
    Topic: topic_p10_r3 Partition: 1    Leader: 0   Replicas: 0,1,2 Isr: 0,1
    Topic: topic_p10_r3 Partition: 2    Leader: 1   Replicas: 1,2,0 Isr: 1,0
    Topic: topic_p10_r3 Partition: 3    Leader: 1   Replicas: 2,1,0 Isr: 1,0
    Topic: topic_p10_r3 Partition: 4    Leader: 0   Replicas: 0,2,1 Isr: 0,1
    Topic: topic_p10_r3 Partition: 5    Leader: 1   Replicas: 1,0,2 Isr: 1,0
    Topic: topic_p10_r3 Partition: 6    Leader: 0   Replicas: 2,0,1 Isr: 0,1
    Topic: topic_p10_r3 Partition: 7    Leader: 0   Replicas: 0,1,2 Isr: 0,1
    Topic: topic_p10_r3 Partition: 8    Leader: 1   Replicas: 1,2,0 Isr: 1,0
    Topic: topic_p10_r3 Partition: 9    Leader: 1   Replicas: 2,1,0 Isr: 1,0

topic_p10_r1:没有切换,其中分区为1,47的Leader是-1了。。 这就出错了

[[email protected] bin]$ ./kafka-topics.sh --describe --topic topic_p10_r1  --zookeeper localhost:2181
Topic:topic_p10_r1  PartitionCount:10   ReplicationFactor:1 Configs:
    Topic: topic_p10_r1 Partition: 0    Leader: 1   Replicas: 1 Isr: 1
    Topic: topic_p10_r1 Partition: 1    Leader: -1  Replicas: 2 Isr:
    Topic: topic_p10_r1 Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: topic_p10_r1 Partition: 3    Leader: 1   Replicas: 1 Isr: 1
    Topic: topic_p10_r1 Partition: 4    Leader: -1  Replicas: 2 Isr:
    Topic: topic_p10_r1 Partition: 5    Leader: 0   Replicas: 0 Isr: 0
    Topic: topic_p10_r1 Partition: 6    Leader: 1   Replicas: 1 Isr: 1
    Topic: topic_p10_r1 Partition: 7    Leader: -1  Replicas: 2 Isr:
    Topic: topic_p10_r1 Partition: 8    Leader: 0   Replicas: 0 Isr: 0
    Topic: topic_p10_r1 Partition: 9    Leader: 1   Replicas: 1 Isr: 1  

重启broker 2得到结果如下:(对于topic_p10_r3,leader没有变化,即每个Partition都有自己的Leader,新加入的broker只能follower;而topic_p10_r1,则会选出Leader)

[[email protected] bin]$ ./kafka-topics.sh --describe --topic topic_p10_r3  --zookeeper localhost:2181
Topic:topic_p10_r3  PartitionCount:10   ReplicationFactor:3 Configs:
    Topic: topic_p10_r3 Partition: 0    Leader: 0   Replicas: 2,0,1 Isr: 0,1,2
    Topic: topic_p10_r3 Partition: 1    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2
    Topic: topic_p10_r3 Partition: 2    Leader: 1   Replicas: 1,2,0 Isr: 1,0,2
    Topic: topic_p10_r3 Partition: 3    Leader: 1   Replicas: 2,1,0 Isr: 1,0,2
    Topic: topic_p10_r3 Partition: 4    Leader: 0   Replicas: 0,2,1 Isr: 0,1,2
    Topic: topic_p10_r3 Partition: 5    Leader: 1   Replicas: 1,0,2 Isr: 1,0,2
    Topic: topic_p10_r3 Partition: 6    Leader: 0   Replicas: 2,0,1 Isr: 0,1,2
    Topic: topic_p10_r3 Partition: 7    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2
    Topic: topic_p10_r3 Partition: 8    Leader: 1   Replicas: 1,2,0 Isr: 1,0,2
    Topic: topic_p10_r3 Partition: 9    Leader: 1   Replicas: 2,1,0 Isr: 1,0,2
[[email protected] bin]$ ./kafka-topics.sh --describe --topic topic_p10_r1  --zookeeper localhost:2181
Topic:topic_p10_r1  PartitionCount:10   ReplicationFactor:1 Configs:
    Topic: topic_p10_r1 Partition: 0    Leader: 1   Replicas: 1 Isr: 1
    Topic: topic_p10_r1 Partition: 1    Leader: 2   Replicas: 2 Isr: 2
    Topic: topic_p10_r1 Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: topic_p10_r1 Partition: 3    Leader: 1   Replicas: 1 Isr: 1
    Topic: topic_p10_r1 Partition: 4    Leader: 2   Replicas: 2 Isr: 2
    Topic: topic_p10_r1 Partition: 5    Leader: 0   Replicas: 0 Isr: 0
    Topic: topic_p10_r1 Partition: 6    Leader: 1   Replicas: 1 Isr: 1
    Topic: topic_p10_r1 Partition: 7    Leader: 2   Replicas: 2 Isr: 2
    Topic: topic_p10_r1 Partition: 8    Leader: 0   Replicas: 0 Isr: 0
    Topic: topic_p10_r1 Partition: 9    Leader: 1   Replicas: 1 Isr: 1 

原文地址:https://www.cnblogs.com/jack-Star/p/9927557.html

时间: 2024-08-08 09:17:53

Kafka Topic的详细信息 捎带主要的安装步骤的相关文章

超详细Centos6.5文本模式安装步骤

对于刚接触Linux的用户来说,安装系统和配置网卡的过程也可能要用很长的时间,Centos6.5的安装方式有二种,图形模式和文本模式.文本模式从6开始就不支持自定义分区了(新手练习时默认分区足够用),如果需要自定义分区要使用图形模式安装.大家在学习Linux时,不需要一开始就在物理机上安装,下面就使用VMware Workstation虚拟机软件演示一下文本模式安装. 演示环境所需要的软件: 虚拟机版本:VMware Workstation12 Linux版本:CentOS-6.5-x86_64

Kafka topic常见命令解析

本文着重介绍几个常用的topic命令行命令,包括listTopic,createTopic,deleteTopic和describeTopic等.由于alterTopic并不是很常用,本文中就不涉及了.另外本文的代码分析是基于kafka_2.10-0.8.2.1的(虽然截图是Kafka 0.8.1的^_^ )   一. list topic 显示所有topic 1. 从zookeeper的/brokers/topics节点下获取所有topic封装成topic集合 2. 遍历该集合,查看每个top

Kafka Topic Partition Replica Assignment实现原理及资源隔离方案

本文共分为三个部分: Kafka Topic创建方式 Kafka Topic Partitions Assignment实现原理 Kafka资源隔离方案 1. Kafka Topic创建方式 Kafka Topic创建方式有以下两种表现形式: (1)创建Topic时直接指定Topic Partition Replica与Kafka Broker之间的存储映射关系 /usr/lib/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper ZooKeep

Add an Editor to a Detail View 将编辑器添加到详细信息视图

In this lesson, you will learn how to add an editor to a Detail View. For this purpose, the Department.Office property will be added to the Contact Detail View. You will also learn how to change the layout of a Detail View's editors. 在本课中,您将学习如何将编辑器添

javascript页面详细信息的显示和隐藏

页面详细信息的显示和隐藏 <!DOCTYPE html> <html> <head> <title></title> </head> <body> <script type="text/javascript"> function show_detail(detail){ var p=document.getElementById('p'); if(detail.open){ p.style.

微信网页授权认证获取用户的详细信息,实现自动登陆-微信公众号开发干货

原创声明:本文为本人原创作品,绝非他处转账,转载请联系博主 从接触公众号到现在,开发维护了2个公众号,开发过程中遇到很多问题,现在把部分模块功能在这备案一下,做个总结也希望能给其他人帮助 工欲善其事,必先利其器,先看看开发公众号需要准备或了解什么 web开发工具:官方提供的开发工具,使用自己的微信号来调试微信网页授权.调试.检验页面的 JS-SDK 相关功能与权限,模拟大部分 SDK 的输入和输出.下载地址:web开发工具下载 开发文档:https://mp.weixin.qq.com/wiki

VCF文件详细信息

Variant Call Format(VCF)是一个用于存储基因序列突变信息的文本格式.表示单碱基突变, 插入/缺失, 拷贝数变异和结构变异等.BCF格式文件是VCF格式的二进制文件. CHROM [chromosome]: 染色体名称. POS [position]: 参考基因组突变碱基位置,如果是INDEL(插入缺失),位置是INDEL的第一个碱基位置. ID [identifier]: 突变的名称.若没有,则用'.'表示其为一个新变种. REF [reference base(s)]:

异常详细信息: System.Data.SqlClient.SqlException:用户 &#39;IIS APPPOOL\DefaultAppPool&#39; 登录失败解决办法

1.安全性---登录名---新建登录名 2.常规----搜索 3.添加SERVICE用户-- 4.服务器角色---勾上sysadmin: IIS中: 应用程序池---对应的程序池上右键---高级设置 进程模块---标识---选择NetworkService(与数据库中设置统一) 异常详细信息: System.Data.SqlClient.SqlException:用户 'IIS APPPOOL\DefaultAppPool' 登录失败解决办法

高德地图返回地址详细信息

个人习惯,上图 关于高德地图自动定位返回地址详细信息一直没写,一方面太忙了(也可以说太懒了),另一方面这个地方的内容太少,因为项目后面会用到快速搜索提示,往地图中添加marker.以及导航以及语音提示等等 本来想等项目上线在好好总结一下,算了不想拖了, 上代码: package com.example.mydemo; import android.app.Activity;import android.location.Location;import android.os.Bundle;impo