RocketMQ——MessageQueue选择和故障延时机制

!!!

RocketMQ在发消息之前会先从当前订阅信息中选择出一个比较合适的MessageQueue,本文主要针对MessageQueue以及在选取MessageQueue时相关的故障延时策略展开说明

MessageQueue选择

MessageQueue选取相关逻辑时在消息发送是调用sendDefaultImpl方法逻辑中,在进行MessageQueue选择之前,会先获取当前的消息对应Topic的订阅信息(TopicPublishInfo),获取Topic订阅信息的逻辑如下:

1
2
3
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
// 该调用会先从 ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable 获取
// 如果获取不到,则从updateTopicRouteInfoFromNameServer中更新最新的订阅信息,前面文章都有介绍

获取MessageQueue相关逻辑如下(精简掉不相关逻辑):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
MessageQueue mq = null;
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 获取MessageQueue入口
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
// ...
}
}

在上述逻辑中,获取MessageQueue的时候,入参有两个,第一个是topicPublishInfo(上面有介绍),还有一个入参是lastBrokerName,该参数的作用是为了后续重试的时候,选择不同的Broker。因为当第一次消息发送失败的时候,可能是Broker节点异常或者Broker busy等异常引起的,为了提高消息发送的成功率,在消息发送重试逻辑中会选择其它的Broker来提升消息发送的成功率(容错机制)。

selectOneMessageQueue逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// sendLatencyFaultEnable 默认为false,后续会介绍到相关逻辑
if (this.sendLatencyFaultEnable) {
try {
// 如果设置了 sendLatencyFaultEnable = true 的情况
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 判断broker是否可用(延时机制),是通过在调用过程中设置的延时时间同当前时间来判断broker是否可用
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
// 下面一行逻辑在高版本中已经删除
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
// 如果没有选择到 MessageQueue,则选择一个可能认为不是最优解的Broker
/**
* 在选择 notBestBroker 逻辑中,是通过一个排序算法来实现的,根据startTimestamp 和 currentLatency 做排序而得到的
* 在该逻辑中进行排序之前做了 Collections.shuffle(tepList);操作,这个操作是无效的操作,在后面的高版本中已经移除
*/
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
// 判断broker是否存在可写队列,如果存在,参考后面的逻辑得到一个MessageQueue并返回
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
// 选择具体的MessageQueue
return tpInfo.selectOneMessageQueue(lastBrokerName);
}

针对if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { 判断逻辑的思考:

个人理解:参考后面 selectOneMessageQueue(lastBrokerName) 的逻辑,应该将 if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) 改成 if (null == lastBrokerName || !mq.getBrokerName().equals(lastBrokerName)) 会比较合理一点,但是这个判断逻辑在高版本中已经被删除了,直接返回了对应的 MessageQueue。

针对这个判断逻辑的删除,个人有如下猜想:因为在该逻辑中开启了延时机制控制,已经做了是否可用的判断(isAvailable),所以在这个地方已经做了延时机制控制的基础上不需要在对当前 broker 和 lastBroker 进行再次的判断。

选择MessageQueue逻辑 selectOneMessageQueue(final String lastBrokerName) :

在该逻辑中给,先会判断 lastBrokerName 是否为空,lastBrokerName 主要是用于重试的时候,在后面重试逻辑中尽量避开之前下发消息失败的那个Broker,提升消息发送的成功率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
// 首次进入该方法,会判断 lastBrokerName 是否为空
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
// 计算index,根据index 和 MessageQueueList 的大小取模,计算一个位点
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
// 根据计算的位点从MessageQueueList中获取到对应下标的MessageQueue
MessageQueue mq = this.messageQueueList.get(pos);
// 拿到获取到的 BrokerName, 同上一次选取的brokerName做比较,尽量选择不同Broker的MessageQueue(容错)
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
// 选出MessageQueue
return selectOneMessageQueue();
}
}

selectOneMessageQueue:

在该逻辑中,也运用到了容错负载思想,在进行调用的时候是通过随机一个整数来计算的,这样可以避免掉固定值在进行第一次选择的时候选择的都是同一个Queue的情况

1
2
3
4
5
6
7
8
9
10
public MessageQueue selectOneMessageQueue() {
// sendWhichQueue = new ThreadLocalIndex()
// ThreadLocalIndex 中是通过 ThreadLocal<Integer> 来保存一个随机的Integer,每次获取的时候,会做++操作,保证在选择的时候进行list的轮询
int index = this.sendWhichQueue.getAndIncrement(); // index = Math.abs(random.nextInt()); index < 0 ? 0 : index;
int pos = Math.abs(index) % this.messageQueueList.size();
// pos < 0 是防止 Math.abs(index) 操作时 index 随机到 Integer.MIN_VALUE
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}

故障延时

在RocketMQ中,故障延时是针对Broker的一种保护机制,会对应有三个参数,分别如下:

1
2
3
private boolean sendLatencyFaultEnable = false;
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
  1. sendLatencyFaultEnable:延时机制开关

    默认是false

  2. latencyMax:延时级别

    在消息发送的过程中,会计算出当前消息的延时,然后通过计算的延时后,找到对应的延时区间

  3. notAvailableDuration:不可用时长级别

    该参数和latencyMax是一一对应的,根据当前的延时级别可以找到对应的不可用时长

    从该参数配置可以看出,前面两个级别为Rocket认为消息发送比价合理的延时级别,因为对应配置的不可用时长都是0

在消息发送的逻辑中,是通过FaultItem 类来维护具体的延时以及broker对应的可用时间戳字段的,该类中维护了三个字段,分别是name(broker名称)、currentLatency(当前延时时长)、startTimestamp(Broker可用时间节点的时间戳) = 当前时间时间戳 + 不可用延时时长,具体逻辑如下:

维护FaultItem字段逻辑:

1
2
3
4
5
6
7
8
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
// 根据计算的延时,计算出具体的不可用时长
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
// 维护FaultItem
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}

在调用该逻辑的时候,有一个布尔类型的参数 isolation 字段,该参数是用于控制延时时长隔离的表示,如果设置的true,则延时时长固定为30000,否则按照实际的延时处理

computeNotAvailableDuration逻辑如下:

1
2
3
4
5
6
7
8
private long computeNotAvailableDuration(final long currentLatency) {
// 从后向前便利 latency 中的元素,如果当时的延时 > latency,选择当前latencyMax对应的不可用时长
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}

latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration) 逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
// 判断是否之前设置过
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
// 设置新对象,并赋值相关属性
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
// 并发控制
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
// 更新当前延时
old.setCurrentLatency(currentLatency);
// 设置 startTimestamp:当前时间戳 + notAvailableDuration
// 后续判断是否可用逻辑就是通过该字段去实现的
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}

后续在判断可用的时候,就是通过当前时间戳计算的 startTimestamp进行比较的

具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public boolean isAvailable(final String name) {
final FaultItem faultItem = this.faultItemTable.get(name);
if (faultItem != null) {
// 返回调用
return faultItem.isAvailable();
}
return true;
}

// isAvailable() 调用如下:
public boolean isAvailable() {
// 判断当前时间是否大于之前设置的 startTimestamp,大于表示可用
return (System.currentTimeMillis() - startTimestamp) >= 0;
}

RocketMQ——MessageQueue选择和故障延时机制
http://yoursite.com/post/ed300e58.html/
Author
Chase Wang
Posted on
September 20, 2022
Licensed under