Kafka 集群配置SASL+ACL

** Kafka 集群配置SASL+ACL

测试环境:**

                    系统: CentOS 6.5 x86_64
                    JDK : java version 1.8.0_121
                    kafka: kafka_2.11-1.0.0.tgz
                    zookeeper: 3.4.5
                    ip: 192.168.49.161 (我们这里在一台机上部署整套环境)

kafka 名词解析:

                    Broker: Kafka 集群包含一个或多个服务器,这种服务器被称为broker
                    Topic: 每条发布到Kafka 集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic 的消息
                    分开存储,逻辑上一个Topic 的消息虽然保存于一个或多个broker 上但用户只需指定消息的Topic 即可生产或
                    消费数据而不必关心数据存于何处)
                    Partition: Parition 是物理上的概念,每个Topic 包含一个或多个Partition
                    Producer: 负责发布消息到Kafka broker 即生产者
                    Consumer: 消息消费者,向Kafka broker 读取消息的客户端即消费者
                    Consumer Group:每个Consumer 属于一个特定的Consumer Group(可为每个Consumer 指定group
                    name,若不指定group name 则属于默认的group)

Kafka 拓扑结构(借用别人的图)

一个典型的Kafka 集群中包含若干Producer(可以是web 前端产生的Page View,或者是服务器日志,系统 CPU、Memory 等),若干broker(Kafka 支持水平扩展,一般broker 数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper 集群。Kafka 通过Zookeeper 管理集群配置,选举leader,以及在Consumer Group 发生变化时进行rebalance。Producer 使用push 模式将消息发布到broker,Consumer 使用pull 模式从broker 订阅并消费消息

kafka 集群部署
一 Zookeeper 集群部署(这里部署的伪集群)

  1. zookeeper 部署比较简单,解压修改配置文件后即可使用, 默认的配置文件为zoo_sample.cfg 这里我们直接新建一个zoo.cfg 的文件内容如下:

各参数意义这里不作详细解释,想了解可以自行参考官网

                                        tickTime=2000
                                        initLimit=5
                                        syncLimit=2
                                        dataDir=/opt/zook/zookeeper1/data //指定数据路径
                                        dataLogDir=/opt/zook/zookeeper1/logs //指定日志路径
                                        clientPort=2181 //监听端口
                                        server.1=127.0.0.1:7771:7772
                                        server.2=127.0.0.1:7771:7772
                                        server.3=127.0.0.1:7771:7772
  2. 在zookeeper/data 下创建一个名为myid 的文件,并在文件内输入数字1
 3. 集群部署则将上述配置的单节点再复制两份,并将配置文件中
                            ```
                            dataDir , dataLogDir 两个参数根据实际路径修改
                            clientPort 分别配为2182,2183
                            myid 分别配置为2 3
                            ```
  4. 启动集群

二 Kafka 集群部署

 由于集群最终是对外部网络开放的,出于安全考虑本文采用的是SASL+ACL 控制和授权

1. broker 配置
1.1. 将配置server.properties 复制2 份并分别重命名成server-1.properties server-2.properties server-3.properties 修改server.properties 配置:(请根据实际环境填写)

                                                                                                                    broker.id = 1 //另外两个节点分别为2 3
                                                                                                                    host.name=192.168.49.161
                                                                                                                    log.dirs=/tmp/kafka-logs-1 //另外两个节点分别为kafka-logs-2 kafka-logs-3 可以不
                                                                                                                    放在/tmp 下,如果放在其他地方必须注意目录权限
                                                                                                                    zookeeper.connect=192.168.49.161:2181,192.168.49.161:2182,192.168.49.161:2183
                                                                                                                    port=9092 //另外两个节点分别为9093 9094 也可以自定义
                                                                                                                    listeners=SASL_PLAINTEXT://192.168.49.161:9092 //另外两个节点分别为9093 9094
                                    到这里基础配置已完成
                    1.2    要配置SASL 和ACL,需要在broker 端进行两个方面的设置。
               首先是创建包含所有认证用户信息的JAAS 文件,本例中我们有admin qjld 两个用户,其中admin 为管理员用于broker 集群间的认证, qjld 为远程应用的认证用户, 新增认证信息文件
                                                                                                                        ```
                                                                                                                        kafka_server_jaas.conf 内容为:
                                                                                                                        KafkaServer {
                                                                                                                        org.apache.kafka.common.security.plain.PlainLoginModule required
                                                                                                                        username="admin"
                                                                                                                        password="admin-mgr998778123"
                                                                                                                        user_admin="admin-mgr998778123"
                                                                                                                        user_qjld="123456";
                                                                                                                        };
                                                                                                                        ```
                                                                        本例中路径为: /opt/kafka_2.11-1.0.0/config/kafka_server_jaas.conf , 我们需要将配置文件中的内容传递给JVM,因此需要修改/opt/kafka_2.11-                                                         1.0.0/bin/kafka-server-start.sh 脚本文件如下:
                                                                    ```
                                                                    在exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "[email protected]" 之前一行添加export KAFKA_OPTS="-                                  Djava.security.auth.login.config=/opt/kafka_2.11-1.0.0/config/kafka_server_jaas.conf"
                                                                    然后保存退出。
                                                                    ```
                            其次我们要在所有broker 配置文件(server-x.properties)中增加:
                                                                                            ```
                                                                                            #ACL 入口类
                                                                                            authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
                                                                                            #SASL_PLAINTEXT
                                                                                            sasl.enabled.mechanisms=PLAIN
                                                                                            security.inter.broker.protocol=SASL_PLAINTEXT
                                                                                            sasl.mechanism.inter.broker.protocol=PLAIN
                                                                                            allow.everyone.if.no.acl.found=true
                                                                                            #设置admin 为超级用户
                                                                                            super.users=User:admin
                                                                                            ```
                            现在我们可以启动broker 测试了
                                                                                        ```
                                                                                        bin/kafka-server-start.sh config/server-1.properties &
                                                                                        bin/kafka-server-start.sh config/server-2.properties &
                                                                                        bin/kafka-server-start.sh config/server-3.properties &
                                                                                        ```
           启动成功,此时broker 可以接收已认证的客户端连接了,下面我们进行客户端配置
                                                                        ```
                                                                        //创建一个topic 名为test
                                            bin/kafka-topics.sh--create --zookeeper 192.168.49.161:2181,192.168.49.161:2182, 192.168.49.161:2183 --replication-factor 1 --partitions 1 --topic test
                                                                        //查看topic 是否创建成功
                                                                        bin/kafka-topics.sh --list --zookeeper 192.168.49.161:2181
                                                                        ```
  **2. Client 配置**
            2.1 新增一个配置文件如:/opt/kafka_2.11-1.0.0/config/kafka_client_jaas.conf,内容为:
                                                                                                                        ```
                                                                                                                        KafkaClient {
                                                                                                                        org.apache.kafka.common.security.plain.PlainLoginModule required
                                                                                                                        username="qjld"
                                                                                                                        password="123456";
                                                                                                                        };
                                                                                                                        ```

同理我们也要将配置文件内容传递给JVM, 因此需要修改

                                                                /opt/kafka_2.11-1.0.0/bin/kafka-console-producer.sh 脚本文件如下:在exec $base_dir/kafka-run-class.sh
                                                                $EXTRA_ARGS kafka.Kafka "[email protected]" 之前一行添加export
                                                                KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka_2.11-1.0.0/config/kafka_client_jaas.conf"
                                                                同样的方法修改kafka-console-consumer.sh
                            2.2 需要在config/consumer.properties 和config/producer.properties 配置文件中追加:
                                                                ```
                                                                security.protocol=SASL_PLAINTEXT
                                                                sasl.mechanism=PLAIN
                                                                ```
                                    这两行配置, 完成后再试运行:
                                                                ```
                                                                bin/kafka-console-producer.sh --broker-list 192.168.49.161:9092 --topic test                                                                 --producer.config config/producer.properties
                                                                bin/kafka-console-consumer.sh --bootstrap-server 192.168.49.161:9092 --topic test
                                                                --from-beginning  --consumer.config config/consumer.properties
                                                                ```
                                    接下来测试消息的写入和读取, 开启两个终端分别输入下列
                                                            ```
                                                            bin/kafka-console-producer.sh --broker-list 192.168.49.161:9092 --topic test //消息写入
                                                            bin/kafka-console-consumer.sh --bootstrap-server 192.168.49.161:9092 --topic test  --from-beginning 消息读取
                                                            ```
               如果报错信息如下:
                           `WARN Bootstrap broker 192.168.49.161:9092 disconnected (org.apache.kafka.clients.NetworkClient)`
               则说明配置的security 已生效, 要想普通用户能读写消息,需要配置ACL
         2.3 配置ACL
                                                            ```
                                                            bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties
                                                            zookeeper.connect=192.168.49.161:2181,192.168.49.161:2182,192.168.49.161:2183 --add
                                                            --allow-principal User:qjld --operation All --topic test
                                                            ```
             这是为qjld 这个用户配置所有权限,如果有更细的要求则可参考网上的,如:Read Write 等只需要将--operation all 改为--operation Read 多种则在其后面加一个--         operation write 即可

三 测试
这里我选用python2.7 编写的脚本进行测试
先安装pip27 install kafka-python

原文地址:http://blog.51cto.com/xiaoyouyou/2061143

时间: 2024-09-29 09:38:19

Kafka 集群配置SASL+ACL的相关文章

ELK5.3+Kafka集群配置

[一]资源准备 # 3台4C*8G, 安装Zookeeper.Kafka.Logstash--Broker(input: filebeat; output: Kafka) 10.101.2.23 10.101.2.24 10.101.2.25 # 2台4C*8G, 安装Logstash--Indexer(input: Kafaka; output: Elasticsearch) 10.101.2.26 10.101.2.27 # 3台8C*16G, 安装Elasticsearch 10.101.

kafka集群配置与测试

刚接触一些Apache Kafka的内容,用了两天时间研究了一下,仅以此文做相关记录,以供学习交流.  概念: kafka依赖的项: 1. 硬件上,kafka利用线性存储来进行硬盘直接读写. 2. kafka没有使用内存作为缓存. 3. 用zero-copy. 4. Gzip和Snappy压缩, 5. kafka对事务处理比较弱,但是message分发上还是做了一定的策略来保证数据递送的准确性的. kafka关于存储的几个概念 1. Partition:同一个topic下可以设置多个partit

HyperLedger Fabric基于zookeeper和kafka集群配置解析

简述 在搭建HyperLedger Fabric环境的过程中,我们会用到一个configtx.yaml文件(可参考Hyperledger Fabric 1.0 从零开始(八)--Fabric多节点集群生产部署),该配置文件主要用于构建创世区块(在构建创世区块之前需要先创建与之对应的所有节点的验证文件集合),其中在配置Orderer信息中有一个OrdererType参数,该参数可配置为"solo" and "kafka",之前博文所讲的环境配置皆是solo,即单节点共

kafka集群配置和java编写生产者消费者操作例子

kafka 安装 修改配置文件 java操作kafka kafka kafka的操作相对来说简单很多 安装 下载kafka http://kafka.apache.org/downloads tar -zxvf kafka_2.12-2.1.0.tgz rm kafka_2.12-2.1.0.tgz mv kafka_2.12-2.1.0 kafka sudo vim /etc/profile export KAFKA_HOME=/usr/local/kafka export PATH=$PAT

Kafka集群配置

kafka_2.11-0.9.0.1.tgz 1.进入项目前的目录 cd /home/dongshanxia mkdir kafka #创建项目目录 cd kafka #进入项目目录 mkdir kafkalogs #创建kafka消息目录,主要存放kafka消息 2.进入配置文件目录 cd /home/dongshanxia/kafka/kafka_2.11-0.9.0.1/config //打开配置文件 vim server.properties ----------------------

Kafka详解之二、如何配置Kafka集群

Kafka集群配置比较简单,为了更好的让大家理解,在这里要分别介绍下面三种配置 单节点:一个broker的集群 单节点:多个broker的集群 多节点:多broker集群 一.单节点单broker实例的配置 1.首先启动zookeeper服务 Kafka本身提供了启动zookeeper的脚本(在kafka/bin/目录下)和zookeeper配置文件(在kafka/config/目录下),首先进入Kafka的主目录(可通过 whereis kafka命令查找到): [[email protect

Zookeeper+Kafka集群部署

Zookeeper+Kafka集群部署 主机规划: 10.200.3.85  Kafka+ZooKeeper 10.200.3.86  Kafka+ZooKeeper 10.200.3.87  Kafka+ZooKeeper 软件下载地址: #wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz #wget http://mirror.bit.edu.cn/apache/

Kafka单节点及集群配置安装

一.单节点 1.上传Kafka安装包到Linux系统[当前为Centos7]. 2.解压,配置conf/server.property. 2.1配置broker.id 2.2配置log.dirs 2.3配置zookeeper.connect 3.启动Zookeeper集群 备注:zookeeper集群启动时,先启动的节点因节点启动过少而出现not running这种情况,是正常的,把所有节点都启动之后这个情况就会消失! 3.启动Kafka服务 执行:./kafka-server-start.sh

kafka集群精华之-----安装配置、扩容、监控

kafka集群细节支出折磨我死去活来,查看众多文档进度缓慢,内容重复,所以站在大佬的肩膀上 整理文档记录它. kafka说明: 一个Topic可以认为是一类消息,每个topic将被分成多个partition(区),,此外kafka还可以配置partitions需要备份的个数(replicas).有多少个partitions就意味着有多少个"leader",kafka会将"leader"均衡的分散在每个实例上,来确保整体的性能稳定. kafka是通过zookeeper