Kafka

Building a Distributed Kafka Cluster

This article walks through building a distributed Kafka cluster environment with the latest release, kafka_2.11-0.10.1.0. Server list:


172.31.10.1
172.31.10.2
172.31.10.3

1. Download the Kafka package

Go to the official Kafka site at http://kafka.apache.org/, then:

  • Click the "Download" button in the left-hand menu
  • Pick the matching release: 2.11 is the Scala version (Kafka is written in Scala) and 0.10.1.0 is the Kafka version
  • Choose one of the download links on the page that opens
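
If you prefer the command line, the same archive can also be pulled from the Apache archive; the URL below is an assumption based on the standard archive layout, so substitute the mirror link you actually chose:

    # download the Kafka release (example archive URL)
    wget https://archive.apache.org/dist/kafka/0.10.1.0/kafka_2.11-0.10.1.0.tgz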

2. Download the ZooKeeper package

(Figure: overall Kafka architecture)

A Kafka cluster normally relies on ZooKeeper for its naming/coordination service. A single-node setup can simply use the ZooKeeper bundled in the Kafka package, but in production a dedicated ZooKeeper ensemble is usually deployed to keep the naming service available. If machines are scarce, ZooKeeper can share servers with the Kafka brokers, since the naming service is not demanding on resources.

Go to the ZooKeeper download page at http://www.apache.org/dyn/closer.cgi/zookeeper/ and follow the download links. This article uses the stable release zookeeper-3.4.8.
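
The release can likewise be fetched directly; the archive URL below is an assumption based on the Apache archive layout:

    # download the ZooKeeper release (example archive URL)
    wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz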

3. Install the ZooKeeper ensemble

Upload the package zookeeper-3.4.8.tar.gz to server 172.31.10.1, then:

  • Extract it to /opt/zookeeper/zookeeper-3.4.8


    tar -zxvf zookeeper-3.4.8.tar.gz

  • Configure: switch to the conf directory and edit dataDir and the server.x entries


    cd /opt/zookeeper/zookeeper-3.4.8/conf
    mv zoo_sample.cfg zoo.cfg

    The edited zoo.cfg looks like this:



    # The number of milliseconds of each tick

    tickTime=2000

    # The number of ticks that the initial

    # synchronization phase can take

    initLimit=10

    # The number of ticks that can pass between

    # sending a request and getting an acknowledgement

    syncLimit=5

    # the directory where the snapshot is stored.

    # do not use /tmp for storage, /tmp here is just

    # example sakes.

    dataDir=/var/logs/data/zookeeper

    # the port at which the clients will connect

    clientPort=2181

    server.1=172.31.10.1:2888:3888

    server.2=172.31.10.2:2888:3888

    server.3=172.31.10.3:2888:3888

    # the maximum number of client connections.

    # increase this if you need to handle more clients

    #maxClientCnxns=60

    #

    # Be sure to read the maintenance section of the

    # administrator guide before turning on autopurge.

    #

    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance

    #

    # The number of snapshots to retain in dataDir

    #autopurge.snapRetainCount=3

    # Purge task interval in hours

    # Set to "0" to disable auto purge feature

    #autopurge.purgeInterval=1

    Here dataDir is the ZooKeeper data directory, and the server.x entries list each ZooKeeper server's address together with its quorum and leader-election ports (2888 and 3888).
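
    The commands in the next step write a myid file into this directory, so make sure it exists on every node before continuing (the path matches dataDir in the zoo.cfg above):

    mkdir -p /var/logs/data/zookeeper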

  • Copy the installation to the other two servers (a minimal scp sketch follows the commands below), and in dataDir on each node create a myid file containing the number x from that node's server.x entry. This article uses the following:


# run on 172.31.10.1

cd /var/logs/data/zookeeper

echo "1" >  /var/logs/data/zookeeper/myid

# run on 172.31.10.2

cd /var/logs/data/zookeeper

echo "2" >  /var/logs/data/zookeeper/myid

# run on 172.31.10.3

cd /var/logs/data/zookeeper

echo "3" >  /var/logs/data/zookeeper/myid

  • Start the ZooKeeper ensemble and verify it


# start ZooKeeper on every server

cd /opt/zookeeper/zookeeper-3.4.8/bin

/opt/zookeeper/zookeeper-3.4.8/bin/zkServer.sh start

# check each node's ZooKeeper role (leader or follower)

cd /opt/zookeeper/zookeeper-3.4.8/bin

/opt/zookeeper/zookeeper-3.4.8/bin/zkServer.sh status
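
As an extra check, each node can be probed with ZooKeeper's "ruok" four-letter command; a healthy server answers "imok". This assumes nc (netcat) is available on the machine you run it from:

# probe one node; repeat for 172.31.10.2 and 172.31.10.3
echo ruok | nc 172.31.10.1 2181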

4. Install the Kafka cluster

  • Extract the package to /opt/kafka/kafka_2.11-0.10.1.0


tar -zxvf kafka_2.11-0.10.1.0.tgz

cd /opt/kafka/kafka_2.11-0.10.1.0

  • Edit config/server.properties; the main items to change are:


    broker.id=1

    host.name=172.31.10.1

    log.dirs=/var/logs/data/kafka

    zookeeper.connect=172.31.10.1:2181,172.31.10.2:2181,172.31.10.3:2181/kafka

  Note that broker.id must be different on every server; it has to be unique across the whole cluster.

  The edited server.properties looks like this:



############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.

broker.id=1

# The port the socket server listens on

port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces

host.name=172.31.10.1

# Switch to enable topic deletion or not, default value is false

#delete.topic.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from

# java.net.InetAddress.getCanonicalHostName() if not configured.

#   FORMAT:

#     listeners = security_protocol://host_name:port

#   EXAMPLE:

#     listeners = PLAINTEXT://your.host.name:9092

#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,

# it uses the value for "listeners" if configured.  Otherwise, it will use the value

# returned from java.net.InetAddress.getCanonicalHostName().

#advertised.listeners=PLAINTEXT://your.host.name:9092

# The number of threads handling network requests

num.network.threads=3

# The number of threads doing disk I/O

num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server

socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server

socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)

socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files

log.dirs=/var/logs/data/kafka

# The default number of log partitions per topic. More partitions allow greater

# parallelism for consumption, but this will also result in more files across

# the brokers.

num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.

# This value is recommended to be increased for installations with data dirs located in RAID array.

num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync

# the OS cache lazily. The following configurations control the flush of data to disk.

# There are a few important trade-offs here:

#    1. Durability: Unflushed data may be lost if you are not using replication.

#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.

#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.

# The settings below allow one to configure the flush policy to flush data after a period of time or

# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk

#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush

#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can

# be set to delete segments after a period of time, or after a given size has accumulated.

# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens

# from the end of the log.

# The minimum age of a log file to be eligible for deletion

log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining

# segments don't drop below log.retention.bytes.

#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.

log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according

# to the retention policies

log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).

# This is a comma separated host:port pairs, each corresponding to a zk

# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".

# You can also append an optional chroot string to the urls to specify the

# root directory for all kafka znodes.

zookeeper.connect=172.31.10.1:2181,172.31.10.2:2181,172.31.10.3:2181/kafka

# Timeout in ms for connecting to zookeeper

zookeeper.connection.timeout.ms=6000

  • Sync the installation to the other servers and change broker.id (and host.name) on each node; a minimal scp/sed sketch follows the startup commands below
    • Start Kafka and verify it



      cd /opt/kafka/kafka_2.11-0.10.1.0

      nohup /opt/kafka/kafka_2.11-0.10.1.0/bin/kafka-server-start.sh config/server.properties &
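
      The sync step called out above can be scripted with scp and sed; this is only a sketch, assuming root SSH access and the paths used in this article, and it needs to run before the brokers on 172.31.10.2 and 172.31.10.3 are started:

      # push the configured installation from 172.31.10.1 to the other brokers
      scp -r /opt/kafka/kafka_2.11-0.10.1.0 root@172.31.10.2:/opt/kafka/
      scp -r /opt/kafka/kafka_2.11-0.10.1.0 root@172.31.10.3:/opt/kafka/
      # then adjust the per-broker settings, e.g. on 172.31.10.2:
      sed -i 's/^broker.id=.*/broker.id=2/;s/^host.name=.*/host.name=172.31.10.2/' /opt/kafka/kafka_2.11-0.10.1.0/config/server.properties
      # and on 172.31.10.3:
      sed -i 's/^broker.id=.*/broker.id=3/;s/^host.name=.*/host.name=172.31.10.3/' /opt/kafka/kafka_2.11-0.10.1.0/config/server.properties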

      Create a topic; if it can be created successfully, the cluster installation works. You can also run jps to check that the Kafka process is running.



      /opt/kafka/kafka_2.11-0.10.1.0/bin/kafka-topics.sh --create --zookeeper 172.31.10.1:2181,172.31.10.2:2181,172.31.10.3:2181/kafka --replication-factor 3 --partitions 1 --topic test
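
      For an end-to-end smoke test, the console producer and consumer shipped with Kafka can be run against the new topic (a sketch; adjust hosts to your environment, and note the /kafka chroot in the consumer's ZooKeeper string):

      # terminal 1: consume messages from the test topic
      /opt/kafka/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --zookeeper 172.31.10.1:2181,172.31.10.2:2181,172.31.10.3:2181/kafka --topic test --from-beginning
      # terminal 2: type messages; they should show up in terminal 1
      /opt/kafka/kafka_2.11-0.10.1.0/bin/kafka-console-producer.sh --broker-list 172.31.10.1:9092,172.31.10.2:9092,172.31.10.3:9092 --topic test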

      At this point the distributed Kafka cluster installation is complete; later articles will cover other aspects of Kafka in more depth.
