kafka 2.12在linux下的安装部署及java客户端对接

一、下载kafka_2.12-2.4.0.tgz并解压至/home/kafka_2.12-2.4.0

二、配置kafka

2.1 创建kafka日志文件夹:/home/kafka_2.12-2.4.0/logs

2.2 创建zookeeper数据目录:/tmp/zookeeper

2.3 配置/home/kafka_2.12-2.4.0/config/server.properties   内容如下(SSL证书在下面介绍):

ssl.keystore.location=/home/ca/server/server.keystore.jks
ssl.keystore.password=mima123
ssl.key.password=mima123
ssl.truststore.location=/home/ca/trust/server.truststore.jks
ssl.truststore.password=mima123
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=
#security.inter.broker.protocol=SSL
inter.broker.listener.name=SSL

############################# Server Basics #############################
broker.id=0
listeners=SSL://阿里云内网IP:9093
advertised.listeners=SSL://阿里云外网IP:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

############################# Log Basics #############################

log.dirs=/home/kafka_2.12-2.4.0/logs
num.partitions=1
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

#log.flush.interval.messages=10000
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=阿里云内网IP:2181
zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
delete.topic.enble=true

2.4 配置 /home/kafka_2.12-2.4.0/config/zookeeper.properties

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false

2.5 配置/etc/hosts文件,增加红色行,IP为阿里云内网IP

127.0.0.1    localhost    localhost.localdomain    localhost4    localhost4.localdomain4
::1    localhost    localhost.localdomain    localhost6    localhost6.localdomain6
172.18.54.18    iZwz9gq8vhwxtgpg21yonsZ    iZwz9gq8vhwxtgpg21yonsZ
172.18.54.18    kafka-single

三、生成SSL相关证书文件

3.1、创建四个文件夹 /home/ca/root、/home/ca/trust、/home/ca/server、/home/ca/client

3.2、签发相关证书

第一步:生成server.keystore.jks文件(即:生成服务端的keystore文件)
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -validity 10000 -genkey -keypass mima123 -keyalg RSA -dname "CN=kafka-single,OU=qmx,O=qmx,L=beijing,S=beijing,C=cn" -storepass mima123 -ext SAN=DNS:kafka-single

第二步:生成CA认证证书(为了保证整个证书的安全性,需要使用CA进行证书的签名保证)
openssl req -new -x509 -keyout /home/ca/root/ca-key -out /home/ca/root/ca-cert -days 10000 -passout pass:mima123 -subj "/C=cn/ST=beijing/L=beijing/O=qmx/OU=qmx/CN=kafka-single"

第三步:通过CA证书创建一个客户端信任证书
keytool -keystore /home/ca/trust/client.truststore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123

第四步:通过CA证书创建一个服务端器端信任证书
keytool -keystore /home/ca/trust/server.truststore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123

第五步:服务器证书的签名处理
第1小步:导出服务器端证书server.cert-file
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -certreq -file /home/ca/server/server.cert-file -storepass mima123
第2小步:用CA给服务器端证书进行签名处理
openssl x509 -req -CA /home/ca/root/ca-cert -CAkey /home/ca/root/ca-key -in /home/ca/server/server.cert-file -out /home/ca/server/server.cert-signed -days 10000 -CAcreateserial -passin pass:mima123
第3小步:将CA证书导入到服务器端keystore
keytool -keystore /home/ca/server/server.keystore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
第4小步:将已签名的服务器证书导入到服务器keystore
keytool -keystore /home/ca/server/server.keystore.jks -alias ds-kafka-single -import -file /home/ca/server/server.cert-signed -storepass mima123

客户端SSL证书签发
第一步:生成client.keystore.jks文件
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -validity 10000 -genkey -keypass mima123-dname "CN=kafka-single,OU=qmx,O=qmx,L=beijing,S=beijing,C=cn" -ext SAN=DNS:kafka-single -storepass mima123
第二步:导出客户端证书client.cert-file
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -certreq -file /home/ca/client/client.cert-file -storepass mima123
第三步:用CA给客户端证书进行签名处理
openssl x509 -req -CA /home/ca/root/ca-cert -CAkey /home/ca/root/ca-key -in /home/ca/client/client.cert-file -out /home/ca/client/client.cert-signed -days 10000 -CAcreateserial -passin pass:mima123
第四步:将CA证书导入到客户端keystore
keytool -keystore /home/ca/client/client.keystore.jks -alias CARoot -import -file /home/ca/root/ca-cert -storepass mima123
第五步:将已签名的证书导入到客户端keystore
keytool -keystore /home/ca/client/client.keystore.jks -alias ds-kafka-single -import -file /home/ca/client/client.cert-signed -storepass mima123

四、启动和停止kafka和zookeeper服务

cd /home/kafka_2.12-2.4.0/bin

启动zookeeper:

./zookeeper-server-start.sh /home/kafka_2.12-2.4.0/config/zookeeper.properties &

启动kafka:

./kafka-server-start.sh /home/kafka_2.12-2.4.0/config/server.properties &

查看topic情况:

./kafka-topics.sh --list --zookeeper localhost:2181

关闭kafka:

./kafka-server-stop.sh

关闭zookeeper:

./zookeeper-server-stop.sh

查看 kafka 的 topic 情况:

./kafka-topics.sh --list --zookeeper 172.18.54.18:2181

查看topic详细信息:

./kafka-topics.sh --describe --zookeeper 172.18.54.18:2181 --topic topic1

生产者客户端命令:

./kafka-console-producer.sh --broker-list 172.18.54.18:9092 --topic topic1

消费者客户端命令:

./kafka-console-consumer.sh --bootstrap-server 172.18.54.18:9092 --topic topic1 --from-beginning

五、JAVA客户端对接

5.1 Producer

package com.xrh.extend.kafka;

import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer {

    public static String topic = "topic2";//定义主题

    public static void main(String[] args) throws InterruptedException {

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "阿里云外网IP:9093");//kafka地址,多个地址用逗号分割
//        acks:消息的确认机制,默认值是0。
//        acks=0:如果设置为0,生产者不会等待kafka的响应。
//        acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。
//        acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。
        props.put("acks", "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "D:\\ca\\client.truststore.jks");
        props.put("ssl.truststore.password", "mima123");
        props.put("ssl.keystore.location", "D:\\ca\\client.keystore.jks");
        props.put("ssl.keystore.password", "mima123");
        props.setProperty("ssl.endpoint.identification.algorithm", "");
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        try {
            int i = 1;
            while (i < 20) {
                String msg = "测试 Hello," + new Random().nextInt(100);

                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic , "key1", msg);
                kafkaProducer.send(record, new MyProducerCallBack());
                System.out.println("消息发送成功:" + msg);
                ++ i;
                Thread.sleep(500);
            }
        } finally {
            kafkaProducer.close();
        }

    }

    private static class MyProducerCallBack implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
           if(null != e){
             e.printStackTrace();
             return;
          }
          System.out.println("时间戳,主题,分区,位移: " + recordMetadata.timestamp()
              + ", " + recordMetadata.topic() + "," + recordMetadata.partition()
              + " " + recordMetadata.offset());
       }
    };

//    acks = 1
//    batch.size = 16384 //当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率。
//    bootstrap.servers = [39.108.124.173:9092]
//    buffer.memory = 33554432
//    client.dns.lookup = default
//    client.id =
//    compression.type = none
//    connections.max.idle.ms = 540000
//    delivery.timeout.ms = 120000
//    enable.idempotence = false
//    interceptor.classes = []
//    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
//    linger.ms = 0
//    max.block.ms = 60000
//    max.in.flight.requests.per.connection = 5
//    max.request.size = 1048576
//    metadata.max.age.ms = 300000
//    metric.reporters = []
//    metrics.num.samples = 2
//    metrics.recording.level = INFO
//    metrics.sample.window.ms = 30000
//    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
//    receive.buffer.bytes = 32768
//    reconnect.backoff.max.ms = 1000
//    reconnect.backoff.ms = 50
//    request.timeout.ms = 30000
//    retries = 2147483647   //配置为大于0的值的话,客户端会在消息发送失败时重新发送。
//    retry.backoff.ms = 100
//    sasl.client.callback.handler.class = null
//    sasl.jaas.config = null
//    sasl.kerberos.kinit.cmd = /usr/bin/kinit
//    sasl.kerberos.min.time.before.relogin = 60000
//    sasl.kerberos.service.name = null
//    sasl.kerberos.ticket.renew.jitter = 0.05
//    sasl.kerberos.ticket.renew.window.factor = 0.8
//    sasl.login.callback.handler.class = null
//    sasl.login.class = null
//    sasl.login.refresh.buffer.seconds = 300
//    sasl.login.refresh.min.period.seconds = 60
//    sasl.login.refresh.window.factor = 0.8
//    sasl.login.refresh.window.jitter = 0.05
//    sasl.mechanism = GSSAPI
//    security.protocol = PLAINTEXT
//    security.providers = null
//    send.buffer.bytes = 131072
//    ssl.cipher.suites = null
//    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
//    ssl.endpoint.identification.algorithm = https
//    ssl.key.password = null
//    ssl.keymanager.algorithm = SunX509
//    ssl.keystore.location = null
//    ssl.keystore.password = null
//    ssl.keystore.type = JKS
//    ssl.protocol = TLS
//    ssl.provider = null
//    ssl.secure.random.implementation = null
//    ssl.trustmanager.algorithm = PKIX
//    ssl.truststore.location = null
//    ssl.truststore.password = null
//    ssl.truststore.type = JKS
//    transaction.timeout.ms = 60000
//    transactional.id = null
//    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

}

5.2 Consumer

package com.xrh.extend.kafka;

import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import javafx.util.Duration;

public class Consumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "阿里云外网IP:9093");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "D:\\ca\\client.truststore.jks");
        props.put("ssl.truststore.password", "mima123");
        props.put("ssl.keystore.location", "D:\\ca\\client.keystore.jks");
        props.put("ssl.keystore.password", "mima123");
        props.setProperty("ssl.endpoint.identification.algorithm", "");
//        p.put("auto.offset.reset", "latest");
//        bootstrap.servers: kafka的地址。
//        group.id:组名 不同组名可以重复消费。例如你先使用了组名A消费了kafka的1000条数据,但是你还想再次进行消费这1000条数据,并且不想重新去产生,那么这里你只需要更改组名就可以重复消费了。
//        enable.auto.commit:是否自动提交,默认为true。
//        auto.commit.interval.ms: 从poll(拉)的回话处理时长。
//        session.timeout.ms:超时时间。
//        max.poll.records:一次最大拉取的条数。
//        auto.offset.reset:消费规则,默认earliest 。
//        earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 。
//        latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 。
//        none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。
//        key.serializer: 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
//        value.deserializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList(Producer.topic));// 订阅消息
        while(true){

            ConsumerRecords<String, String> consumerDatas = consumer.poll(100);
            if( consumerDatas.count() > 0 ){
                Iterator<ConsumerRecord<String, String>> consumerIter = consumerDatas.iterator();
                while(consumerIter.hasNext()){
                    ConsumerRecord<String, String>  consumerData = consumerIter.next();
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            consumerData.offset(), consumerData.key(), consumerData.value());
                }
            }else{
                System.out.println("KafkaConsumer1 is waiting message...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }

       }
    }

}

原文地址:https://www.cnblogs.com/101key/p/12331060.html

时间: 2024-11-05 22:03:41

kafka 2.12在linux下的安装部署及java客户端对接的相关文章

Linux下Opengrok安装部署与使用

Opengrok 用于管理多项目的代码非常方便.本文以Opengrok 0.12.1来讲解在RedHat Enterprise Linux上的部署 http://opengrok.github.io/OpenGrok/ 软件依赖准备: 1 JAVA                          http://www.oracle.com/technetwork/java// 2. Tomcat                    http://tomcat.apache.org/ 3.Ex

Linux下vmware安装部署

Linux下vmware下载: 地址-Linux vmware : http://www.vmware.com/products/workstation/workstation-evaluation 安装依赖: yum -y install perl gcc kernel-devel libX11 libXinerama libXcursor libXtst yum install kernel-headers.x86_64 -y 安装桌面: yum groupinstall "Desktop&

JIRA 6.3.6在Linux下的安装部署

前提:已安装好JDK.MySQL JIRA 是澳大利亚 Atlassian 公司开发的一款优秀的问题跟踪管理软件工具,可以对各种类型的问题进行跟踪管理,包括缺陷.任务.需求.改进等.JIRA采用J2EE技术,能够跨平台部署.它正被广泛的开源软件组织,以及全球著名的公司使用. JIRA产品非常完善且功能强大,安装配置简单,多语言支持.界面十分友好,和其他系统如CVS.Subversion(SVN).VSS.LDAP.邮件服务整合得相当好,文档齐全,可用性以及可扩展性方面都十分出色,拥有完整的用户权

linux下docker安装部署项目(全)

一 .系统安装 基于CentOS-7-x86_64-Minimal-1708.iso安装系统 1.2.  配置系统 1.2.1  在线更新内核版本(建议更新,旧版内核会有docker BUG) 1.2.1.1 导入ELRepo软件仓库的公共秘钥,安装ELRepo软件仓库的yum源. rpm --import https://www.elrepo.org/RPM-GPG-KEY-elrepo.orgrpm -Uvh http://www.elrepo.org/elrepo-release-7.0-

Nginx在Linux下的安装部署

一.Nginx简介 Nginx ("engine x") 是一个高性能的 HTTP 和 反向代理 服务器,也是一个 IMAP/POP3/SMTP 服务器.Nginx作为负载均衡服务器:Nginx 既可以在内部直接支持 Rails 和 PHP 程序对外进行服务,也可以支持作为 HTTP代理服务器对外进行服务.nginx网站国内的用户有:百度.新浪.网易.腾讯等等. 二.Nginx的安装 下载Nginx(http://nginx.org/en/download.html),最新的好像是ng

Linux下Jetty9安装部署

在网上看Jetty觉得很NB就部署一下玩玩,长一下经验. 下载Jetty9地址 http://www.eclipse.org/jetty/previousversions.html 这里可以挑自己版本,我这里选的9版本 这里我把jetty 放在./usr/local/jetty目录下  下载好的上传就行了然后 tar解压 tar -xvf jetty-distribution-9.4.6.v20170531.tar.gz JDK我这里是已经部署好的 JDK1.7下载地址:http://downl

LINUX下svn安装部署

1.#安装yum install subversion 2.#测试svnserve --version3.#创建库根路径mkdir /usr/local/svn4.#创建一个项目库svnadmin create /usr/local/svn/project5.修改配置文件cd /usr/local/svn/project vi svnserve.conf [general]anon-access=none------------- #没有登录不能操作auth-access=write------

linux下oracle安装

本文主要介绍linux下oracle的安装,主要分为3部分:准本工作.安装oracle软件.用dbca工具创建数据库. 实验环境:rhel5.6+oracle_database_linux32.zip(10.2.0.1.0) 实验过程: 首先要确保linux系统内存大小在1G以上,另外/home与/目录也要足够大. 1.在安装oracle软件前,linux需要安装这些软件:binutils-2.17.50.0.6-5.el5.compat-db-4.2.52-5.1.control-center

Linux下编译安装qemu和libvirt

目录 [hide] 1 安装qemu 1.1 qemu介绍 1.2 下载源文件 1.3 编译安装 2 安装libvirt 2.1 libvirt介绍 2.2 下载libvirt 2.3 编译安装 3 参考资料 KVM虚拟机(英语:Kernel-based Virtual Machine),是一种用于Linux内核中的虚拟化基础设施.KVM目前支援Intel VT及AMD-V的原生虚拟技术.KVM在2007年2月被导入Linux 2.6.20核心中.它也被引入FreeBSD.在Mac OS X中,