kafka 自定义分区器

package cn.xiaojf.kafka.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 自定义分区方式
 */
public class CustomPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    public CustomPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }

    /**
     * 自定义分区规则
     * @param topic
     * @param key
     * @param keyBytes
     * @param value
     * @param valueBytes
     * @param cluster
     * @return
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if(keyBytes == null) {
            int nextValue = this.nextValue(topic);
            List availablePartitions = cluster.availablePartitionsForTopic(topic);
            if(availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return ((PartitionInfo)availablePartitions.get(part)).partition();
            } else {
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
        if(null == counter) {
            counter = new AtomicInteger((new Random()).nextInt());
            AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
            if(currentCounter != null) {
                counter = currentCounter;
            }
        }

        return counter.getAndIncrement();
    }

    public void close() {
    }
}
package cn.xiaojf.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 消息生产者
 * @author xiaojf 2017/3/22 14:27
 */
public class MsgProducer extends Thread {

    private final KafkaProducer<String, String> producer;
    private final String topic;
    private final boolean isAsync;

    public MsgProducer(String topic, boolean isAsync) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.59.130:9092");//broker 集群地址
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "MsgProducer");//自定义客户端id
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//key 序列号方式
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//value 序列号方式
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CustomPartitioner.class.getCanonicalName());//自定义分区函数

//        properties.load("properties配置文件");

        this.producer = new KafkaProducer<String, String>(properties);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    @Override
    public void run() {
        int msgNo = 0;

        while (true) {
            String msg = "Msg: " + msgNo;
            String key = msgNo + "";
            if (isAsync) {//异步
                producer.send(new ProducerRecord<String, String>(this.topic,msg));
//                producer.send(new ProducerRecord<String, String>(this.topic, key, msg));
            } else {//同步
                producer.send(new ProducerRecord<String, String>(this.topic, key, msg),
                        new MsgProducerCallback(System.currentTimeMillis(), key, msg));
            }
        }
    }

    /**
     * 消息发送后的回调函数
     */
    class MsgProducerCallback implements Callback {

        private final long startTime;
        private final String key;
        private final String msg;

        public MsgProducerCallback(long startTime, String key, String msg) {
            this.startTime = startTime;
            this.key = key;
            this.msg = msg;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (recordMetadata != null) {
                System.out.println(msg + " be sended to partition no : " + recordMetadata.partition());
            }
        }
    }

    public static void main(String args[]) {
        new MsgProducer("my-replicated-topic",true).start();//开始发送消息
    }
}
时间: 2024-12-05 19:11:50

kafka 自定义分区器的相关文章

spark自定义分区器实现

在spark中,框架默认使用的事hashPartitioner分区器进行对rdd分区,但是实际生产中,往往使用spark自带的分区器会产生数据倾斜等原因,这个时候就需要我们自定义分区,按照我们指定的字段进行分区.具体的流程步骤如下: 1.创建一个自定义的分区类,并继承Partitioner,注意这个partitioner是spark的partitioner 2.重写partitioner中的方法 override def numPartitions: Int = ??? override def

spark自定义分区器

1.spark中默认的分区器: Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数.RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数. 只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None 每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的2. 参考博客:https://www.jianshu.

Kafka 分区分配计算(分区器 Partitions )

KafkaProducer在调用send方法发送消息至broker的过程中,首先是经过拦截器Inteceptors处理,然后是经过序列化Serializer处理,之后就到了Partitions阶段,即分区分配计算阶段.在某些应用场景下,业务逻辑需要控制每条消息落到合适的分区中,有些情形下则只要根据默认的分配规则即可.在KafkaProducer计算分配时,首先根据的是ProducerRecord中的partition字段指定的序号计算分区.读者有可能刚睡醒,看到这个ProducerRecord似

Apache Flink流分区器剖析

这篇文章介绍Flink的分区器,在流进行转换操作后,Flink通过分区器来精确得控制数据流向. StreamPartitioner StreamPartitioner是Flink流分区器的基类,它只定义了一个抽象方法: public abstract StreamPartitioner<T> copy(); 但这个方法并不是各个分区器之间互相区别的地方,定义不同的分区器的核心在于--各个分区器需要实现channel选择的接口方法: int[] selectChannels(T record,

大数据技术之_10_Kafka学习_Kafka概述+Kafka集群部署+Kafka工作流程分析+Kafka API实战+Kafka Producer拦截器+Kafka Streams

第1章 Kafka概述1.1 消息队列1.2 为什么需要消息队列1.3 什么是Kafka1.4 Kafka架构第2章 Kafka集群部署2.1 环境准备2.1.1 集群规划2.1.2 jar包下载2.2 Kafka集群部署2.3 Kafka命令行操作第3章 Kafka工作流程分析3.1 Kafka 生产过程分析3.1.1 写入方式3.1.2 分区(Partition)3.1.3 副本(Replication)3.1.4 写入流程3.2 Broker 保存消息3.2.1 存储方式3.2.2 存储策

SPARK之分区器

Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数.RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数 只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None 每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的 原文地址:https://www.cnblogs.com/xiangyuguan/p/1

五 、 Kafka producer 拦截器(interceptor) 和 六 、Kafka Streaming案例

五 Kafka producer 拦截器(interceptor) 5.1 拦截器原理 Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定 制化控制逻辑. 对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会 对消息做一些定制化需求,比如修改消息等.同时,producer 允许用户指定多个 interceptor 按序作用于同一条消息从而形成一个拦截链(in

Kafka 自定义指定消息partition策略规则及DefaultPartitioner源码分析

Kafka 自定义指定消息partition策略规则及DefaultPartitioner源码分析 一.概述 kafka默认使用DefaultPartitioner类作为默认的partition策略规则,具体默认设置是在ProducerConfig类中(如下图) 二.DefaultPartitioner.class 源码分析 1.类关系图 2.源码分析 public class DefaultPartitioner implements Partitioner { //缓存map key->to

使用python创建自定义分区的topic

通常需要创建自定义分区的topic 可以使用以下方法创建名称为test,12个分区3份副本的topic from kafka.admin import KafkaAdminClient, NewTopic c = KafkaAdminClient(bootstrap_servers="localhost:9092") topic_list = [] topic_list.append(NewTopic(name="test", num_partitions=12,