每一个record 都有一个key
其作用在于
1 作为一个record的元数据
2 用于分区,以便确定record进入到哪个,那么具体代码是如何实现的呢?我们查看下源码
partition
如果所有的message是同一个key,将会被放入同一个partition
kafka发送一个消息的流程
1 判断有没有拦截器,如果有拦截器就会执行拦截器的send
2 拿到cluster
3 对key和value序列化
4 计算此消息发送到哪个partition
partition计算过程具体如下:
第二步:找到此接口的默认实现类,我们点进去看看
第三步: 我们查看实现类
了解了源码以后,我们自己来实现一个partition
下面我们进行测试,在本机启动两个kafka,然后发送数据,测试数据会在哪个分区
第一步:我们没有使用spring,那么框架并不会把我们自定义的partition来告知kafka,那么我们如何把自己写的partition加进去呢?我们来看源码
猜测配置信息应该在kafka的配置类中,我们找到 ProducerConfig类 搜索partition开头的信息
第二步: 创建topic
第三步:编写Sender
第四步:测试
当我们把key设置为LOGOFF时候,控制台打印如下
当我们把key设置LOGIN的时候,控制台打印如下
当我们设置非法key的时候
---恢复内容结束---
原文地址:https://www.cnblogs.com/wangpipi/p/11103951.html
时间: 2024-10-08 03:02:35