Kafka笔记整理(二):Kafka Java API使用

[TOC]


Kafka笔记整理(二):Kafka Java API使用

下面的测试代码使用的都是下面的topic:

$ kafka-topics.sh --describe hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
Topic:hadoop    PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: hadoop   Partition: 0    Leader: 103     Replicas: 103,101,102   Isr: 103,101,102
        Topic: hadoop   Partition: 1    Leader: 101     Replicas: 101,102,103   Isr: 101,102,103
        Topic: hadoop   Partition: 2    Leader: 102     Replicas: 102,103,101   Isr: 102,103,101

Kafka Java API之producer

关于producer API的使用说明,可以查看org.apache.kafka.clients.producer.KafkaProducer这个类的代码注释,有非常详细的说明,下面就直接给出程序代码及测试。

程序代码

KafkaProducerOps.java
package com.uplooking.bigdata.kafka.producer;

import com.uplooking.bigdata.kafka.constants.Constants;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;

/**
 * 通过这个KafkaProducerOps向Kafka topic中生产相关的数据
 * <p>
 * Producer
 */
public class KafkaProducerOps {
    public static void main(String[] args) throws IOException {
        /**
         * 专门加载配置文件
         * 配置文件的格式:
         * key=value
         *
         * 在代码中要尽量减少硬编码
         *  不要将代码写死,要可配置化
         */
        Properties properties = new Properties();
        InputStream in = KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties");
        properties.load(in);
        /**
         * 两个泛型参数
         * 第一个泛型参数:指的就是kafka中一条记录key的类型
         * 第二个泛型参数:指的就是kafka中一条记录value的类型
         */
        String[] girls = new String[]{"姚慧莹", "刘向前", "周  新", "杨柳"};
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
        String key = "1";
        String value = "今天的姑娘们很美";
        ProducerRecord<String, String> producerRecord =
                new ProducerRecord<String, String>(topic, key, value);
        producer.send(producerRecord);
        producer.close();
    }
}
Constants.java
package com.uplooking.bigdata.kafka.constants;

public interface Constants {
    /**
     * 生产的key对应的常量
     */
    String KAFKA_PRODUCER_TOPIC = "producer.topic";
}
producer.properties
############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none

# name of the partitioner class for partitioning events; default partition spreads data randomly
# partitioner.class=

# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=

# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=

# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=

# the maximum size of a request in bytes
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=

# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=

#####设置自定义的topic
producer.topic=hadoop

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

其实这个配置文件就是kafka conf目录下的配置文件,只是这里要做相应的修改,关于每个字段的含义,可以查看org.apache.kafka.clients.producer.KafkaProducer这个类的代码注释。

测试

在终端中启动消费者监听topic的消息:

[[email protected] ~]$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181

然后执行生产者程序,再查看终端输出:

[[email protected] ~]$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181
今天的姑娘们很美

Kafka Java API之consumer

程序代码

KafkaConsumerOps.java
package com.uplooking.bigdata.kafka.consumer;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

public class KafkaConsumerOps {
    public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        InputStream in = KafkaConsumerOps.class.getClassLoader().getResourceAsStream("consumer.properties");
        properties.load(in);
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        Collection<String> topics = Arrays.asList("hadoop");
        // 消费者订阅topic
        consumer.subscribe(topics);
        ConsumerRecords<String, String> consumerRecords = null;
        while (true) {
            // 接下来就要从topic中拉取数据
            consumerRecords = consumer.poll(1000);
            // 遍历每一条记录
            for (ConsumerRecord consumerRecord : consumerRecords) {
                long offset = consumerRecord.offset();
                int partition = consumerRecord.partition();
                Object key = consumerRecord.key();
                Object value = consumerRecord.value();
                System.out.format("%d\t%d\t%s\t%s\n", offset, partition, key, value);
            }

        }
    }
}
consumer.properties
# Zookeeper connection string
# comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect= uplooking01:2181,uplooking02:2181,uplooking03:2181

bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

#consumer group id
group.id=test-consumer-group

#consumer timeout
#consumer.timeout.ms=5000

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

测试

先执行消费者的代码,然后再执行生产者的代码,在消费者终端可以看到如下输出:

2   0   1   今天的姑娘们很美
(分别是:offset partition key value)

Kafka Java API之partition

可以通过自定义partitioner来决定我们的消息应该存到哪个partition上,只需要在我们的代码上实现Partitioner接口即可。

程序代码

MyKafkaPartitioner.java
package com.uplooking.bigdata.kafka.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;
import java.util.Random;

/**
 * 创建自定义的分区,根据数据的key来进行划分
 * <p>
 * 可以根据key或者value的hashCode
 * 还可以根据自己业务上的定义将数据分散在不同的分区中
 * 需求:
 * 根据用户输入的key的hashCode值和partition个数求模
 */
public class MyKafkaPartitioner implements Partitioner {

    public void configure(Map<String, ?> configs) {

    }

    /**
     * 根据给定的数据设置相关的分区
     *
     * @param topic      主题名称
     * @param key        key
     * @param keyBytes   序列化之后的key
     * @param value      value
     * @param valueBytes 序列化之后的value
     * @param cluster    当前集群的元数据信息
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer partitionNums = cluster.partitionCountForTopic(topic);
        int targetPartition = -1;
        if (key == null || keyBytes == null) {
            targetPartition = new Random().nextInt(10000) % partitionNums;
        } else {
            int hashCode = key.hashCode();
            targetPartition = hashCode % partitionNums;
            System.out.println("key: " + key + ", value: " + value + ", hashCode: " + hashCode + ", partition: " + targetPartition);
        }
        return targetPartition;
    }

    public void close() {
    }
}
KafkaProducerOps.java
package com.uplooking.bigdata.kafka.producer;

import com.uplooking.bigdata.kafka.constants.Constants;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Random;

/**
 * 通过这个KafkaProducerOps向Kafka topic中生产相关的数据
 * <p>
 * Producer
 */
public class KafkaProducerOps {
    public static void main(String[] args) throws IOException {
        /**
         * 专门加载配置文件
         * 配置文件的格式:
         * key=value
         *
         * 在代码中要尽量减少硬编码
         *  不要将代码写死,要可配置化
         */
        Properties properties = new Properties();
        InputStream in = KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties");
        properties.load(in);
        /**
         * 两个泛型参数
         * 第一个泛型参数:指的就是kafka中一条记录key的类型
         * 第二个泛型参数:指的就是kafka中一条记录value的类型
         */
        String[] girls = new String[]{"姚慧莹", "刘向前", "周  新", "杨柳"};
        Producer<String, String> producer = new KafkaProducer<String, String>(properties);
        Random random = new Random();
        int start = 1;
        for (int i = start; i <= start + 9; i++) {
            String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
            String key = i + "";
            String value = "今天的<--" + girls[random.nextInt(girls.length)] + "-->很美很美哦~";
            ProducerRecord<String, String> producerRecord =
                    new ProducerRecord<String, String>(topic, key, value);
            producer.send(producerRecord);
        }
        producer.close();
    }
}

继续使用前面的消费者的代码,同时需要在producer.properties中指定我们定义的partitioner,如下:

partitioner.class=com.uplooking.bigdata.kafka.partitioner.MyKafkaPartitioner

测试

先执行消费者代码,然后再执行生产者代码,查看终端输出。

生产者终端输出(主要是自定义partitioner中的输出):

key: 1, value: 今天的<--刘向前-->很美很美哦~, hashCode: 49, partition: 1
key: 2, value: 今天的<--杨柳-->很美很美哦~, hashCode: 50, partition: 2
key: 3, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 51, partition: 0
key: 4, value: 今天的<--周  新-->很美很美哦~, hashCode: 52, partition: 1
key: 5, value: 今天的<--刘向前-->很美很美哦~, hashCode: 53, partition: 2
key: 6, value: 今天的<--周  新-->很美很美哦~, hashCode: 54, partition: 0
key: 7, value: 今天的<--周  新-->很美很美哦~, hashCode: 55, partition: 1
key: 8, value: 今天的<--刘向前-->很美很美哦~, hashCode: 56, partition: 2
key: 9, value: 今天的<--杨柳-->很美很美哦~, hashCode: 57, partition: 0
key: 10, value: 今天的<--姚慧莹-->很美很美哦~, hashCode: 1567, partition: 1

消费者终端输出:

3   0   3   今天的<--姚慧莹-->很美很美哦~
4   0   6   今天的<--周  新-->很美很美哦~
5   0   9   今天的<--杨柳-->很美很美哦~
0   2   2   今天的<--杨柳-->很美很美哦~
1   2   5   今天的<--刘向前-->很美很美哦~
2   2   8   今天的<--刘向前-->很美很美哦~
1   1   1   今天的<--刘向前-->很美很美哦~
2   1   4   今天的<--周  新-->很美很美哦~
3   1   7   今天的<--周  新-->很美很美哦~
4   1   10  今天的<--姚慧莹-->很美很美哦~
(分别是:offset partition key value)

原文地址:http://blog.51cto.com/xpleaf/2090936

时间: 2024-10-18 19:22:22

Kafka笔记整理(二):Kafka Java API使用的相关文章

_00017 Kafka的体系结构介绍以及Kafka入门案例(初级案例+Java API的使用)

博文作者:妳那伊抹微笑 个性签名:世界上最遥远的距离不是天涯,也不是海角,而是我站在妳的面前,妳却感觉不到我的存在 技术方向:Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark ... 云计算技术 转载声明:可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明,谢谢合作! qq交流群:214293307  (期待与你一起学习,共同进步) # Kfaka的体系结构 # 学习前言 Kafka的整个学习过程就是自己看官网的文档,出

Kafka笔记整理(一)

[TOC] Kafka笔记整理(一) Kafka简介 消息队列(Message Queue) 消息 Message 网络中的两台计算机或者两个通讯设备之间传递的数据.例如说:文本.音乐.视频等内容. 队列 Queue 一种特殊的线性表(数据元素首尾相接),特殊之处在于只允许在首部删除元素和在尾部追加元素.入队.出队. 消息队列 MQ 消息+队列,保存消息的队列.消息的传输过程中的容器:主要提供生产.消费接口供外部调用做数据的存储和获取. MQ分类 MQ主要分为两类:点对点(p2p).发布订阅(P

Kafka笔记整理(三):消费形式验证与性能测试

[TOC] Kafka笔记整理(三):消费形式验证与性能测试 Kafka消费形式验证 前面的<Kafka笔记整理(一)>中有提到消费者的消费形式,说明如下: 1.每个consumer属于一个consumer group,可以指定组id.group.id 2.消费形式: 组内:组内的消费者消费同一份数据:同时只能有一个consumer消费一个Topic中的1个partition: 一个consumer可以消费多个partitions中的消息.所以,对于一个topic,同一个group中推荐不能有

Hadoop读书笔记(三)Java API操作HDFS

Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 Hadoop读书笔记(二)HDFS的shell操作:http://blog.csdn.net/caicongyang/article/details/41253927 JAVA URL 操作HDFS OperateByURL.java package hdfs; import java.io.InputStream; import jav

笔记:MyBatis 使用 Java API配置

我们已经讨论了各种MyBatis配置元素,如envronments.typeAlias和typeHandlers,以及如何使用XML配置它们.即使你想使用基于JavaAPI的MyBatis配置,MyBatis的SqlSessionFactory接口除了使用基于XML的配置创建外也可以通过Java API 编程式地被创建.每个在XML中配置的元素,都可以编程式的创建.使用Java API创建SqlSessionFactory,代码如下: ? ? ????public?static?SqlSessi

HBase学习(十二)Java API 与HBase交互实例

HBase提供了Java Api的访问接口,掌握这个就跟Java应用使用RDBMS时需要JDBC一样重要 import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoo

Java 学习整理笔记(二)Java基本语法结构

一.源文件的编写 Java是完全面向对象的语言,所以Java的所有操作都是基于类(class)完成的.Java中所有程序的代码都需要放在一个类中,类用关键字class声明,在class之前可以添加一些修饰符,Java应用程序的源文件由若干个书写形式相互独立的类组成. 1.例子: // HelloDate.java import java.util.Date; //引入 java.util 包中的Date类 public class HelloDate{ /* * 该程序的目的是:输出当前系统的时

【Java数据结构学习笔记之二】Java数据结构与算法之队列(Queue)实现

  本篇是数据结构与算法的第三篇,本篇我们将来了解一下知识点: 队列的抽象数据类型 顺序队列的设计与实现 链式队列的设计与实现 队列应用的简单举例 优先队列的设置与实现双链表实现 队列的抽象数据类型   队列同样是一种特殊的线性表,其插入和删除的操作分别在表的两端进行,队列的特点就是先进先出(First In First Out).我们把向队列中插入元素的过程称为入队(Enqueue),删除元素的过程称为出队(Dequeue)并把允许入队的一端称为队尾,允许出的的一端称为队头,没有任何元素的队列

使用Visual Studio Code开发Asp.Net Core WebApi学习笔记(二)-- Web Api Demo

在上一篇里,我已经建立了一个简单的Web-Demo应用程序.这一篇将记录将此Demo程序改造成一个Web Api应用程序. 一.添加ASP.NET Core MVC包 1. 在project.json文件添加Microsoft.AspNetCore.Mvc包 1 { 2 "version": "1.0.0-*", 3 "buildOptions": { 4 "debugType": "portable",