Kafka 自定义指定消息partition策略规则及DefaultPartitioner源码分析

Kafka 自定义指定消息partition策略规则及DefaultPartitioner源码分析

一.概述

kafka默认使用DefaultPartitioner类作为默认的partition策略规则,具体默认设置是在ProducerConfig类中(如下图)

二.DefaultPartitioner.class 源码分析

1.类关系图

2.源码分析

public class DefaultPartitioner implements Partitioner {
    //缓存map key->topic  value->RandomNumber 随机数
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    //实现Configurable接口里configure方法,
    public void configure(Map<String, ?> configs) {}

    //策略核心方法
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //根据topic获取对应的partition
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        //如果key是null的话
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            //获取可用的分区数量
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            //如果存在可用的分区
            if (availablePartitions.size() > 0) {
                //消息随机分布到topic的可用partition中
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                //不存在可用分区 随机分配一个不可用的partition中
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            //使用自己的 hash 算法对 key 取 hash 值,使用 hash 值与 partition 数量取模,从而确定发送到哪个分区。
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        //获取topic 获取对应的随机数
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            //获取一个随机值
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            //缓存到topicCounterMap中
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        //获取 并且实现自增,最终效果是实现轮训插入partition
        return counter.getAndIncrement();
    }

    public void close() {}

}

三.自定义Partition

自定义selfPartitioner类,并且实现Partitioner接口,重写partition和close方法

public class SelfPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //TODO 自定义数据分区策略
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

四.生产者中使用自定义的Partition

在初始化producer时候,配置项中指定对应的partitioner

props.put("partitioner.class", "org.jake.partitioner.selfPartitioner");

原文地址:https://www.cnblogs.com/jakaBlog/p/11956940.html

时间: 2024-08-13 20:55:46

Kafka 自定义指定消息partition策略规则及DefaultPartitioner源码分析的相关文章

django的RBAC认证z;自定义auth_user表;认证组件权限组件源码分析;认证组件;权限组件

一 RBAC 1.RBAC:全称(Role-Based Access Control):指的是基于用户权限访问控制的认证. 2.Django框架采用的是RBAC认证规则,RBAC认证规则通常会分为:三表规则,五表规则:Django采用的是六表规则. # 三表:用户表.角色表.权限表# 五表:用户表.角色表.权限表.用户角色关系表.角色权限关系表# 六表:用户表.角色表.权限表.用户角色关系表.角色权限关系表.用户权限关系表 3.在Django中六表之间是都是多对多的关系,可通过下面字段跨表访问

ZGC gc策略及回收过程-源码分析

源码文件:/src/hotspot/share/gc/z/zDirector.cpp 一.回收策略 main入口函数: void ZDirector::run_service() { // Main loop while (_metronome.wait_for_tick()) { sample_allocation_rate(); const GCCause::Cause cause = make_gc_decision(); if (cause != GCCause::_no_gc) { Z

源码分析 Kafka 消息发送流程(文末附流程图)

温馨提示:本文基于 Kafka 2.2.1 版本.本文主要是以源码的手段一步一步探究消息发送流程,如果对源码不感兴趣,可以直接跳到文末查看消息发送流程图与消息发送本地缓存存储结构. 从上文 初识 Kafka Producer 生产者,可以通过 KafkaProducer 的 send 方法发送消息,send 方法的声明如下: Future<RecordMetadata> send(ProducerRecord<K, V> record) Future<RecordMetada

从源码分析如何优雅的使用 Kafka 生产者

前言 在大量消息的情况下 Kakfa 是如何保证消息的高效及一致性呢? 正好以这个问题结合 Kakfa 的源码讨论下如何正确.高效的发送消息. 内容较多,对源码感兴趣的朋友请系好安全带??(源码基于 v0.10.0.0 版本分析).同时最好是有一定的 Kafka 使用经验,知晓基本的用法. 简单的消息发送 在分析之前先看一个简单的消息发送是怎么样的.(以下代码基于 SpringBoot 构建.) 首先创建一个 org.apache.kafka.clients.producer.Producer

Kafka源码分析及图解原理之Producer端

一.前言 任何消息队列都是万变不离其宗都是3部分,消息生产者(Producer).消息消费者(Consumer)和服务载体(在Kafka中用Broker指代).那么本篇主要讲解Producer端,会有适当的图解帮助理解底层原理. 一.开发应用 首先介绍一下开发应用,如何构建一个KafkaProducer及使用,还有一些重要参数的简介. 1.1 一个栗子 1 /** 2 * Kafka Producer Demo实例类. 3 * 4 * @author GrimMjx 5 */ 6 public

cglib源码分析2 ----- Class name 生成策略

一.如何获取动态生成的class 字节码 结合生成的class文件是一个学习cglib的比较好的方法.在cglib中,生成的class文件默认只存储在内存中,我们可以在代码中加入下面语句来获取class file. System.setProperty(DebuggingClassWriter.DEBUG_LOCATION_PROPERTY, "F:\\code "); 这里涉及到了一个比较关键的类DebuggingClassWriter,接下来我们简单的分析一下这个类.Debuggi

apache kafka源码分析走读-Producer分析

apache kafka中国社区QQ群:162272557 producer的发送方式剖析 Kafka提供了Producer类作为java producer的api,该类有sync和async两种发送方式. sync架构图 async架构图 调用流程如下: 代码流程如下: Producer:当new Producer(new ProducerConfig()),其底层实现,实际会产生两个核心类的实例:Producer.DefaultEventHandler.在创建的同时,会默认new一个Prod

apache kafka源码分析-Producer分析---转载

原文地址:http://www.aboutyun.com/thread-9938-1-1.html 问题导读1.Kafka提供了Producer类作为java producer的api,此类有几种发送方式?2.总结调用producer.send方法包含哪些流程?3.Producer难以理解的在什么地方? producer的发送方式剖析Kafka提供了Producer类作为java producer的api,该类有sync和async两种发送方式.sync架构图 async架构图 调用流程如下:

kafka C客户端librdkafka producer源码分析

简介 kafka网站上提供了C语言的客户端librdkafka,地址在这. librdkafka是使用C语言根据apache kafka 协议实现的客户端.另外这个客户端还有简单的c++接口.客户端作者对这个客户端比较上心,经常会修改bug并提交新功能. librdkafka的基本原理和我之前博客说的java版producer类似,一个线程向队列中加数据,另一个线程通过非阻塞的方式从队列中取出数据,并写入到broker. 源码分析 源码包含两个文件夹src和src-cpp src是用c实现的源码