RocketMQ源码解析-消息消费
1.消费者相关类
2.消费者的启动
3.消息的拉取
4.消费者的负载均衡
5.消息的消费
6.消费进度管理
看了很多遍的代码,还是决定动手把记录下来,梳理一下整体结构和实现细节,给大家一个参考,写的不好的地方请多多指教
RocketMQ中消息的消费分为2种方式,一种是pull模式,一种为push模式(基于pull模式实现),大部分的业务场合下业界用的比较多的是push模式,一句话你没有特殊需求就用push,push模式可以达到准实时的消息推送
那什么时候可以用pull模式呢?比如在高并发的场景下,消费端的性能可能会达到瓶颈的情况下,消费端可以采用pull模式,消费端根据自身消费情况去拉取,虽然push模式在消息拉取的过程中也会有流控(当前ProcessQueue队列有1000条消息还没有消费或者当前ProcessQueue中最大偏移量和最小偏移量超过2000将会触发流控,流控的策略就是延迟50ms再拉取消息),但是这个值在实际情况下,可能每台机器的性能都不太一样,会不好控制。
消费者相关类和API
- MQConsumer
- sendMessageBack(),If consuming failure,message will be send back to the broker,and delay consuming some time,如果消费端消费失败会把消息发送回broker端,稍后会重新消费
- fetchSubscribeMessageQueues(topic) ,Fetch message queues from consumer cache according to the topic,根据topic从消费端缓存中获取MessageQueue的Set集合
- MQPushConsumer
- registerMessageListener() 注册消息事件监听器,有并发模式和顺序模式俩种
- subscribe() 基于主题订阅消息,可以带上消息过滤表达式subExpression,TAG或者SQL92,类模式过滤会在5.0.0版本中去掉,不建议使用
- DefaultMQPushConsumer
- 先看下DefaultMQPushConsumer的重要参数,如果第一次看会不记得,后面的代码看多了慢慢会体会到这些参数的作用,类中持有DefaultMQPushConsumerImpl对象,DefaultMQPushConsumerImpl类实现了大部分的消息消费功能
//消费组
private String consumerGroup;
//消费端模式,默认为集群模式,还有一种广播模式
private MessageModel messageModel = MessageModel.CLUSTERING;
//根据消费进度从broker拉取不到消息时采取的策略
//1.CONSUME_FROM_LAST_OFFSET 最大偏移量开始
//2.CONSUME_FROM_FIRST_OFFSET 最小偏移量开始
//3.CONSUME_FROM_TIMESTAMP 从消费者启动时间戳开始
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
//集群模式下消息队列负载策略
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
//消息过滤关系
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
//消息消费监听器
private MessageListener messageListener;
//消息消费进度存储器
private OffsetStore offsetStore;
//消费线程最小线程数
private int consumeThreadMin = 20;
//消费线程最大线程数,因为消费线程池用的是无界队列,所以这个参数用不上,原因请参考线程池原理
private int consumeThreadMax = 64;
//动态调整线程数量的阀值
private long adjustThreadPoolNumsThreshold = 100000;
//并发消费时拉取消息前会有流控,会判断处理队列中最大偏移量和最小偏移量的跨度,不能大于2000
private int consumeConcurrentlyMaxSpan = 2000;
//push模式下任务拉取的时间间隔
private long pullInterval = 0;
//每次消费者实际消费的数量,不是从broker端拉取的数量
private int consumeMessageBatchMaxSize = 1;
//从broker端拉取的数量
private int pullBatchSize = 32;
//是否每次拉取之后都跟新订阅关系
private boolean postSubscriptionWhenPull = false;
//消息最大消费重试次数
private int maxReconsumeTimes = -1;
//延迟将该消息提交到消费者的线程池等待时间,默认1s
private long suspendCurrentQueueTimeMillis = 1000;
//消费超时时间,15分钟
private long consumeTimeout = 15;
消费者的启动
代码入口,DefaultMQPushConsumerImpl#start()方法
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
...
this.serviceState = ServiceState.START_FAILED;
//1.检查配置信息
this.checkConfig();
//2.加工订阅信息,将Map<String /* topic*/, String/* subExtentions*/>转换为Map<String,SubscriptionData>,同时,如果消息消费模式为集群模式,还需要为该消费组创建一个重试主题。
this.copySubscription();
进入copySubscription()方法:
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
//集群模式下给topic创建一个retry的topic,retry+comsumerGroup
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
···
原文地址:https://www.cnblogs.com/langtutengyixiao/p/10807497.html
时间: 2024-11-05 18:51:45