kafka安装和使用远程代码进行访问 ---附踩坑记录

kafka安装和使用java连接远程服务器进行消息的生成与消费

首先要使用kafka,要有jdk和zookeeper的环境

本文在阿里云的centos7环境上进行

jdk版本选择的是1.8.0_181

zookeeper的版本是3.4.12

kafka的版本是2.12-1.1.1

关于kafka命令的介绍 本文不介绍了 只介绍怎么搭建一个kafka单点服务器 以及怎么使用代码 远程连接kafka服务器

下载地址

kafka下载地址 :http://kafka.apache.org/downloads

zookeeper下载地址:https://zookeeper.apache.org/

jdk下载地址:https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

操作步骤

1、首先 使用tar命令对jdk进行解压
    tar -zxvf tar -zxvf jdk-8u181-linux-x64.tar.gz
    目录下面会多出一个jdk1.8.0_181  进入里面去  使用pwd命令查看绝对路径  并且复制找个路径
    最后进行jdk环境变量的配置
    编辑 vim /etc/profile文件
    在文件后面加上:
    export JAVA_HOME=(刚才pwd命令看到的路径)
    export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/            lib/    tools.jar
    export PATH=$PATH:${JAVA_HOME}/bin

    最后使用source /etc/profile 刷新文件

    使用java -version 查看环境变量是否配置成功

2、成功之后进行zookeeper的安装

    使用 tar -zxvf zookeeper-3.4.12.tar.gz 接下下载好的zookeeper安装包

    将zookeeper下的/conf/zookeeper.example改名成zoo.cfg
    使用mv 和cp命令都可以  然后vim这个文件 加上下面两行
    dataLogDir=/tmp/zookeeper-log #日志路径
    quorumListenOnAllIPs=true #在阿里云的服务器上保证外网可以访问到  刚开始没设置这个折腾了好久
3、最后,安装kafka
    使用 tar -zxvf kafka_2.12-1.1.1.tgz 解压下载好的kafka
    cd 到解压后的文件里面去  编辑配置文件  vim config/server.properties
    加上下面几行
    listeners=PLAINTEXT://:9092
    advertised.host.name=阿里云服务器公网ip #
    advertised.port=9092

    将zookeeper.connect的值改为阿里云的公网ip
    

至此,所有的环境的安装已经完成,下面使用kafka的命令进行消息的生成和消费

    首先cd到zookeeper的bin目录下  使用 ./zkServer.sh start 启动zookeeper
    再cd到kafka的bin目录下 使用 ./kafka-server-start.sh ../config/server.properties 启动kafka

    新建一个会话或者打开一个新的终端
    这时候使用jps命令  可以看到 Kafka和QuorumPeerMain表示启动全部成功,下面创建一个主题
    cd到kafka的bin目录下面,执行
    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --     partitions 1 --topic Hello-world

    输出Created topic "Hello-world".  表示topic创建成功
    使用./kafka-topics.sh --list --zookeeper localhost:2181 查看主题的列表
    输出里面会含有Hello-world

    下面进行消息的生产和消费
    先启动生产者 ./kafka-console-producer.sh --broker-list 阿里云公网ip:9092 --topic Hello-        world
    会出现一个 >  类似于交互界面 这时候就可以生产消息了

    启动消费者 ./kafka-console-consumer.sh --zookeeper 阿里云公网ip:2181 --topic Hello-       world --from-beginning 

    这时候当生产者生产消息的时候  消费者这边就可以看到了  

    

在服务器上面进行消息的生产和消费就完成了 下面介绍怎么使用java代码进行远程连接kafka服务器

这个地方真的踩了好多好多坑、有次晚上下班搞到了快两点 百度、谷歌、维基、Stack Overflow 能找解决问题的地方都找了浪费了好多不必要的时间

    首先、新建一个Maven工程(此处不再多描述),在pom文件中加入kafka的依赖
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.0</version>
    </dependency>

    新建一个KafkaProducerDemo和KafkaConsumerDemo类(名字可以自定义):
    话不多说  上代码

    KafkaProducerDemo类:

            public class KafkaProducerDemo {
                public static void main(String[] args) {
                    //创建properties文件
                    Properties properties = new Properties();
                    //设置kafka服务器地址
                   properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "阿里云公网ip:9092");
                   //设置key进行序列化
                   properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
                   //设置value进行序列化
                   properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
                   //创建消息生产者
                   KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
                   //创建消息实体 制定主题、key、value
                   ProducerRecord<String,String> record = new ProducerRecord<>("Hello-world","haha","from java client");
                   //发送消息
                   producer.send(record);
                   System.out.println("消息发送成功");
                   //关闭生产者
                   producer.close();

                }
            }

    KafkaConsumerDemo类:

     public class KafkaConsumerDemo {

        public static void main(String[] args) {
            //新建配置文件
            Properties properties = new Properties();
            //设置kafka服务器地址
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"阿里云公网ip:9092");
            //设置key的反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            //设置value的反序列化
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
            //设置groupid
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

            //创建消费者对象
            KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
            //订阅主题
            consumer.subscribe(Arrays.asList("Hello-world"));

            while (true) {
                //消费消息
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)

                    System.out.printf("offset = %d, key = %s, value = %s\n",
                            record.offset(), record.key(), record.value());
            }
        }
    }

    上面就是连接kafka远程服务器代码

    

但是上述过程做完之后还是不能正确运行、这个地方折腾了好久、最后在哪里看到解决的办法记不大清了

就是要阿里云服务器服务安全设置里面加个规则 将2181和9092端口开放就可以,但是我中间也使用命令的方式

关闭了防火墙、没什么用,不知道什么鬼。 搞得我头皮发麻

原文地址:https://www.cnblogs.com/evildoerdb/p/9650808.html

时间: 2024-10-15 21:34:00

kafka安装和使用远程代码进行访问 ---附踩坑记录的相关文章

华为云kafka POC 踩坑记录

2019/03/08 18:29 最近在进行华为云相关POC验证,个人主要负责华为云DMS kafka相关.大致数据流程是,从DIS取出数据,进行解析处理,然后放入kafka,再从kafka中取出数据然后放到ElasticSearch以及OBS里面.kafka作为中间层次,发挥着中间件的重要作用.关于华为云kafka的整合,这两天的确碰到一些坑,现进行相关总结加以记录. 第一个问题:kafka的jar包不要用开源的,而是用图中libs的华为官方的jar包.新建libs文件目录,然后把jarBao

Tomcat安装后,远程IP无法访问的问题。

我在使用阿里云与聚石塔的时候,发现Tomcat启动后,本地可以访问,但是外网无法访问,即使关闭防火墙也无法访问. 原因是 云平台的网络拦截. 阿里云:有一个入网规则 和 出网规则 ,流入数据端口  流出数据端口 ,是在操作系统的基础上 又一层 拦截. 聚石塔:聚石塔是打着安全的旗号,仅仅开放几个端口,其中开放了80端口,而tomcat用的是8080端口,所以遭到了 网络层面的拦截. ------------------------------------- 方法 阿里云的 就是增加端口规则.或采

nvm安装node和npm,个人踩坑记录

我采用nvm-setup安装windows版本的nvm nvm安装node出现的问题: 1.node成功了,npm没成功 解决:在nvm 安装了node之后,输入npm找不到该命令,当时安装报错如下: 报错其实也看不明白,大概感觉是npm包下载的地址没连上,出错了,查看nvm目录文件夹,有一个temp文件夹,里面存放着npm的压缩包,把temp整个文件夹删除,执行uninstall v6.10.2的时候,提示删除失败,需要手动删除,其实他是只删除了存放nodejs的文件夹,但是相应版本删除失败,

Ubuntu16.04安装K8s步骤和踩坑记录【不错】

文章目录环境信息安装步骤系统配置修改安装docker安装kubectl,kubelet,kubeadm配置Master配置Node部署结果检查K8S部署mysql学习新建mysql-rc.yaml创建mysql-svc.yaml安装K8S部署JAVA应用创建deployment创建service更新deployment其他命令参考K8S Deployment 命令环境信息名称: 版本Docker 18.06.1-ce操作系统 Ubuntu16.04K8s v1.13.2机器信息 IP 作用 组件

pyltp安装踩坑记录

LTP(Language Technology Platform)由哈工大社会计算与信息检索研究中心开发,提供包括中文分词.词性标注.命名实体识别.依存句法分析.语义角色标注等丰富. 高效.精准的自然语言处理技术. LTP的源码是C++,也提供Java和Python版本.Python版本的安装方法是在cmd下输入 pip install pyltp 安装需要C++的编译环境. 然而在安装包下载下来开始安装的时候,报下面的错误 Exception: Traceback (most recent c

Manjaro (KDE)安装踩坑记录

1.如果双显卡无法安装系统可以进如BIOS屏蔽显卡后进入安装 2.如果安装kde版本后容易冻屏.死机,可以尝试安装闭源驱动 3.如果出现resolving time out 10000ms 这样的问题,可以尝试进入    /etc/resolv.conf  修改dns地址 4.如若发现屏幕菜单界面过大问题,也许是字体太大 5.KDE目前无法安装QQ可以尝试 crossover 原文地址:https://www.cnblogs.com/iwannabe/p/10029518.html

安装Ubuntu16.04踩坑记录

下载Ubuntu16.04 中科大源 http://mirrors.ustc.edu.cn/ubuntu-releases/16.04/ 阿里云开源镜像站 http://mirrors.aliyun.com/ubuntu-releases/16.04/ 下载universal-USB-installer 制作U盘启动盘 准备一个U盘,格式化为FAT32 选择目标系统为Ubuntu,指定下载好的Ubuntu16.04 ISO镜像 制作完成 安装系统 重启计算机选择从U盘启动 如果U盘格式化为NTF

安装kafka并设置通过外网ip访问

1.kafka 安装 安装JDK tar xvf jdk1.8.0_231.tar.gz -C /usr/local && cd /usr/local ln -sv jdk1.8.0_231 jdk vim /etc/profile.d/java.sh JAVA_HOME=/usr/local/jdk PATH=$JAVA_HOME/bin:$PATH zookeeper安装(或使用kafka自带的) vim /usr/local/kafka/zookeeper/conf/zoo.cfg

Apache ActiveMQ Fileserver远程代码执行漏洞

扫端口的时候遇到8161端口,输入admin/admin,成功登陆,之前就看到过相关文章,PUT了一句话上去,但是没有什么效果,于是本地搭建了一个环境,记录一下测试过程. 环境搭建: ActiveMQ 5.1.0 下载地址:http://activemq.apache.org/activemq-510-release.html 解压后,双击运行abtivemq.bat运行.(进入bin目录,根据自己的操作系统选择win32或win64,5.1.0只有win32目录) 访问8161端口: 漏洞利用