Kafka 分区分配计算(分区器 Partitions )

KafkaProducer在调用send方法发送消息至broker的过程中,首先是经过拦截器Inteceptors处理,然后是经过序列化Serializer处理,之后就到了Partitions阶段,即分区分配计算阶段。在某些应用场景下,业务逻辑需要控制每条消息落到合适的分区中,有些情形下则只要根据默认的分配规则即可。在KafkaProducer计算分配时,首先根据的是ProducerRecord中的partition字段指定的序号计算分区。读者有可能刚睡醒,看到这个ProducerRecord似曾相识,没有关系,先看段Kafka生产者的示例片段:

Producer<String,String> producer = new KafkaProducer<String,String>(properties);
String message = "kafka producer demo";
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message);
try {
    producer.send(producerRecord).get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

没错,ProducerRecord只是一个封装了消息的对象而已,ProducerRecord一共有5个成员变量,即:

private final String topic;//所要发送的topic
private final Integer partition;//指定的partition序号
private final Headers headers;//一组键值对,与RabbitMQ中的headers类似,kafka0.11.x版本才引入的一个属性
private final K key;//消息的key
private final V value;//消息的value,即消息体
private final Long timestamp;//消息的时间戳,可以分为Create_Time和LogAppend_Time之分,这个以后的文章中再表。123456

在KafkaProducer的源码(1.0.0)中,计算分区时调用的是下面的partition()方法:

/**
 * computes partition for given record.
 * if the record has partition returns the value otherwise
 * calls configured partitioner class to compute the partition.
 */
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

可以看出的确是先判断有无指明ProducerRecord的partition字段,如果没有指明,则再进一步计算分区。上面这段代码中的partitioner在默认情况下是指Kafka默认实现的org.apache.kafka.clients.producer.DefaultPartitioner,其partition()方法实现如下:

/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}

由上源码可以看出partition的计算方式:

  • 如果key为null,则按照一种轮询的方式来计算分区分配
  • 如果key不为null则使用称之为murmur的Hash算法(非加密型Hash函数,具备高运算性能及低碰撞率)来计算分区分配。

KafkaProducer中还支持自定义分区分配方式,与org.apache.kafka.clients.producer.internals.DefaultPartitioner一样首先实现org.apache.kafka.clients.producer.Partitioner接口,然后在KafkaProducer的配置中指定partitioner.class为对应的自定义分区器(Partitioners)即可,即:

properties.put("partitioner.class","com.hidden.partitioner.DemoPartitioner");

自定义DemoPartitioner主要是实现Partitioner接口的public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)的方法。DemoPartitioner稍微修改了下DefaultPartitioner的计算方式,详细参考如下:

public class DemoPartitioner implements Partitioner {

    private final AtomicInteger atomicInteger = new AtomicInteger(0);

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (null == keyBytes || keyBytes.length<1) {
            return atomicInteger.getAndIncrement() % numPartitions;
        }
        //借用String的hashCode的计算方式
        int hash = 0;
        for (byte b : keyBytes) {
            hash = 31 * hash + b;
        }
        return hash % numPartitions;
    }

    @Override
    public void close() {}
}

这个自定义分区器的实现比较简单,读者可以根据自身业务的需求来灵活实现分配分区的计算方式,比如:一般大型电商都有多个仓库,可以将仓库的名称或者ID作为Key来灵活的记录商品信息。

原文地址:https://blog.51cto.com/14230003/2394305

时间: 2024-10-04 02:44:02

Kafka 分区分配计算(分区器 Partitions )的相关文章

Kafka分区分配策略(Partition Assignment Strategy

问题 用过 Kafka 的同学用过都知道,每个 Topic 一般会有很多个 partitions.为了使得我们能够及时消费消息,我们也可能会启动多个 Consumer 去消费,而每个 Consumer 又会启动一个或多个streams去分别消费 Topic 里面的数据.我们又知道,Kafka 存在 Consumer Group 的概念,也就是 group.id 一样的 Consumer,这些 Consumer 属于同一个Consumer Group,组内的所有消费者协调在一起来消费订阅主题(su

动态分区分配

一.目的 加深对动态分区分配的理解,进一步掌握首次适应算法和最佳适应算法的理解.了解动态分区分配方式中使用的数据结构和分配算法,进一步加深对动态分区存储管理方式及其实现过程的理解.提高学生设计实验.发现问题.分析问题和解决问题的能力. 学会可变式分区管理的原理是在处理作业过程中建立分区,使分区大小正好适合作业的需求. 当一个作业执行完成后,作业所占的分区应归还给系统. 二.原理 首次适应算法 以空闲分区链为例来说明采用FF算法时的分配情况.FF算法要求空闲分区链以地址递增的次序链接.在分配内存时

iOS开发中的内存分配与分区

iOS开发中的内存分配与分区 关于RAM&ROM RAM与ROM就是具体的存储空间,统称为存储器. RAM(random access memory):运行内存,CPU可以直接访问,读写速度非常快,但是不能掉电存储.它又分为: 动态DRAM,速度慢一点,需要定期的刷新(充电),我们常说的内存条就是指它,价格会稍低一点,手机中的运行内存也是指它. 静态SRAM,速度快,我们常说的一级缓存,二级缓存就是指它,当然价格高一点. ROM(read only memory):存储性内存,可以掉电存储,例如

使用动态分区分配方式的模拟

1实验目的 (1)了解动态分区分配方式中使用的数据结构和分配算法 (2)加深对动态分区存储管理方式及其实现过程的理解. 2实验内容 (1)分别实现采用首次适应算法和最佳适应算法的动态分区分配过程alloc()和回收过程free().其中,空闲分区通过空闲分区链来管理:在进行内存分配时,系统优先使用空闲区低端的空间. (2)假设初始状态下,可用的内存空间为640KB,并有下列的请求序列: •作业1申请130KB. •作业2申请60KB. •作业3申请100KB. •作业2释放60KB. •作业4申

操作系统——动态分区分配方式模拟

这里直接给出代码,如果不理解请参考左万历版<计算机操作系统教程>,先在给出四中模拟算法. 1.   设计目的 了解动态分区分配中使用的数据结构和分配算法,并进一步加深对动态分区存储管理方式及其实现过程的理解. 2.   设计内容 1)用C语言实现采用首次适应算法的动态分区分配过程alloc()和回收过程free().其中,空闲分区通过空闲分区链表来管理,在进行内存分配时,系统优先使用空闲区低端的空间. 2)假设初始状态如下,可用的内存空间为640KB,并有下列的请求序列: 作业1申请130KB

实验三:内存动态分区分配

内存动态分区分配和回收的模拟实现 实验目的 加深对内存管理的理解,进而对连续分配和离散分配有更深刻的认识. 通过内存管理,进一步理解进程等操作系统概念. 实验内容 模拟实现内存动态分区分配和回收 建立一个长度为1024的一维数组,用以模拟内存. 建立空闲分区表或空闲分区链,用来记录内存的使用情况. 为请求进入内存的作业分配内存,回收运行完成的作业所占内存. 键盘输入:表示一个请求进入内存的作业的三元组: (作业号(0–9),作业大小(1-1024),作业运行时间) 程序接受键盘输入,进行内存分配

mysql表分区、查看分区

原文地址:http://blog.csdn.net/feihong247/article/details/7885199 一.       mysql分区简介 数据库分区 数据库分区是一种物理数据库设计技术.虽然分区技术可以实现很多效果,但其主要目的是为了在特定的SQL操作中减少数据读写的总量以缩减sql语句的响应时间,同时对于应用来说分区完全是透明的. MYSQL的分区主要有两种形式:水平分区和垂直分区 水平分区(HorizontalPartitioning) 这种形式的分区是对根据表的行进行

Oracle 表分区与索引分区

分区技术简介 Oracle是最早支持物理分区的数据库管理系统供应商,表分区的功能是在Oracle 8.0版本推出的.分区功能能够改善应用程序的性能.可管理性和可用性,是数据库管理中一个非常关键的技术.尤其在今天,数据库应用系统的规模越来越大,还有海量数据的数据仓储系统,因此,几乎所有的Oracle数据库都使用分区功能来提高查询的性能,并且简化数据库的日常管理维护工作. 那么使用分区技术有哪些优点呢?具体如下: 减少维护工作量,独立管理每个分区比管理单个大表要轻松得多. 增强数据库的可用性,如果表

如何为Kafka集群选择合适的Partitions数量

转载http://blog.csdn.net/odailidong/article/details/52571901 这是许多kafka使用者经常会问到的一个问题.本文的目的是介绍与本问题相关的一些重要决策因素,并提供一些简单的计算公式. 文章目录 1 越多的分区可以提供更高的吞吐量 2 越多的分区需要打开更多地文件句柄 3 更多地分区会导致更高的不可用性 4 越多的分区可能增加端对端的延迟 5 越多的partition意味着需要客户端需要更多的内存 6 总结 越多的分区可以提供更高的吞吐量 首