Kafka系列之-自定义Producer

  前面已经讲到了,在Kafka中,Message是由Producer产生的,Producer产生的Message会发送到Topic的指定Partition中。Producer可以有多种形式,也可以由用户通过Java,C以及Python语言来自定义。

  Kafka中Producer的主要作用和地位如下图所示,Producer通过获取某个Topic指定Partition的Leader节点连接到Kafka集群中,

一、Java Producer API

  用户可以基于Kafka提供的API自定义Producer,在这些API中有几个主要的类:

1. kafka.javaapi.producer.Producer

  类定义:

class Producer[ K,V ](private val underlying: kafka.producer.Producer[K ,V])

  UML图:

  

2. kafka.producer.ProducerConfig

  类定义:   

class ProducerConfig private (val props: VerifiableProperties)
        extends AsyncProducerConfig with SyncProducerConfigShared

  UML图:

  

3. kafka.producer.KeyedMessage

  类定义:

case class KeyedMessage[ K, V ](val topic: String, val key: K, val partKey: Any , val message: V)

二、自定义简单的Producer

  接下来根据上面的三个类,使用Java代码实现一个简单的Producer向Broker发送Message。这个Producer会为特定的Topic生成Message并发送到默认的Partition中。

  具体代码和过程在代码和注释中。

1、Java代码

package ckm.kafka.producer;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Date;
import java.util.Properties;

/**
 * 一个简单的Kafka Producer类,传入两个参数:
 * topic num
 * 设置主题和message条数
 *
 * 执行过程:
 * 1、创建一个topic
 * kafka-topic.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic xxxx
 * 2、运行本类中的代码
 * 3、查看message
 * kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic xxxx
 * kafka
 */
public class SimpleKafkaProducer {
    /**
     * Producer的两个泛型,第一个指定Key的类型,第二个指定value的类型
     */
    private static Producer<String, String> producer;

    public SimpleKafkaProducer() {
        Properties props = new Properties();
        /**
         * 指定producer连接的broker列表
         */
        props.put("metadata.broker.list", "m000:9092, m001:9092, m002:9092");
        /**
         * 指定message的序列化方法,用户可以通过实现kafka.serializer.Encoder接口自定义该类
         * 默认情况下message的key和value都用相同的序列化,但是可以使用"key.serializer.class"指定key的序列化
         */
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        /**
         * 这个参数用于通知broker接收到message后是否向producer发送确认信号
         *  0 - 表示producer不用等待任何确认信号,会一直发送消息,
         * 否则producer进入等待状态
         * -1 - 表示leader状态的replica需要等待所有in-sync状态的replica都接收到消息后才会向producer发送确认信号,
         * 再次之前producer一直处于等待状态
         */
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        producer = new Producer<String, String>(config);
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println("Please Input Topic and Message Numbers");
        }
        String topic = (String) args[0];
        int count = Integer.parseInt((String) args[1]);
        System.out.println("Topic = " + topic);
        System.out.println("Message Nums = " + count);

        SimpleKafkaProducer simpleProducer = new SimpleKafkaProducer();
        simpleProducer.publishMessage(topic, count);
    }

    /**
     * 根据topic和消息条数发送消息
     * @param topic
     * @param count
     */
    private void publishMessage(String topic, int count) {
        for (int i = 0; i < count; i ++) {
            String runtime = new Date().toString();
            String msg = "Message published time - " + runtime;
            System.out.println("msg = " + msg);
            /**
             * 第一个泛型指定用于分区的key的类型,第二个泛型指message的类型
             * topic只能为String类型
             */
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg);
            producer.send(data);
        }
        producer.close();
    }
}

2、运行

(1)启动ZooKeeper

$ZK_HOME/bin/zkServer.sh start

(2)启动Kafka集群

cd $KAFKA_HOME
nohup bin/kafka-server-start.sh config/server.properties &

(3)创建测试Topic

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper m000:2181 --replication-factor 1 --partition 3 --topic simple-kafka-producer

(4)运行SimpleKafkaProducer 代码

  运行该代码,向simple-kafka-producer Topic发送10条Message

java -cp KafkaTestProgram.jar ckm.kafka.producer.SimpleKafkaProducer simple-kafka-producer 10

(5)查看simple-kafka-producer中的Message

bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic simple-kafka-producer

三、自定义Partition的Producer

  这一节中除了实现Producer之外,还自定义了Message的Partition划分过程。

  在这里,将会模拟一个网页访问日志生成的过程,每条随机生成的日志Message中包含三个部分的信息:

- 页面访问时间戳

- 页面名称

- 访问页面的IP地址

  

1、Java代码

(1)Producer

package ckm.kafka.producer;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

/**
 * 一个自定义分区的Kafka Producer类,传入两个参数:
 * topic num
 * 设置主题和message条数
 *
 * 模拟用户点击日志,日志格式为:“时间,网址,IP地址"格式
 *
 * 自定义分区,通过IP地址最后一位与分区数求余,message分散到0~partition - 1这些分区中
 *
 * 执行过程:
 * 1、创建一个topic
 * kafka-topic.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic xxxx
 * 2、运行本类中的代码
 * 3、查看message
 * kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic xxxx
 * kafka
 */
public class KafkaProducerWithPartition {
    /**
     * Producer的两个泛型,第一个指定Key的类型,第二个指定value的类型
     */
    private static Producer<String, String> producer;

    public KafkaProducerWithPartition() {
        Properties props = new Properties();
        /**
         * 指定producer连接的broker列表
         */
        props.put("metadata.broker.list", "m000:9092, m001:9092, m002:9092");
        /**
         * 指定message的序列化方法,用户可以通过实现kafka.serializer.Encoder接口自定义该类
         * 默认情况下message的key和value都用相同的序列化,但是可以使用"key.serializer.class"指定key的序列化
         */
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        /**
         * 这个参数用于通知broker接收到message后是否向producer发送确认信号
         *  0 - 表示producer不用等待任何确认信号,会一直发送消息
         *  1 - 表示leader状态的replica在接收到message后需要向producer发送一个确认信号,否则producer进入等待状态
         * -1 - 表示leader状态的replica需要等待所有in-sync状态的replica都接收到消息后才会向producer发送确认信号,再次之前producer一直处于等待状态
         */
        props.put("request.required.acks", "1");
        /**
         * 指定partition类,自定义的分区类,继承自kafka.producer.Partitioner接口
         */
        props.put("partitioner.class", "ckm.kafka.producer.SimplePartitioner");
        ProducerConfig config = new ProducerConfig(props);

        producer = new Producer<String, String>(config);
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.out.println("Please Input Topic and Message Numbers");
        }
        String topic = (String) args[0];
        int count = Integer.parseInt((String) args[1]);
        System.out.println("Topic = " + topic);
        System.out.println("Message Nums = " + count);

        KafkaProducerWithPartition simpleProducer = new KafkaProducerWithPartition();
        simpleProducer.publishMessage(topic, count);
    }

    /**
     * 根据topic和消息条数发送消息
     * @param topic
     * @param count
     */
    private void publishMessage(String topic, int count) {
        Random random = new Random();
        for (int i = 0; i < count; i ++) {
            String runtime = new Date().toString();
            // 访问的IP地址
            String clientIP = "192.168.1." + random.nextInt(255);
            String msg = runtime + ",kafka.apache.org," + clientIP;
            System.out.println("msg = " + msg);
            /**
             * 第一个泛型指定用于分区的key的类型,第二个泛型指message的类型
             * topic只能为String类型
             * 和上一个Producer相比,多了一个用于分区的key
             */
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, clientIP, msg);
            producer.send(data);
        }
        producer.close();
    }
}

(2)Partitioner

package ckm.kafka.producer;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

/**
 * Created by ckm on 2016/8/3.
 */
public class SimplePartitioner implements Partitioner {
    /**
     * 不写这个方法,会报错
     * Exception in thread "main" java.lang.NoSuchMethodException: ckm.kafka.producer.SimplePartitioner.<init>(kafka.utils.VerifiableProperties)
     * at java.lang.Class.getConstructor0(Class.java:2892)
     * at java.lang.Class.getConstructor(Class.java:1723)
     * at kafka.utils.Utils$.createObject(Utils.scala:436)
     * at kafka.producer.Producer.<init>(Producer.scala:61)
     * at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
     * at ckm.kafka.producer.KafkaProducerWithPartition.<init>(KafkaProducerWithPartition.java:58)
     * at ckm.kafka.producer.KafkaProducerWithPartition.main(KafkaProducerWithPartition.java:70)
     * @param verifiableProperties
     */
    public SimplePartitioner(VerifiableProperties verifiableProperties) {

    }

    public int partition(Object key, int numPartitions) {
        int partition = 0;
        String partitionKey = (String) key;
        int offset = partitionKey.lastIndexOf(‘.‘);
        if (offset > 0) {
            partition = Integer.parseInt(partitionKey.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }
}

2、运行

  由于前面已经启动了ZooKeeper以及Kafka,这里直接从创建Topic开始

(1)创建Topic

  创建一个partition为3,replication为3的topic。

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper m000:2181 --replication-factor 3 --partitions 3 --topic partition-kafka-producer

  如何使用list命令查看该Topic,可以参考前面的示例

 (2)运行Java代码

java -cp KafkaTestProgram.jar ckm.kafka.producer.KafkaProducerWithPartition partition-kafka-producer 100

  往partition-kafka-producer Topic中写入100条随机生成的Message。

(3)查看这些Message

$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper m000:2181 --from-beginning --topic partition-kafka-producer

四、自定义Producer的封装

  上面两种自定义的Producer中,其实有很多代码是重复性的。接下来对Kafka自定义Producer进行一定的封装,使其使用和配置更加简便。

  经过封装后,producer有关的参数都写在properties文件中。

  第二步中的Producer的调用方法为:

KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
kafkaProducerTool.publishMessage("test message");

  两行代码就可以将该message发送到配置的Kafka集群指定的topic中。

  第三步中的自定义Partitioner的Producer的调用方法为:

KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl();
Properties producerProperties = kafkaProducerTool.getProducerProperties();
// 如果properties配置文件中没有配置该参数的话,手动设置
producerProperties.put("partitioner.class", "SimplePartitioner");
kafkaProducerTool.publishPartitionedMessage("partition-key", "test messate");

  具体代码可以参考KafkaProducerTool

  欢迎提出宝贵意见。

时间: 2024-10-29 19:07:43

Kafka系列之-自定义Producer的相关文章

Kafka系列之-Kafka监控工具KafkaOffsetMonitor配置及使用

KafkaOffsetMonitor是一个可以用于监控Kafka的Topic及Consumer消费状况的工具,其配置和使用特别的方便.源项目Github地址为:https://github.com/quantifind/KafkaOffsetMonitor. 最简单的使用方式是从Github上下载一个最新的KafkaOffsetMonitor-assembly-0.2.1.jar,上传到某服务器上,然后执行一句命令就可以运行起来.但是在使用过程中有可能会发现页面反应缓慢或者无法显示相应内容的情况

apache kafka系列之Producer处理逻辑

最近研究producer的负载均衡策略,,,,我在librdkafka里边用代码实现了partition 值的轮询方法,,,但是在现场验证时,他的负载均衡不起作用,,,所以来找找原因: 下文是一篇描写kafka处理逻辑的文章,转载过来,研究一下. apache kafka系列之Producer处理逻辑 标签: Kafka ProducerKafka Producer处理逻辑kafka生产者处理逻辑apache kafka系列 2014-05-23 11:42 3434人阅读 评论(2) 收藏 举

Apache Kafka系列(五) Kafka Connect及FileConnector示例

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 Apache Kafka系列(四) 多线程Consumer方案 Apache Kafka系列(五) Kafka Connect及FileConnector示例 一. Kafka Connect简介 Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析).为何集成其他系统和解耦应用,经常使用Producer来发送消

Apache Kafka系列(三) Java API使用

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 摘要: Apache Kafka Java Client API 一.基本概念 Kafka集成了Producer/Consumer连接Broker的客户端工具,但是在消息处理方面,这两者主要用于服务端(Broker)的简单操作,如: 1.创建Topic 2.罗列出已存在的Topic 3.对已有Topic的Produce/Consume测试

Apache Kafka系列(四) 多线程Consumer方案

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 Apache Kafka系列(四) 多线程Consumer方案 本文的图片是通过PPT截图出的,读者如果修改意见请联系我 一.Consumer为何需要实现多线程 假设我们正在开发一个消息通知模块,该模块允许用户订阅其他用户发送的通知/消息.该消息通知模块采用Apache Kafka,那么整个架构应该是消息的发布者通过Producer调用AP

Apache Kafka系列(二) 命令行工具(CLI)

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka命令行工具(Command Line Interface,CLI),下文简称CLI. 1. 启动Kafka 启动Kafka需要两步: 1.1. 启动ZooKeeper [[email protected] kafka_2.12-0.11.0.0]# bin/zookeeper-server-start.sh config/zookeeper.properties 1.2.

kafka C客户端librdkafka producer源码分析

简介 kafka网站上提供了C语言的客户端librdkafka,地址在这. librdkafka是使用C语言根据apache kafka 协议实现的客户端.另外这个客户端还有简单的c++接口.客户端作者对这个客户端比较上心,经常会修改bug并提交新功能. librdkafka的基本原理和我之前博客说的java版producer类似,一个线程向队列中加数据,另一个线程通过非阻塞的方式从队列中取出数据,并写入到broker. 源码分析 源码包含两个文件夹src和src-cpp src是用c实现的源码

SharePoint 2013 Designer系列之自定义列表表单

在SharePoint的使用中,默认的样式过于单调经常困扰着我们,其实,SharePoint使用Designer工具,可以很轻松解决这一问题,制作出各式各样漂亮的页面.下面,让我们简单介绍下这一过程. 1.首先创建一个测试列表,里面的字段如下: 2.看一眼默认的Dispform.aspx页面,对于新闻来说,基本属于不能看: 3.使用SharePoint Designer 2013打开列表,找到Dispform.aspx页面,右键在高级模式下编辑该文件,如下图: 4.一般都是隐藏默认的td,然后新

SpringMVC经典系列-14自定义SpringMVC的拦截器---【LinusZhu】

注意:此文章是个人原创,希望有转载需要的朋友们标明文章出处,如果各位朋友们觉得写的还好,就给个赞哈,你的鼓励是我创作的最大动力,LinusZhu在此表示十分感谢,当然文章中如有纰漏,请联系[email protected],敬请朋友们斧正,谢谢. 这部分主要讲解SpringMVC的拦截器的部分,会带着大家完成定义拦截器的两种方式的实例,不多说了,开始-- SpringMVC的拦截器主要是用于拦截用户的请求,并且进行相应的处理,如:权限验证.判断登录等. 定义拦截器的两种方式,如下: 1. 实现接