kafka集群配置和java编写生产者消费者操作例子

  • kafka

    • 安装
    • 修改配置文件
    • java操作kafka

kafka

kafka的操作相对来说简单很多

安装

下载kafka http://kafka.apache.org/downloads

tar -zxvf kafka_2.12-2.1.0.tgz
rm kafka_2.12-2.1.0.tgz
mv kafka_2.12-2.1.0 kafka

sudo vim /etc/profile
    export KAFKA_HOME=/usr/local/kafka
    export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

准备 worker1 worker2 worker3 这四台机器

首先确保你的zookeeper集群能够正常运行worker1 worker2 worker3为zk集群
具体配置参照我的博客https://www.cnblogs.com/ye-hcj/p/9889585.html

修改配置文件

  1. server.properties

    sudo vim server.properties
    添加如下属性
    broker.id=0 # 3台机器分别设置成0 1 2
    log.dirs=/usr/local/kafka/logs
    zookeeper.connect=worker1:2181,worker2:2181,worker3:2181
  2. 运行
    运行
        bin/kafka-server-start.sh config/server.properties
    创建topic
        bin/kafka-topics.sh --create --zookeeper worker1:2181 --replication-factor 2 --partitions 2 --topic test
    查看topic
        bin/kafka-topics.sh --list --zookeeper worker1:2181
    订阅topic,利用worker2来订阅
        bin/kafka-console-consumer.sh --bootstrap-server worker1:9092 --topic test --from-beginning
    topic发送消息
        bin/kafka-console-producer.sh --broker-list worker1:9092 --topic test
        键入任何消息,worker2都能接收到
    查看topic详情
        bin/kafka-topics.sh --describe --zookeeper worker1:2181 --topic test

java操作kafka

  1. 依赖

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>2.1.0</version>
    </dependency>
  2. 生产者
    public class Producer
    {
        public static void main( String[] args ){
            Properties props = new Properties();
            // 服务器ip
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker1:9092,worker2:9092,worker3:9092");
            // 属性键值对都序列化成字符串
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            // 创建一个生产者,向test主题发送数据
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<String, String>("test", "生产者传递的消息"));
            producer.close();
        }
    }
  3. 消费者
    public class Consumer
    {
        public static void main( String[] args ){
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker1:9092,worker2:9092,worker3:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 消费者对象
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
            kafkaConsumer.subscribe(Arrays.asList("test"));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.between(
                LocalDateTime.parse("2019-01-09T11:30:30"), LocalDateTime.now()));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, value = %s", record.offset(), record.value());
                    System.out.println();
                }
            }
        }
    }

原文地址:https://www.cnblogs.com/ye-hcj/p/10260954.html

时间: 2024-07-29 14:23:02

kafka集群配置和java编写生产者消费者操作例子的相关文章

Kafka 集群配置SASL+ACL

** Kafka 集群配置SASL+ACL 测试环境:** 系统: CentOS 6.5 x86_64 JDK : java version 1.8.0_121 kafka: kafka_2.11-1.0.0.tgz zookeeper: 3.4.5 ip: 192.168.49.161 (我们这里在一台机上部署整套环境) kafka 名词解析: Broker: Kafka 集群包含一个或多个服务器,这种服务器被称为broker Topic: 每条发布到Kafka 集群的消息都有一个类别,这个类

ELK5.3+Kafka集群配置

[一]资源准备 # 3台4C*8G, 安装Zookeeper.Kafka.Logstash--Broker(input: filebeat; output: Kafka) 10.101.2.23 10.101.2.24 10.101.2.25 # 2台4C*8G, 安装Logstash--Indexer(input: Kafaka; output: Elasticsearch) 10.101.2.26 10.101.2.27 # 3台8C*16G, 安装Elasticsearch 10.101.

kafka集群配置与测试

刚接触一些Apache Kafka的内容,用了两天时间研究了一下,仅以此文做相关记录,以供学习交流.  概念: kafka依赖的项: 1. 硬件上,kafka利用线性存储来进行硬盘直接读写. 2. kafka没有使用内存作为缓存. 3. 用zero-copy. 4. Gzip和Snappy压缩, 5. kafka对事务处理比较弱,但是message分发上还是做了一定的策略来保证数据递送的准确性的. kafka关于存储的几个概念 1. Partition:同一个topic下可以设置多个partit

HyperLedger Fabric基于zookeeper和kafka集群配置解析

简述 在搭建HyperLedger Fabric环境的过程中,我们会用到一个configtx.yaml文件(可参考Hyperledger Fabric 1.0 从零开始(八)--Fabric多节点集群生产部署),该配置文件主要用于构建创世区块(在构建创世区块之前需要先创建与之对应的所有节点的验证文件集合),其中在配置Orderer信息中有一个OrdererType参数,该参数可配置为"solo" and "kafka",之前博文所讲的环境配置皆是solo,即单节点共

kafka 集群环境搭建 java

简单记录下kafka集群环境搭建过程, 用来做备忘录 安装 第一步: 点击官网下载地址 http://kafka.apache.org/downloads.html 下载最新安装包 第二步: 解压 tar xvf kafka_2.12-2.2.0.tgz 第三步: 检查服务器有没有安装zookeeper集群, 没有的话,自行百度补充 第四步:修改config/server.properties 文件 # Licensed to the Apache Software Foundation (AS

[Golang] kafka集群搭建和golang版生产者和消费者

一.kafka集群搭建 至于kafka是什么我都不多做介绍了,网上写的已经非常详尽了. 1. 下载zookeeper  https://zookeeper.apache.org/releases.html 2. 下载kafka http://kafka.apache.org/downloads 3. 启动zookeeper集群(我的示例是3台机器,后面的kafka也一样,这里就以1台代指3台,当然你也可以只开1台) 1)配置zookeeper. 修改复制一份 zookeeper-3.4.13/c

Kafka集群配置

kafka_2.11-0.9.0.1.tgz 1.进入项目前的目录 cd /home/dongshanxia mkdir kafka #创建项目目录 cd kafka #进入项目目录 mkdir kafkalogs #创建kafka消息目录,主要存放kafka消息 2.进入配置文件目录 cd /home/dongshanxia/kafka/kafka_2.11-0.9.0.1/config //打开配置文件 vim server.properties ----------------------

spark集群配置以及java操作spark小demo

spark 安装 配置 使用java来操作spark spark 安装 tar -zxvf spark-2.4.0-bin-hadoop2.7.tgz rm spark-2.4.0-bin-hadoop2.7.tgz mv spark-2.4.0-bin-hadoop2.7 spark sudo vim /etc/profile export SPARK_HOME=/usr/local/storm export PATH=$PATH:$SPARK_HOME/bin source /etc/pro

Kafka详解之二、如何配置Kafka集群

Kafka集群配置比较简单,为了更好的让大家理解,在这里要分别介绍下面三种配置 单节点:一个broker的集群 单节点:多个broker的集群 多节点:多broker集群 一.单节点单broker实例的配置 1.首先启动zookeeper服务 Kafka本身提供了启动zookeeper的脚本(在kafka/bin/目录下)和zookeeper配置文件(在kafka/config/目录下),首先进入Kafka的主目录(可通过 whereis kafka命令查找到): [[email protect