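Before sending a message, RocketMQ first picks a suitable MessageQueue from the topic's current publish information. This article covers MessageQueue selection and the latency-based fault tolerance ("fault latency") strategy applied while choosing a MessageQueue.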
MessageQueue Selection
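The MessageQueue selection logic lives in sendDefaultImpl, the method called when a message is sent. Before selecting a MessageQueue, the producer first obtains the publish information (TopicPublishInfo) for the message's topic: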
```java
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
```
The MessageQueue is then selected as follows (unrelated logic omitted):
```java
MessageQueue mq = null;
// only SYNC sends are retried here; ASYNC and ONEWAY get a single attempt
int timesTotal = communicationMode == CommunicationMode.SYNC
        ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed()
        : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
    // broker used by the previous attempt (null on the first attempt)
    String lastBrokerName = null == mq ? null : mq.getBrokerName();
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    if (mqSelected != null) {
        mq = mqSelected;
        brokersSent[times] = mq.getBrokerName();
    }
}
```
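Here selectOneMessageQueue takes two arguments. The first is topicPublishInfo, introduced above; the second is lastBrokerName, which lets a retry choose a different Broker. When the first send fails, the cause may be a Broker node failure, a "broker busy" error or something similar, so the retry logic tries another Broker to raise the chance that the send succeeds (a fault-tolerance mechanism).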
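To make the role of lastBrokerName concrete, here is a minimal sketch of the retry loop, assuming every attempt fails so that all retries run. The class RetryLoopSketch, its enum and the selector function are hypothetical stand-ins, not RocketMQ API; the selector plays the part of selectOneMessageQueue and receives the broker used by the previous attempt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical, simplified model of the retry loop in sendDefaultImpl.
// Every attempt is treated as a failure so all retries are exercised.
class RetryLoopSketch {
    enum CommunicationMode { SYNC, ASYNC, ONEWAY }

    // Only SYNC sends are retried; ASYNC and ONEWAY get a single attempt.
    static int timesTotal(CommunicationMode mode, int retryTimesWhenSendFailed) {
        return mode == CommunicationMode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    // selector: (attempt number, lastBrokerName) -> broker chosen for this attempt
    static List<String> run(CommunicationMode mode, int retries,
                            BiFunction<Integer, String, String> selector) {
        int total = timesTotal(mode, retries);
        List<String> brokersSent = new ArrayList<>();
        String lastBrokerName = null;   // null on the first attempt
        for (int times = 0; times < total; times++) {
            String selected = selector.apply(times, lastBrokerName);
            if (selected != null) {
                brokersSent.add(selected);
                lastBrokerName = selected;   // fed into the next selection
            }
        }
        return brokersSent;
    }
}
```

With a selector that simply switches away from the last broker, a SYNC send with two retries alternates broker-a, broker-b, broker-a, while an ASYNC send makes only one attempt.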
The selectOneMessageQueue logic is as follows:
```java
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            // Pass 1: round-robin over all queues, skipping brokers still in their unavailable window
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            // Pass 2: no suitable available broker, so fall back to the "least bad" one
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                // the broker has no writable queues for this topic, stop tracking it
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    // latency mechanism disabled: just try to avoid the broker used last time
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
```
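The first pass of the branch above can be modelled in isolation. This is a minimal sketch under stated assumptions: queues are reduced to a broker name plus queue id, broker availability is passed in as a set of unavailable broker names instead of being looked up in latencyFaultTolerance, the lastBrokerName check is left out, and the pickOneAtLeast fallback is represented only by returning null. FaultAwareSelectSketch and its members are hypothetical names.

```java
import java.util.List;
import java.util.Set;

// Simplified model of the first pass of the fault-aware selection.
class FaultAwareSelectSketch {
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + "#" + queueId;
        }
    }

    // Round-robin from startIndex; return the first queue whose broker is not
    // marked unavailable, or null when every broker is unavailable (the real
    // code then falls back to pickOneAtLeast(), which is not modelled here).
    static Queue firstAvailable(List<Queue> queues, int startIndex, Set<String> unavailable) {
        int index = startIndex;
        for (int i = 0; i < queues.size(); i++) {
            int pos = Math.abs(index++) % queues.size();
            if (pos < 0) pos = 0;   // guards the Math.abs(Integer.MIN_VALUE) case
            Queue q = queues.get(pos);
            if (!unavailable.contains(q.brokerName)) {
                return q;
            }
        }
        return null;
    }
}
```

With queues broker-a#0, broker-a#1, broker-b#0, broker-b#1 and broker-a marked unavailable, a scan starting at index 0 skips both broker-a queues and returns broker-b#0.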
Some thoughts on the check inside if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {:
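My understanding: as written, when lastBrokerName is set this condition only accepts queues on that same broker, i.e. the one the previous attempt just used. By analogy with the later selectOneMessageQueue(lastBrokerName) logic, it would make more sense to change if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) to if (null == lastBrokerName || !mq.getBrokerName().equals(lastBrokerName)). In later versions, however, this check has been removed altogether and the MessageQueue is returned directly.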
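My guess as to why the check was removed: this branch only runs when the latency mechanism is enabled, and it has already checked whether the broker is available (isAvailable). With that latency control in place, there is no need to compare the current broker with lastBroker again.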
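The difference between the original condition and the proposed one can be checked with a small sketch. It assumes every broker is available, so only the lastBrokerName test decides the outcome, and represents each queue only by its broker name. LastBrokerConditionSketch and its methods are hypothetical names used for illustration.

```java
import java.util.List;

// Compares the two candidate conditions for the first pass of the
// fault-aware selection. All brokers are assumed to be available.
class LastBrokerConditionSketch {
    // original condition: accept when lastBrokerName is null OR equal to it
    static String pickOriginal(List<String> brokerOfQueue, int startIndex, String lastBrokerName) {
        return pick(brokerOfQueue, startIndex, lastBrokerName, true);
    }

    // proposed condition: accept when lastBrokerName is null OR different from it
    static String pickProposed(List<String> brokerOfQueue, int startIndex, String lastBrokerName) {
        return pick(brokerOfQueue, startIndex, lastBrokerName, false);
    }

    private static String pick(List<String> brokers, int startIndex, String last, boolean sameBroker) {
        int index = startIndex;
        for (int i = 0; i < brokers.size(); i++) {
            int pos = Math.abs(index++) % brokers.size();
            if (pos < 0) pos = 0;
            String broker = brokers.get(pos);
            if (last == null || broker.equals(last) == sameBroker) {
                return broker;
            }
        }
        return null;   // the real code would move on to pickOneAtLeast()
    }
}
```

With queues on broker-a and broker-b and lastBrokerName = "broker-a", the original condition returns broker-a again, while the proposed one moves on to broker-b.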
Selecting a MessageQueue with selectOneMessageQueue(final String lastBrokerName):
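This method first checks whether lastBrokerName is null. lastBrokerName is mainly used during retries: later attempts try to avoid the Broker on which the previous send failed, which improves the send success rate. If every queue belongs to that Broker, the method falls back to plain round-robin selection.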
```java
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        int index = this.sendWhichQueue.getAndIncrement();
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            // skip queues on the broker used by the previous attempt
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}
```
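The fallback at the end matters for single-broker topics. The minimal sketch below mimics both selectOneMessageQueue overloads over a list of broker names, assuming sendWhichQueue can be modelled by an AtomicInteger with a caller-supplied starting value (RocketMQ uses its own index class). PublishInfoSketch and brokerAt are hypothetical names; the methods return queue positions instead of MessageQueue objects.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for the two selectOneMessageQueue overloads.
class PublishInfoSketch {
    private final List<String> brokerOfQueue;   // broker name of queue i
    private final AtomicInteger sendWhichQueue;

    PublishInfoSketch(List<String> brokerOfQueue, int startIndex) {
        this.brokerOfQueue = brokerOfQueue;
        this.sendWhichQueue = new AtomicInteger(startIndex);
    }

    // Plain round-robin; returns the chosen queue position.
    int selectOneMessageQueue() {
        int index = sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % brokerOfQueue.size();
        if (pos < 0) pos = 0;
        return pos;
    }

    // Round-robin that skips queues on lastBrokerName and falls back to plain
    // round-robin when every queue lives on that broker.
    int selectOneMessageQueue(String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        }
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < brokerOfQueue.size(); i++) {
            int pos = Math.abs(index++) % brokerOfQueue.size();
            if (pos < 0) pos = 0;
            if (!brokerOfQueue.get(pos).equals(lastBrokerName)) {
                return pos;
            }
        }
        return selectOneMessageQueue();
    }

    String brokerAt(int pos) {
        return brokerOfQueue.get(pos);
    }
}
```

With two brokers, a retry after broker-a lands on a broker-b queue; with only broker-a, the loop finds nothing and the fallback returns another broker-a queue, so the retry still goes to the same broker.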
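selectOneMessageQueue():
This method also applies the fault-tolerant load-spreading idea. The index comes from sendWhichQueue, which starts from a random integer rather than a fixed value; with a fixed starting value, the first selection would always land on the same Queue.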
```java
public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}
```
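Two details of this index are worth isolating: the random starting point, and the reason for the if (pos < 0) guard. Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE, so the remainder can be negative. The sketch below is a hypothetical counter class, not RocketMQ's own index implementation.

```java
import java.util.Random;

// Hypothetical round-robin counter illustrating the random start and the
// negative-position guard used in selectOneMessageQueue().
class RoundRobinIndexSketch {
    private int index;

    // random starting point, so different producers do not all start at queue 0
    RoundRobinIndexSketch(Random random) {
        this.index = random.nextInt();
    }

    // fixed starting point, handy for demonstrating overflow
    RoundRobinIndexSketch(int start) {
        this.index = start;
    }

    int getAndIncrement() {
        return index++;   // int arithmetic wraps from MAX_VALUE to MIN_VALUE
    }

    static int position(int index, int queueCount) {
        int pos = Math.abs(index) % queueCount;   // may be negative for MIN_VALUE
        if (pos < 0) pos = 0;
        return pos;
    }
}
```

For example, with 3 queues Math.abs(Integer.MIN_VALUE) % 3 evaluates to -2, and the guard maps it to position 0 instead of throwing IndexOutOfBoundsException.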
Fault Latency
In RocketMQ, fault latency is a protection mechanism for Brokers. It is controlled by three parameters:
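```java
// switch for the latency fault mechanism
private boolean sendLatencyFaultEnable = false;
// latency thresholds, in milliseconds
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
// unavailable duration for each threshold above, in milliseconds
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
```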
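sendLatencyFaultEnable: the on/off switch for the latency mechanism
Defaults to false.
latencyMax: latency levels
During sending, the latency of the current send is measured and used to find the latency interval (level) it falls into.
notAvailableDuration: unavailable-duration levels
This array corresponds one-to-one with latencyMax, so the current latency level gives the matching unavailable duration.
The configuration shows that RocketMQ treats the first two levels as reasonable send latencies, since their unavailable durations are both 0. In effect, a send latency below 550 ms never makes the Broker unavailable.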
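In the sending logic, the latency and the time from which a broker is available again are tracked by the FaultItem class. It holds three fields: name (the broker name), currentLatency (the current latency) and startTimestamp (the timestamp from which the Broker is available again), where startTimestamp = current timestamp + unavailable duration. The details follow.
Logic that maintains the FaultItem fields: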
```java
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}
```
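updateFaultItem takes a boolean parameter, isolation, that flags the broker for isolation. If it is true, the latency used to compute the unavailable duration is fixed at 30000 ms; otherwise the measured latency is used. With the default arrays, 30000 ms falls into the highest level, so isolation makes the Broker unavailable for 600000 ms (10 minutes), while the recorded currentLatency is still the measured value.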
computeNotAvailableDuration scans the levels from the highest down and returns the duration of the first level the latency reaches:
```java
private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }
    return 0;
}
```
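The lookup can be tried out on its own. The sketch below copies the default arrays quoted earlier into a hypothetical NotAvailableDurationSketch class and folds in the isolation flag from updateFaultItem; all values are milliseconds.

```java
// Stand-alone copy of the latency -> unavailable-duration lookup, with the
// isolation flag from updateFaultItem folded in. Values are in milliseconds.
class NotAvailableDurationSketch {
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the highest level down; return the duration of the first
    // level whose threshold the latency reaches, or 0 below the lowest level.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // isolation == true replaces the measured latency with 30000 ms
    static long durationFor(long currentLatency, boolean isolation) {
        return computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    }
}
```

Working through a few values: 40 ms and 120 ms both map to 0, 600 ms maps to 30000, 1500 ms maps to 60000, and any isolated send maps to 600000 regardless of its measured latency.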
The logic of latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration):
```java
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        // another thread may have inserted an item first; if so, update that one
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}
```
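A minimal model of this update path shows that each call simply overwrites the previous values. It assumes a hypothetical nowMillis parameter in place of System.currentTimeMillis() so the result is deterministic; FaultTableSketch and its nested FaultItem are illustrative, not the RocketMQ classes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Minimal model of updateFaultItem with an injected clock (nowMillis).
class FaultTableSketch {
    static final class FaultItem {
        final String name;
        volatile long currentLatency;
        volatile long startTimestamp;   // time from which the broker is available again

        FaultItem(String name) {
            this.name = name;
        }
    }

    final ConcurrentMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<>();

    void updateFaultItem(String name, long currentLatency, long notAvailableDuration, long nowMillis) {
        FaultItem old = faultItemTable.get(name);
        if (old == null) {
            FaultItem item = new FaultItem(name);
            item.currentLatency = currentLatency;
            item.startTimestamp = nowMillis + notAvailableDuration;
            // another thread may have inserted first; if so, update its entry instead
            old = faultItemTable.putIfAbsent(name, item);
            if (old != null) {
                old.currentLatency = currentLatency;
                old.startTimestamp = nowMillis + notAvailableDuration;
            }
        } else {
            // existing entry: the new values overwrite the old ones
            old.currentLatency = currentLatency;
            old.startTimestamp = nowMillis + notAvailableDuration;
        }
    }
}
```

Because the values are overwritten rather than merged, a slow send at t = 1000 ms that sets startTimestamp to 31000 is cancelled by a later fast send at t = 2000 ms with duration 0, which resets startTimestamp to 2000.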
Later, availability is decided by comparing the current timestamp with startTimestamp.
The implementation:
```java
// a broker without a FaultItem has never been penalized and counts as available
@Override
public boolean isAvailable(final String name) {
    final FaultItem faultItem = this.faultItemTable.get(name);
    if (faultItem != null) {
        return faultItem.isAvailable();
    }
    return true;
}

// FaultItem: available once the current time has reached startTimestamp
public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}
```
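The availability rule fits in a few lines. This sketch keeps only a broker-to-startTimestamp map and takes the current time as a hypothetical nowMillis argument; AvailabilitySketch and markUnavailable are illustrative names, not part of RocketMQ.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of the two isAvailable methods with an injected clock.
class AvailabilitySketch {
    final Map<String, Long> startTimestampByBroker = new HashMap<>();

    // record that the broker is unavailable for notAvailableDuration ms from nowMillis
    void markUnavailable(String broker, long nowMillis, long notAvailableDuration) {
        startTimestampByBroker.put(broker, nowMillis + notAvailableDuration);
    }

    boolean isAvailable(String broker, long nowMillis) {
        Long start = startTimestampByBroker.get(broker);
        if (start == null) {
            return true;   // never recorded, treated as available
        }
        return nowMillis - start >= 0;   // available once the window has passed
    }
}
```

A broker marked at t = 1000 ms with a 30000 ms penalty is rejected at t = 30999 and accepted again from t = 31000, while a broker that was never recorded is always available.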