rocketmq发送消息的期间的broker选择

DefaultMQProducerImpl文件中有一个sendDefaultImpl,发送消息的时候就是从这里走的,路由信息怎么拿的 这里就不展开讲了。在这个方法里面,同步模式下,消息一次没有发送成功就会按照重试次数继续走selectOneMessageQueue逻辑进行重试。
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;

这里的selectOneMessageQueue的其实内部调用MQFaultStrategy内部对象的selectOneMessageQueue:

我个人看来,这个估算功能倒不是特别重要,所以mq默认是不使用这个逻辑,不过这个不妨碍我们研究下。下面是MQFaultStrategy的selectOneMessageQueue

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

  

如果sendLatencyFaultEnable是false,默认也是false。那么每次所有队列号+1取出消息队列(消息队列说白了就是每个broker单位有一个队列,队列长度由每个broker配置指定)里面的消息,同时剔除掉上次失败的brokername。

这里有一个问题是,如果只有两个broker那么可以解决大部分问题,但是如果broker很多,那么我们希望mq有一个时间维度上、可以估算出来一个broker什么时候可用。尤其对于rocketmq来说,因为broker发生变化的时候,producer不是第一时间被通知,而是异步轮训得到的。另外nameserver跟broker之间也是异步轮询探活。

打开sendLatencyFaultEnable的话,也就是在发送消息前,估算下这个broker是否可用的,如果是可用的那么直接返回。上面代码:

if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))

我感觉应该是写错了,应该是mq.getBrokerName().notEquals(lastBrokerName)

这里有一个调用latencyFaultTolerance.isAvailable来判断broker是否可用,这个怎么来的呢?

实际上,在sendDefaultImpl的时候,无论消息是否发送成功与否,都会调用producer内部MQFaultStrategy的updateFaultItem,在这里会去更新latencyFaultTolerance

下面是MQFaultStrategy一些重要成员和重要方法:

    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
    }

  

在sendDefaultImpl的发送消息期间,只有发送成,这个isolation才是false,这个时候通过computeNotAvailableDuration拿到的duration一般就是0,否则发送消息消耗时间越大,从latencyMax拿到的序列号越大,从notAvailableDuration拿到的duration也就越大。

如果有故障,isolation是true,那么认为这个broker不可用时间是180000L,也就是3分钟

继续进入LatencyFaultToleranceImpl的updateFaultItem:

    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }

  这里构造一个faultitem,顾名思义就是错误的、有问题的科目,name就是broker-name,currentLatency就是上次发送消息从开始到结束的消耗时间,starttimestamp就是估算的下次可用的时间戳。

继续看FaultItem各个重要方法:

      @Override
        public int compareTo(final FaultItem other) {
            if (this.isAvailable() != other.isAvailable()) {
                if (this.isAvailable())
                    return -1;

                if (other.isAvailable())
                    return 1;
            }

            if (this.currentLatency < other.currentLatency)
                return -1;
            else if (this.currentLatency > other.currentLatency) {
                return 1;
            }

            if (this.startTimestamp < other.startTimestamp)
                return -1;
            else if (this.startTimestamp > other.startTimestamp) {
                return 1;
            }

            return 0;
        }

        public boolean isAvailable() {
            return (System.currentTimeMillis() - startTimestamp) >= 0;
        }

  

再回到策略MQFaultStrategy的selectOneMessageQueue,结合上面的代码,如果找到一个可用broker那么直接返回。如果找不到调用pickOneAtLeast找一个差不多的返回

public String pickOneAtLeast() {
        final Enumeration<FaultItem> elements = this.faultItemTable.elements();
        List<FaultItem> tmpList = new LinkedList<FaultItem>();
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }

        if (!tmpList.isEmpty()) {
            Collections.shuffle(tmpList);

            Collections.sort(tmpList);

            final int half = tmpList.size() / 2;
            if (half <= 0) {
                return tmpList.get(0).getName();
            } else {
                final int i = this.whichItemWorst.getAndIncrement() % half;
                return tmpList.get(i).getName();
            }
        }

        return null;
    }

  faultiitem已经支持按照好坏排序,那么排好序后,从好的前半部分再进行随机选一个brokername

原文地址:https://www.cnblogs.com/notlate/p/11615878.html

时间: 2024-08-04 17:21:39

rocketmq发送消息的期间的broker选择的相关文章

rocketmq发送消息代码

DefaultMQProducer defaultMQProducer = new DefaultMQProducer(); defaultMQProducer.setProducerGroup(Constant.operationLogGroup); defaultMQProducer.setInstanceName(Constant.operationLogInstance); defaultMQProducer.setNamesrvAddr(Constant.rocketQueneAddr

rocketmq双主发送消息 SLAVE_NOT_AVAILABLE 状态

RocketMQ最佳实践之Producer 投递状态 发送消息时,将得到包含SendStatus的SendResult.首先,我们假设消息的isWaitStoreMsgOK = true(默认是true).如果不是,我们将总会得到SEND_OK,如果没有抛出异常.下面是关于每个状态的描述列表: FLUSH_DISK_TIMEOUT 如果 Broker 设置MessageStoreConfig的FlushDiskType=SYNC_FLUSH(默认是ASYNC_FLUSH),并且代理没有在Mess

解决rocketmq发送消息报错: service not available now, maybe disk full, CL: 0.87 CQ: 0.87 INDEX: 0.87, maybe your broker machine memory too small

1.问题出现 搭完mq单主单从集群之后,美滋滋想发一下message, 没想到碰到一个坑爹的问题: com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 14 DESC: service not available now, maybe disk full, CL: 0.87 CQ: 0.87 INDEX: 0.87, maybe your broker machine memory too small. 1 com.ali

发送消息,修改其他程序的下拉框的选择

发送消息,修改其他程序的下拉框的选择. 先说结论 public void SelectItem(int index) { COMBOBOXINFO cbi = new COMBOBOXINFO(); cbi.cbSize = System.Runtime.InteropServices.Marshal.SizeOf(cbi); if (User32.GetComboBoxInfo(Wnd, ref cbi)) { User32.SendMessage(Wnd, ComboBoxMsg.CB_SE

rocketmq简单消息发送

有以下3种方式发送RocketMQ消息 可靠同步发送 reliable synchronous 可靠异步发送 reliable asynchronous 单向发送 one-way transmission 可靠同步发送 主要运用在比较重要一点消息传递/通知等业务 public class SyncProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new

RocketMQ源码 — 九、 RocketMQ延时消息

上一节消息重试里面提到了重试的消息可以被延时消费,其实除此之外,用户发送的消息也可以指定延时时间(更准确的说是延时等级),然后在指定延时时间之后投递消息,然后被consumer消费.阿里云的ons还支持定时消息,而且延时消息是直接指定延时时间,其实阿里云的延时消息也是定时消息的另一种表述方式,都是通过设置消息被投递的时间来实现的,但是Apache RocketMQ在版本4.2.0中尚不支持指定时间的延时,只能通过配置延时等级和延时等级对应的时间来实现延时. 一个延时消息被发出到消费成功经历以下几

聊一聊顺序消息(RocketMQ顺序消息的实现机制)

当我们说顺序时,我们在说什么? 日常思维中,顺序大部分情况会和时间关联起来,即时间的先后表示事件的顺序关系. 比如事件A发生在下午3点一刻,而事件B发生在下午4点,那么我们认为事件A发生在事件B之前,他们的顺序关系为先A后B. 上面的例子之所以成立是因为他们有相同的参考系,即他们的时间是对应的同一个物理时钟的时间.如果A发生的时间是北京时间,而B依赖的时间是东京时间,那么先A后B的顺序关系还成立吗? 如果没有一个绝对的时间参考,那么A和B之间还有顺序吗,或者说怎么断定A和B的顺序? 显而易见的,

RocketMQ3.2.2生产者发送消息自动创建Topic队列数无法超过4个

问题现象 RocketMQ3.2.2版本,测试时尝试发送消息时自动创建Topic,设置了队列数量为8: producer.setDefaultTopicQueueNums(8); 同时设置broker服务器的配置文件broker.properties: defaultTopicQueueNums=16 但实际创建后从控制台及后台打印代码观察到该Topic只创建了4个队列,反复重试确认发送消息时自动创建Topic,最大创建4个队列. 查找原因 服务端与客户端配置对比 阅读源码,在TopicConf

窗口发送消息参数详解

//    窗口.发送消息    函数功能: 将指定的消息发送到一个窗口,同win32 api 里面的SendMessage等同的效果 中文函数原型: 发送消息(hwnd,msg,wparam,iparam)      英文函数原型: sendmessage(hwnd,msg,wparam,iparam) 参数: hwnd: 窗口句柄 值,可以通过,找到窗口.顶层窗口句柄,等获取句柄的函数得到msg:指定被发送的消息wparam:指定附加的消息特定信息. iparam:指定附加的消息特定信息.举