com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [696094]ms, Topic: TopicTest, BrokersSent: [broker-b, null] See https://github.com/alibaba/RocketMQ/issues/50 for further details. at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:578) at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1031) at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1025) at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:95)
关键代码片段
final long maxTimeout = this.defaultMQProducer.getSendMsgTimeout() + 1000; final long beginTimestamp = System.currentTimeMillis(); long endTimestamp = beginTimestamp; TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; int timesTotal = 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed(); int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal && (endTimestamp - beginTimestamp) < maxTimeout; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); MessageQueue tmpmq = topicPublishInfo.selectOneMessageQueue(lastBrokerName); if (tmpmq != null) { mq = tmpmq; brokersSent[times] = mq.getBrokerName(); try { sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout); endTimestamp = System.currentTimeMillis(); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } catch ... //省略部分代码 } else { break; } } // end of for if (sendResult != null) { return sendResult; } String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", // times, // (System.currentTimeMillis() - beginTimestamp), // msg.getTopic(),// Arrays.toString(brokersSent)); info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); throw new MQClientException(info, exception); }
1. 循环几次发送几次
2. selectOneMessageQueue 返回与上一个broker不同名的broker
3. timesTotal 是brokersSent 数组
4. 某broker发送失败时,如果想要重试其他broker,需要把retryAnotherBrokerWhenNotStoreOK设置为true(默认为false)
5. 最大超时时间是在超时时间基础上增加1s(坑?)
时间: 2024-11-02 15:20:33