kafka(2.2.1)(kerberos+LDAP+Sentry)访问使用

目录

  • kafka(2.2.1)(kerberos+LDAP+Sentry)访问使用
  • 一.访问的kafka的一些配置(已集成kerberos )
  • 二.Shell 命令行使用Kafka(已集成sentry)
  • 三.代码访问(java)

kafka(2.2.1)(kerberos+LDAP+Sentry)访问使用

一.访问的kafka的一些配置(已集成kerberos )

由于kafka集成了kerberos 所以需要通过kerberos的认证

认证方式有两种

  • 1.通过配置文件
  • 2.通过keytab文件

我们这里采用第一种

首先先在目录/usr/local/kafka_client下创建两个文件一个是client.properties,一个是jaas.conf

在client.properties文件里面写入

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
group.id=testgroup

在jaas.conf写入

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true
renewTicket=true
serviceName="kafka";
};

之后在shell命令行执行一下命令来配置环境变量(这样只针对当前进程有效)

[[email protected] kafka_client]# export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/kafka_client/jaas.conf"
[[email protected] kafka_client]# echo $KAFKA_OPTS

在执行kinit命令登陆kerberos用户

二.Shell 命令行使用Kafka(已集成sentry)

1.创建kafka topic

[[email protected] kafka_client]# kafka-topics --create --zookeeper cdh-master01:2181 --replication-factor 1 --partitions 1 --topic testTopic

2.查看Topic列表

[[email protected] kafka_client]# kafka-topics --zookeeper cdh-master01:2181 --list

3.删除Topic

[[email protected] kafka_client]# kafka-topics --delete --zookeeper cdh-master01:2181 --topic testTopic

4.向Topic生产数据(需要权限)

[[email protected] kafka_client]# kafka-console-producer --broker-list cdh-datanode03:9092,cdh-datanode04:9092 --topic testTopic --producer.config /usr/local/kafka_client/client.properties

5.消费Topic数据(需要权限)

[[email protected] kafka_client]# kafka-console-consumer --topic testTopic --from-beginning --bootstrap-server cdh-datanode03:9092,cdh-datanode04:9092,cdh-datanode05:9092 --consumer.config /usr/local/kafka_client/client.properties

此时会报以下错误 表示没有权限向testTopicTopic 写入数据此时我们需要给我们kinit登陆的用户赋予权限

ERROR internals.ErrorLoggingCallback: Error when sending message to topic testTopic with key: null, value: 3 bytes with error:
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [testTopic]

我们以fayson用户为例 它属于user组(id+用户名 查看组)

  • 1.我们需要首先创建一个kafka的principle
  • 2.我们给user用户组赋权可以写入数据到testTopic,注意需要使用管理员kafka用户登录Kerberos才能进行操作
[[email protected] kafka_client]# klist
Ticket cache: FILE:/tmp/krb5cc_0
Default principal: [email protected]

Valid starting       Expires              Service principal
09/11/2019 20:47:25  09/12/2019 20:47:25  krbtgt/[email protected]
    renew until 09/18/2019 20:47:25
  • 3.创建一个role
[[email protected] kafka_client]#  kafka-sentry -cr -r kafka_role
  • 4.给kafka_role赋予写入testTopic权限
[[email protected] kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=write"
[[email protected] kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=testTopic->action=describe"
  • 5.将角色加入到user组下面
[[email protected] kafka_client]#  kafka-sentry -arg -r kafka_role -g user
  • 6.以fayson用户登录(输入密码)
[[email protected] kafka_client]# kinit fayson

之后以此用户写入testTopic 就不会报权限问题了

此时我们还需要给 fayson 用户赋予读取testTopic的权限,所以需要给kafka_role赋予读取testtopic的权限

  • 1.我们在上面完成的基础之上需要对kafka_role角色赋予读取testTopic 的权限
  • 2.执行以下命令需要使用kafka 用户
[[email protected] kafka_client]# kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=read"
[[email protected] kafka_client]# kafka-sentry -gpr -r kafka_role -p "CONSUMERGROUP=testgroup->action=describe"
[[email protected] kafka_client]# kafka-sentry -gpr -r kafka_role -p "Topic=zhcTestTopic->action=read"

三.代码访问(java)

需要创建consumer.properties,producer.properties,jaas.conf文件 还要引入krb5.conf文件

producer.properties文件内容

bootstrap.servers=cdh-datanode03:9092,cdh-datanode04:9092,cdh-datanode05:9092
#实现了Serializer接口的序列化类。用于告诉kafka如何序列化key
key.serializer=org.apache.kafka.common.serialization.StringSerializer
#告诉kafka如何序列化value
value.serializer=org.apache.kafka.common.serialization.StringSerializer
acks=1
#访问kerberos的kafka client 配置
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

consumer.properties文件内容

bootstrap.servers=cdh-datanode04:9092
group.id=testgroup1
enable.auto.commit=true
session.timeout.ms=30000
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

jaas.conf文件内容

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  doNotPrompt=true
  useKeyTab=true
  storeKey=true
  renewTicket=true
  keyTab="D:/cdh/kafka/src/main/kerberos/gree1.keytab"
  principal="[email protected]";
};

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="D:/cdh/kafka/src/main/kerberos/gree1.keytab"
  principal="[email protected]";
};
  • 1.pom.xml
    <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>2.2.1-cdh6.3.0</version>
      </dependency>
  • 2.producer
package producer;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.util.Properties;

class MyProducer {
    private static final MyProducer Instance = new MyProducer();

    private MyProducer() {
    }

    public static MyProducer getInstance() {
        return Instance;
    }

    public int messageNo = 1;

    /**
     * 获得一个Kafka生产者实例
     *
     * @return
     */
    public KafkaProducer Produce() {
        System.setProperty("java.security.auth.login.config",             "D:\\cdh\\kafka\\src\\main\\kerberos\\jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\cdh\\kafka\\src\\main\\kerberos\\krb5.conf");

        Properties props = new Properties();
        try {
            props.load(this.getClass().getResourceAsStream("/producer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        KafkaProducer producer = new KafkaProducer(props);
        return producer;
    }
}

public class ProducerStarter implements Runnable {
    private int threadIndex;

    public ProducerStarter(int threadIndex) {
        this.threadIndex = threadIndex;
    }

    /**
     * 生产数据
     */

    public void run() {
        MyProducer pro = MyProducer.getInstance();
        KafkaProducer prod = pro.Produce();
        String topic = "testTopic";
        int i = 0;
        while (1 == 1) {
            final int index = i++;
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            prod.send(new ProducerRecord<String, String>(topic,String.valueOf(index), String.valueOf(i)), new Callback() {

                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    }
                    System.out.println("message send to partition " + recordMetadata.partition() + /*value*/ ": hello word " + index);
                }
        });
            prod.flush();
            //sleep 1min
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 启动200个线程,生产
     *
     * @param args
     */
    public static void main(String args[]) {

        for (int i = 0; i < 1; i++) {
            System.out.println("启动线程:" + i);
            Thread thread = new Thread(new ProducerStarter(i));
            thread.start();
        }
    }
}
  • 3.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;

public class ConsumerStarter {
    public static void main(String[] args) throws InterruptedException {

        KafkaConsumer consumer = Consumer.getInstance().Consume();
        consumer.subscribe(Arrays.asList("testTopic"));
        //消费并打印消费结果
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord record: records) {
                System.out.printf("offset = %d, key = %s, value= %s%n", record.offset(), record.key(), record.value());
            }
            Thread.sleep(1000);
        }
    }
}
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.Properties;

/**
 * Created by 260212 on 2018/4/12.
 * Author:JackLee -赵化臣
 * 描述:
 */
class Consumer {
    private static final Consumer Instance=new Consumer();
    private Consumer(){}
    public static Consumer getInstance(){
        return Instance;
    }

    /**
     * 获得一个Kafka消费者
     * kafka-clients版本要高于0.9.0.1,否则会取出为null
     * @return
     */
    public KafkaConsumer Consume (){
        System.setProperty("java.security.auth.login.config",  "D:\\cdh\\kafka\\src\\main\\kerberos\\jaas.conf");
        System.setProperty("java.security.krb5.conf", "D:\\cdh\\kafka\\src\\main\\kerberos\\krb5.conf");
        Properties props=new Properties();
        try {
            props.load(this.getClass().getResourceAsStream("/consumer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        KafkaConsumer consumerSelf=new KafkaConsumer<String,String>(props);
        return consumerSelf;
    }
}

原文地址:https://www.cnblogs.com/HarSenZhao/p/11508687.html

时间: 2024-10-12 07:26:38

kafka(2.2.1)(kerberos+LDAP+Sentry)访问使用的相关文章

Kerberos+LDAP+NFSv4 实现单点登录(续2)--一键安装

( 附:LDAP简单认证登录 login4ldap-ver0.0.6.zip 源代码 下载地址 http://u.163.com/NeMVmlIT 提取码: ObEubL7Y ) 上篇Kerberos+LDAP+NFSv4 实现单点登录(续1)链接地址http://lulinlin1.lofter.com/post/1cf3848f_11f58066?act=qbbloglofter_20150506_01 本篇是前两篇的总结,编写成一键安装脚本onekeysso.sh,并需名为dns.ldif

Kerberos+LDAP+NFSv4 实现单点登录

Kerberos : 身份认证LDAP : 目录信息服务NFSv4 : 网络共享 实验环境 : debian 9 三台主机:nfs服务器 : 192.168.1.103nfs客户机 : 192.168.1.102 即SSSD客户端+NFS客户端kdc服务器 : 192.168.1.101 即Kerberos+LDAP 以下 [email protected]:~# 表示以root根用户运行命令 一.安装NTP时间同步要使用Kerberos提供身份认证,各主机需时间同步 在一台主机上安装时间同步服

Kerberos+LDAP+NFSv4 实现单点登录(上)

Kerberos : 身份认证LDAP : 目录信息服务NFSv4 : 网络共享 实验环境 : debian 9 三台主机:nfs服务器 : 192.168.1.103nfs客户机 : 192.168.1.102 即SSSD客户端+NFS客户端kdc服务器 : 192.168.1.101 即Kerberos+LDAP 以下 [email protected]:~# 表示以root根用户运行命令 一.安装NTP时间同步要使用Kerberos提供身份认证,各主机需时间同步 在一台主机上安装时间同步服

Kerberos+LDAP+NFSv4 实现单点登录(续4)--SASL/GSSAPI

前篇<Kerberos+LDAP+NFSv4 实现单点登录(续1)--dns+dhcp>的krb5 + ldap + bind9 + bind9-dyndb-ldap 全面升级到debian 10,出现bind9-dyndb-ldap的GSSAPI+krb5_keytab认证机制无法连接ldap数据库.查看日志:SASL/GSSAPI authentication startedError: Local errorAdditional info: SASL(-1): generic failu

Kerberos+LDAP+NFSv4 实现单点登录(下)

六.nfs客户机的安装nfs客户机也即SSSD客户机,需安装sssd和nfs-common 1.安装sssd会自动安装libsasl2-modules-gssapi-mit(非依赖)libsasl2-modules-gssapi-mit和libsasl2-modules-gssapi-heimdal两者冲突,安装libsasl2-modules-gssapi-heimdal也可以 [email protected]:~# apt-get install sssd sssd-krb5 sssd-l

kerberos下HBase访问Zookeeper的ACL问题

最近公司HBase(CDH-4.6.0)遇到了一个麻烦问题,觉得有必要记录下整个解决的过程. 问题起因 用户在跑mapreduce任务,从hdfs读取文件想写入到hbase table的时候失败了(这是hbase提供的一种mapred能力).这个问题发现在A环境(一个测试环境),自从启用了kerberos之后.运行了用户给的程序和自己写的sample之后,发现程序最后挂在NullPointerException上.这个NPE指示的是服务端的一个叫currentKey的变量为null. [emai

kafka安装和使用远程代码进行访问 ---附踩坑记录

kafka安装和使用java连接远程服务器进行消息的生成与消费 首先要使用kafka,要有jdk和zookeeper的环境 本文在阿里云的centos7环境上进行 jdk版本选择的是1.8.0_181 zookeeper的版本是3.4.12 kafka的版本是2.12-1.1.1 关于kafka命令的介绍 本文不介绍了 只介绍怎么搭建一个kafka单点服务器 以及怎么使用代码 远程连接kafka服务器 下载地址 kafka下载地址 :http://kafka.apache.org/downloa

yum安装kerberos LDAP,集成到CDH

一,配置yum源 ls -l /dev |grep cd mkdir /mnt/cdrom mount /dev/cdrom /mnt/cdrom cd /etc/yum.repos.d 将其他yum源备份并删除 vi media.repo [rh6-media] name=rh6-media autorefresh=0 baseurl=file:///mnt/cdrom/ gpgcheck=0 enabled=1 yum clean all yum makecache 二,安装Kerberos

Kafka集群常见的跨网络访问详解

场景说明:当客户端与服务端在不同区域(跨防火墙,地址均做了映射)时,客户端访问kafka时会出现获取不到broker的问题,但是网络之间是互通的.但在跨防火墙下,client请求zookeeper的时候,zookeeper返回给client的broker IP是kafka的实际地址,而不是映射地址,因此client会访问失败. 解决方式一.1.配置文件:listeners=PLAINTEXT://主机名:90922.服务端hosts:内网地址 主机名 3.客户端hosts :外网地址 主机名 注