Kafka broker配置介绍 (四)

这部分内容对了解系统和提高软件性能都有很大的帮助,kafka官网上也给出了比较详细的配置详单,但是我们还是直接从代码来看broker到底有哪些配置需要我们去了解的,配置都有英文注释,所以每一部分是干什么的就不翻译了,都能看懂:

?


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

/**

 * Licensed to the Apache Software Foundation (ASF) under one or more

 * contributor license agreements.  See the NOTICE file distributed with

 * this work for additional information regarding copyright ownership.

 * The ASF licenses this file to You under the Apache License, Version 2.0

 * (the "License"); you may not use this file except in compliance with

 * the License.  You may obtain a copy of the License at

 *

 *    http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package kafka.server

import java.util.Properties

import kafka.utils.{Utils, ZKConfig}

import kafka.message.Message

/**

 * Configuration settings for the kafka server

 */

class KafkaConfig(props: Properties) extends ZKConfig(props) {

  /* the port to listen and accept connections on */

  val port: Int = Utils.getInt(props, "port", 6667)

  /* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */

  val hostName: String = Utils.getString(props, "hostname", null)

  /* the broker id for this server */

  val brokerId: Int = Utils.getInt(props, "brokerid")

  

  /* the SO_SNDBUFF buffer of the socket sever sockets */

  val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)

  

  /* the SO_RCVBUFF buffer of the socket sever sockets */

  val socketReceiveBuffer: Int = Utils.getInt(props, "socket.receive.buffer", 100*1024)

  

  /* the maximum number of bytes in a socket request */

  val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))

  /* the maximum size of message that the server can receive */

  val maxMessageSize = Utils.getIntInRange(props, "max.message.size", 1000000, (0, Int.MaxValue))

  /* the number of worker threads that the server uses for handling all client requests*/

  val numThreads = Utils.getIntInRange(props, "num.threads", Runtime.getRuntime().availableProcessors, (1, Int.MaxValue))

  

  /* the interval in which to measure performance statistics */

  val monitoringPeriodSecs = Utils.getIntInRange(props, "monitoring.period.secs", 600, (1, Int.MaxValue))

  

  /* the default number of log partitions per topic */

  val numPartitions = Utils.getIntInRange(props, "num.partitions", 1, (1, Int.MaxValue))

  

  /* the directory in which the log data is kept */

  val logDir = Utils.getString(props, "log.dir")

  

  /* the maximum size of a single log file */

  val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))

  /* the maximum size of a single log file for some specific topic */

  val logFileSizeMap = Utils.getTopicFileSize(Utils.getString(props, "topic.log.file.size", ""))

  /* the maximum time before a new log segment is rolled out */

  val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, Int.MaxValue))

  /* the number of hours before rolling out a new log segment for some specific topic */

  val logRollHoursMap = Utils.getTopicRollHours(Utils.getString(props, "topic.log.roll.hours", ""))

  /* the number of hours to keep a log file before deleting it */

  val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24*7, (1, Int.MaxValue))

  /* the number of hours to keep a log file before deleting it for some specific topic*/

  val logRetentionHoursMap = Utils.getTopicRetentionHours(Utils.getString(props, "topic.log.retention.hours", ""))

  

  /* the maximum size of the log before deleting it */

  val logRetentionSize = Utils.getLong(props, "log.retention.size", -1)

  /* the maximum size of the log for some specific topic before deleting it */

  val logRetentionSizeMap = Utils.getTopicRetentionSize(Utils.getString(props, "topic.log.retention.size", ""))

  /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */

  val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))

  

  /* enable zookeeper registration in the server */

  val enableZookeeper = Utils.getBoolean(props, "enable.zookeeper", true)

  /* the number of messages accumulated on a log partition before messages are flushed to disk */

  val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))

  /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2: 6000  */

  val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))

  /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */

  val flushSchedulerThreadRate = Utils.getInt(props, "log.default.flush.scheduler.interval.ms"3000)

  /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */

  val defaultFlushIntervalMs = Utils.getInt(props, "log.default.flush.interval.ms", flushSchedulerThreadRate)

   /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */

  val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))

  /* the maximum length of topic name*/

  val maxTopicNameLength = Utils.getIntInRange(props, "max.topic.name.length", 255, (1, Int.MaxValue))

}

上面这段代码来自kafka.server包下的KafkaConfig类,之前我们就说过,broker就是kafka中的server,所以讲配置放在这个包中也不奇怪。这里我们顺着代码往下读,也顺便看看scala的语法。和java一样也要import相关的包,kafka将同一包内的两个类写在大括号中:

?


1

import kafka.utils.{Utils, ZKConfig}

然后我们看类的写法:

?


1

class KafkaConfig(props: Properties) extends ZKConfig(props)

我们看到在加载kafkaConfig的时候会加载一个properties对象,同时也会加载有关zookeeper的properties,这个时候我们可以回忆一下,之前我们启动kafka broker的命令:

1.  启动zookeeper server :bin/zookeeper-server-start.sh ../config/zookeeper.properties  & (用&是为了能退出命令行)

2.  启动kafka server:  bin/kafka-server-start.sh ../config/server.properties  &

所以你能明白,初始化kafka broker的时候程序一定是去加载位于config文件夹下的properties,这个和java都一样没有区别。当然properties我们也可以通过程序来给出,这个我们后面再说,继续看我们的代码。既然找到了对应的properties文件,我们就结合代码和properties一起来看。

Kafka broker的properties中,将配置分为以下六类:

l  Server Basics:关于brokerid,hostname等配置

l  Socket Server Settings:关于传输的配置,端口、buffer的区间等。

l  Log Basics:配置log的位置和partition的数量。

l  Log Flush Policy:这部分是kafka配置中最重要的部分,决定了数据flush到disk的策略。

l  Log Retention Policy:这部分主要配置日志处理时的策略。

l  Zookeeper:配置zookeeper的相关信息。

在文件properties中的配置均出现在kafkaConfig这个类中,我们再看看kafkaConfig中的代码:

?


1

2

3

4

5

/* the broker id for this server */

  val brokerId: Int = Utils.getInt(props, "brokerid")

  

  /* the SO_SNDBUFF buffer of the socket sever sockets */

  val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)

凡是参数中有三个的,最后一个是default,而参数只有两个的则要求你一定要配置,否则的话则报错。当然在这么多参数中肯定是有一些经验参数的,至于这些参数怎么配置我确实没有一个特别的推荐,需要在不断的测试中才能磨合出来。

当然你也可以将配置写在程序里,然后通过程序去启动broker,这样kafka的配置就可以像下面一样写:

?


1

2

3

Properties props = new Properties();

props.setProperty("port","9093");

props.setProperty("log.dir","/home/kafka/data1");

我倒是觉得配置还是直接写在配置文件中比较好,如果需要修改也不会影响正在运行的服务,写在内存中,总是会有些不方便的地方。所以还是建议大家都写配置好了,后面讲到的producer和consumer都一样。

这里再提两个参数一个是brokerid,每个broker的id必须要区分;第二个参数是hostname,这个是broker和producer、consumer联系的关键,这里记住一定要改成你的地址和端口,否则永远连得都是localhost。

--------------------------------------------------------

下一篇将写producer和consumer的配置了,涉及到这部分就要开始编程了,写着写着又往源码里看进去了,下篇会先讲如何搭建开发环境,然后再写两个简单那的例子去熟悉配置。

时间: 2024-08-28 03:04:44

Kafka broker配置介绍 (四)的相关文章

_00017 Kafka的体系结构介绍以及Kafka入门案例(初级案例+Java API的使用)

博文作者:妳那伊抹微笑 个性签名:世界上最遥远的距离不是天涯,也不是海角,而是我站在妳的面前,妳却感觉不到我的存在 技术方向:Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark ... 云计算技术 转载声明:可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明,谢谢合作! qq交流群:214293307  (期待与你一起学习,共同进步) # Kfaka的体系结构 # 学习前言 Kafka的整个学习过程就是自己看官网的文档,出

kafka设计原理介绍

背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统.主要设计目标如下: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能 高吞吐率.即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输 同时支持离线数据处理和实时数据处理 为什么要用消息系统 解耦在项目启动之初来预测将来项目会碰到什么需求,是极其困难的.消息队

Kafka详细配置

转自:http://blog.csdn.net/suifeng3051/article/details/38321043?utm_source=tuicool&utm_medium=referral Kafka集群配置比较简单,为了更好的让大家理解,在这里要分别介绍下面三种配置 单节点:一个broker的集群 单节点:多个broker的集群 多节点:多broker集群 单节点单broker实例的配置 1.启动zookeeper服务 bin/zkServer.sh start 2.启动Kafka

Kafka设计解析(四)- Kafka Consumer设计解析

本文转发自Jason’s Blog,原文链接 http://www.jasongj.com/2015/08/09/KafkaColumn4 摘要 本文主要介绍了Kafka High Level Consumer,Consumer Group,Consumer Rebalance,Low Level Consumer实现的语义,以及适用场景.以及未来版本中对High Level Consumer的重新设计–使用Consumer Coordinator解决Split Brain和Herd等问题. H

kafka主要配置

Kafka为broker,producer和consumer提供了很多的配置参数. 了解并理解这些配置参数对于我们使用kafka是非常重要的. 官网配置地址: Configuration 每个kafka broker中配置文件server.properties默认必须配置的属性如下: 1 broker.id=0 2 port=9092 3 num.network.threads=2 4 num.io.threads=8 5 socket.send.buffer.bytes=1048576 6 s

[Big Data - Kafka] Kafka设计解析(四):Kafka Consumer解析

High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理.同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被所有Consumer消费(广播).因此,Kafka High Level Consumer提供了一个从Kafka消费数据的高层抽象,从而屏蔽掉其中的细节并提供丰富的语义. Consumer Group High Level Consumer将从某个Partition读取的最后一条消息的offset存

Redis 的安装配置介绍

redis 是一个高性能的key-value数据库. redis的出现,很大程度补偿了memcached这类keyvalue存储的不足,在部 分场合可以对关系数据库起到很好的补充作用.它提供了Python,Ruby,Erlang,PHP客户端,使用很方便.问题是这个项目还很新,可能还不足够稳定,而且没有在实际的一些大型系统应用的实例.此外,缺乏mc中批量get也是比较大的问题,始终批量获取跟多次获取的网络开销是不一样的. 性能测试结果: SET操作每秒钟 110000 次,GET操作每秒钟 81

Apache kafka 工作原理介绍

消息队列 消息队列技术是分布式应用间交换信息的一种技术.消息队列可驻留在内存或磁盘上, 队列存储消息直到它们被应用程序读走.通过消息队列,应用程序可独立地执行--它们不需要知道彼此的位置.或在继续执行前不需要等待接收程序接收此消息.在分布式计算环境中,为了集成分布式应用,开发者需要对异构网络环境下的分布式应用提供有效的通信手段.为了管理需要共享的信息,对应用提供公共的信息交换机制是重要的.常用的消息队列技术是 Message Queue. Message Queue 的通讯模式 点对点通讯:点对

转:Kafka设计解析(四):Kafka Consumer解析

High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理.同时也希望提供一些语义,例如同一条消息只被某一个 Consumer消费(单播)或被所有Consumer消费(广播).因此,Kafka High Level Consumer提供了一个从Kafka消费数据的高层抽象,从而屏蔽掉其中的细节并提供丰富的语义. Consumer Group High Level Consumer将从某个Partition读取的最后一条消息的offset