RocketMQ: How Message Retry Works

Background

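In RocketMQ, a Producer may fail to send a message, for example because of network communication problems or an abnormal Broker. RocketMQ retries failed sends by default. Building on the previous article (https://blog.5iwork.com/post/efbf9e99.html/), this post analyzes and explains how message retry works.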

RocketMQ provides several retry settings that users can customize:

  1. retryTimesWhenSendFailed

    Number of retries for synchronous sends. Defaults to 2, so at most n + 1 attempts are made.

  2. retryTimesWhenSendAsyncFailed

    Number of retries for asynchronous sends. Defaults to 2, so at most n + 1 attempts are made.

  3. retryAnotherBrokerWhenNotStoreOK

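    Defaults to false. It matters when the broker uses synchronous flushing or synchronous replication: it decides whether to retry (on another broker) when the send succeeded but the flush or replication status is abnormal.

    When a message is sent, SendResult carries one of four statuses: sent successfully, master flush timeout, slave flush timeout, and slave unavailable. The last three (FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE) mean the message might be lost, and this setting decides whether they should trigger a retry.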

    ```java
    public enum SendStatus {
        SEND_OK,
        FLUSH_DISK_TIMEOUT,
        FLUSH_SLAVE_TIMEOUT,
        SLAVE_NOT_AVAILABLE,
    }
    ```
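The decision controlled by retryAnotherBrokerWhenNotStoreOK can be isolated as a small predicate. This is a minimal sketch, not RocketMQ code: SendStatus is re-declared locally so the snippet compiles without the RocketMQ client, and NotStoreOkPolicy and shouldRetry are hypothetical names. It mirrors the check made in the SYNC branch of the synchronous send loop.

```java
// Local copy of the SendStatus values so the sketch compiles without the RocketMQ client.
enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE
}

// Hypothetical helper that mirrors the SYNC branch of sendDefaultImpl.
final class NotStoreOkPolicy {
    private final boolean retryAnotherBrokerWhenNotStoreOK;

    NotStoreOkPolicy(boolean retryAnotherBrokerWhenNotStoreOK) {
        this.retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;
    }

    /** True if a synchronous send would make another attempt after receiving this status. */
    boolean shouldRetry(SendStatus status) {
        // SEND_OK always ends the loop; the other three statuses retry only when the flag is enabled.
        return status != SendStatus.SEND_OK && retryAnotherBrokerWhenNotStoreOK;
    }

    public static void main(String[] args) {
        NotStoreOkPolicy byDefault = new NotStoreOkPolicy(false); // the flag defaults to false
        NotStoreOkPolicy enabled = new NotStoreOkPolicy(true);
        for (SendStatus s : SendStatus.values()) {
            System.out.println(s + ": default=" + byDefault.shouldRetry(s) + ", enabled=" + enabled.shouldRetry(s));
        }
    }
}
```

With the default value, a non-SEND_OK result is simply returned, and the caller can inspect SendResult.getSendStatus() to decide whether to resend.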

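As mentioned in the previous article, RocketMQ supports three send modes: SYNC (synchronous), ASYNC (asynchronous), and ONEWAY (one-way, unreliable). Retry applies only to synchronous and asynchronous messages; one-way messages are never retried. Because synchronous and asynchronous sending are implemented differently, their retry logic differs as well, so this article covers the two retry mechanisms separately.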
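The number of attempts per send mode can be condensed into a few lines. This is a sketch assuming the default settings listed above; SendMode, AttemptBudget, loopIterations, and maxAttempts are illustrative names, not RocketMQ APIs. loopIterations copies the timesTotal expression used by sendDefaultImpl in the synchronous section, while maxAttempts also counts the callback-driven retries that ASYNC performs later.

```java
// Illustrative mirror of CommunicationMode; not the RocketMQ enum itself.
enum SendMode { SYNC, ASYNC, ONEWAY }

final class AttemptBudget {
    /** Same expression as timesTotal in sendDefaultImpl: only SYNC loops more than once. */
    static int loopIterations(SendMode mode, int retryTimesWhenSendFailed) {
        return mode == SendMode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    /** Upper bound on attempts, including the retries ASYNC performs from its response callback. */
    static int maxAttempts(SendMode mode, int retryTimesWhenSendFailed, int retryTimesWhenSendAsyncFailed) {
        switch (mode) {
            case SYNC:
                return 1 + retryTimesWhenSendFailed;
            case ASYNC:
                return 1 + retryTimesWhenSendAsyncFailed;
            default:
                return 1; // ONEWAY is never retried
        }
    }

    public static void main(String[] args) {
        for (SendMode m : SendMode.values()) {
            System.out.println(m + ": loop=" + loopIterations(m, 2) + ", maxAttempts=" + maxAttempts(m, 2, 2));
        }
    }
}
```

The gap between the two numbers for ASYNC is the point: the send loop runs once, and the remaining attempts come from the asynchronous retry path.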

Synchronous Message Retry

For synchronous messages, the total number of attempts is the configured retry count plus one, and the attempts are driven by a for loop.

This section covers only the retry-related logic; for the rest, see the previous article on message sending.

The corresponding source code (with unrelated logic trimmed) is as follows:

```java
private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        boolean callTimeout = false;
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // Maximum number of attempts = configured retry count + 1, iterated with a for loop.
        // ONEWAY sends also pass through here; since they are not SYNC, they run only once.
        // ASYNC sends also run only once here; their retries happen later, in the async response handling.
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        // Iterate up to timesTotal times, stopping early once the overall timeout is exceeded
        for (; times < timesTotal; times++) {
            // The broker of the previous attempt is passed in so that a retry can prefer a different broker
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    // Time spent so far; if it exceeds the timeout, leave the loop and throw a timeout exception afterwards
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // Send with the remaining budget (timeout - costTime) as this attempt's timeout.
                    // When sendKernelImpl finds the budget used up, it throws RemotingTooMuchRequestException or
                    // RemotingTimeoutException. Both extend RemotingException, so they land in the catch below;
                    // the costTime check at the top of the next iteration then normally ends the loop.
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            // SYNC only: a status other than SEND_OK triggers another attempt
                            // when retryAnotherBrokerWhenNotStoreOK is true
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    // RemotingException (timeout subclasses included): move on to the next attempt
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    exception = e;
                    continue;
                } catch (MQClientException e) {
                    // MQClientException: move on to the next attempt
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    exception = e;
                    continue;
                } catch (MQBrokerException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    exception = e;
                    // MQBrokerException: retry only for specific ResponseCodes
                    switch (e.getResponseCode()) {
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }
                            throw e;
                    }
                } catch (InterruptedException e) {
                    // InterruptedException is never retried; it is rethrown to the client
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    throw e;
                }
            } else {
                break;
            }
        }
        if (sendResult != null) {
            return sendResult;
        }
        // Summary of all attempts, used as the exception message
        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
            times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent));
        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
        MQClientException mqClientException = new MQClientException(info, exception);
        // The overall timeout was exceeded: throw the timeout exception directly
        if (callTimeout) {
            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
        }
        // Attach a response code that describes the last failure
        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }
        throw mqClientException;
    }
    validateNameServerSetting();
    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
```
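On every iteration the loop passes the broker of the previous attempt (lastBrokerName) to selectOneMessageQueue, so a retry prefers a queue on a different broker. The sketch below imitates that idea under two assumptions: the latency-fault strategy (sendLatencyFaultEnable) is off, which is the default, and queues are picked round-robin from a shared counter. QueueRef and QueueSelector are hypothetical stand-ins for MessageQueue and TopicPublishInfo, not their real implementations.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for MessageQueue: a broker name plus a queue id.
final class QueueRef {
    final String brokerName;
    final int queueId;

    QueueRef(String brokerName, int queueId) {
        this.brokerName = brokerName;
        this.queueId = queueId;
    }

    @Override
    public String toString() {
        return brokerName + "#" + queueId;
    }
}

// Hypothetical stand-in for the per-attempt queue selection.
final class QueueSelector {
    private final List<QueueRef> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger();

    QueueSelector(List<QueueRef> queues) {
        this.queues = queues;
    }

    /** Plain round-robin over all queues. */
    QueueRef select() {
        int pos = Math.floorMod(sendWhichQueue.getAndIncrement(), queues.size());
        return queues.get(pos);
    }

    /** Round-robin that skips queues on lastBrokerName, falling back to plain round-robin if all match. */
    QueueRef select(String lastBrokerName) {
        if (lastBrokerName == null) {
            return select(); // first attempt: nothing to avoid
        }
        for (int i = 0; i < queues.size(); i++) {
            QueueRef q = select();
            if (!q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        return select(); // only one broker available: reuse it
    }

    public static void main(String[] args) {
        QueueSelector selector = new QueueSelector(Arrays.asList(
                new QueueRef("broker-a", 0), new QueueRef("broker-a", 1),
                new QueueRef("broker-b", 0), new QueueRef("broker-b", 1)));
        String lastBrokerName = null;
        for (int attempt = 0; attempt < 3; attempt++) {
            QueueRef q = selector.select(lastBrokerName);
            System.out.println("attempt " + attempt + " -> " + q);
            lastBrokerName = q.brokerName; // pretend the attempt failed
        }
    }
}
```

With two brokers the attempts alternate (broker-a#0, broker-b#0, broker-a#0); with a single broker the fallback simply reuses it.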

Summary
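RocketMQ changed the meaning of the send timeout between older and newer versions. In older versions the timeout applied to each individual send attempt: with a timeout of 3 s and 2 retries (2 + 1 = 3 attempts in total), a send could take up to 3 × 3 = 9 s. In newer versions the timeout is the total budget for the whole send, retries included, so it is reasonable to configure a somewhat larger value.
For this reason, costTime is computed before each significant step, and the timeout passed to the next step is the original timeout minus costTime. If a step finds timeout < costTime, it throws RemotingTooMuchRequestException or RemotingTimeoutException. Both are subclasses of RemotingException, so in the synchronous loop they are caught like any other RemotingException; however, because the budget is already used up, the costTime check at the start of the next iteration normally breaks out of the loop. The client then receives a timeout exception (or, if that was the last attempt, an MQClientException wrapping it) instead of another retry.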
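The difference between the two timeout models can be checked with a small simulation. This is a sketch, not RocketMQ code: TimeoutBudget and simulate are hypothetical names, time is simulated rather than measured, and each entry of attemptMillis is the assumed wall-clock time of one failed attempt (the array length plays the role of timesTotal). The loop applies the same rule as sendDefaultImpl: before each attempt it stops if timeout < costTime, otherwise it hands the attempt timeout - costTime.

```java
import java.util.ArrayList;
import java.util.List;

final class TimeoutBudget {
    /** Budgets handed to each started attempt, and whether the loop stopped on the deadline. */
    static final class Outcome {
        final List<Long> budgets = new ArrayList<>();
        boolean callTimeout;
    }

    /** Simulates the sync retry loop when attempt i fails after attemptMillis[i] milliseconds. */
    static Outcome simulate(long timeout, long[] attemptMillis) {
        Outcome out = new Outcome();
        long costTime = 0; // simulated time since the first attempt started
        for (long spent : attemptMillis) {
            if (timeout < costTime) { // same check as sendDefaultImpl
                out.callTimeout = true;
                break;
            }
            out.budgets.add(timeout - costTime); // remaining budget passed down as the timeout
            costTime += spent;                   // this attempt fails after `spent` ms
        }
        return out;
    }

    /** Worst case under the old per-attempt model: every attempt may wait the full timeout. */
    static long oldWorstCase(long timeout, int attempts) {
        return timeout * attempts;
    }

    public static void main(String[] args) {
        System.out.println(oldWorstCase(3000, 3));
        Outcome o = simulate(3000, new long[] {2000, 1100, 500});
        System.out.println(o.budgets + " callTimeout=" + o.callTimeout);
    }
}
```

In the example run, the second attempt only receives a 1000 ms budget; assuming it overruns slightly (1100 ms, like a timeout that fires a little late), the third attempt never starts, so simulate returns budgets [3000, 1000] with callTimeout = true, while the old model's worst case is 9000 ms.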
For synchronous messages, a retry is triggered in the following cases:

  1. RemotingException
  2. MQClientException
  3. MQBrokerException
    For broker exceptions the ResponseCode is also checked: only the following codes trigger a retry, and all other codes do not:
    TOPIC_NOT_EXIST, SERVICE_NOT_AVAILABLE, SYSTEM_ERROR, NO_PERMISSION, NO_BUYER_ID, NOT_IN_CURRENT_UNIT
    Note that SYSTEM_BUSY ("broker busy"), a broker exception code seen quite often in practice, is not retried. If the business requires it, add manual compensation suited to the scenario.
  4. The sendStatus in sendResult is not SEND_OK and isRetryAnotherBrokerWhenNotStoreOK is true
    This covers the master flush timeout, slave flush timeout, and slave unavailable statuses.
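The trigger list above can be condensed into one decision function. This is a sketch only: SyncFailure and BrokerCode are hypothetical enums standing in for the exception types and ResponseCode constants (only the codes named in this article are listed, plus OTHER for everything else), and SyncRetryRules.retries is an illustrative name. It follows the code quoted above; other client versions may use a different code list.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical labels for what the sync loop can observe after one attempt.
enum SyncFailure {
    REMOTING_EXCEPTION, CLIENT_EXCEPTION, BROKER_EXCEPTION, INTERRUPTED,
    NOT_STORE_OK // a result whose status is not SEND_OK
}

// Hypothetical subset of broker response codes discussed in the article.
enum BrokerCode {
    TOPIC_NOT_EXIST, SERVICE_NOT_AVAILABLE, SYSTEM_ERROR, NO_PERMISSION,
    NO_BUYER_ID, NOT_IN_CURRENT_UNIT, SYSTEM_BUSY, OTHER
}

final class SyncRetryRules {
    private static final Set<BrokerCode> RETRYABLE_CODES = EnumSet.of(
            BrokerCode.TOPIC_NOT_EXIST, BrokerCode.SERVICE_NOT_AVAILABLE, BrokerCode.SYSTEM_ERROR,
            BrokerCode.NO_PERMISSION, BrokerCode.NO_BUYER_ID, BrokerCode.NOT_IN_CURRENT_UNIT);

    /** Whether the sync loop moves on to another attempt (if attempts and time budget remain). */
    static boolean retries(SyncFailure failure, BrokerCode code, boolean retryAnotherBrokerWhenNotStoreOK) {
        switch (failure) {
            case REMOTING_EXCEPTION: // includes timeout subclasses; the next deadline check usually ends the loop
            case CLIENT_EXCEPTION:
                return true;
            case BROKER_EXCEPTION:
                return RETRYABLE_CODES.contains(code); // SYSTEM_BUSY is not in the list
            case NOT_STORE_OK:
                return retryAnotherBrokerWhenNotStoreOK;
            default:
                return false; // INTERRUPTED is rethrown immediately
        }
    }

    public static void main(String[] args) {
        System.out.println(retries(SyncFailure.BROKER_EXCEPTION, BrokerCode.SYSTEM_BUSY, false));
        System.out.println(retries(SyncFailure.BROKER_EXCEPTION, BrokerCode.SYSTEM_ERROR, false));
    }
}
```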

Asynchronous Message Retry

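For asynchronous messages, the send is submitted as a task to a thread pool. If the pool rejects the task, an exception is thrown directly to the client. When the task runs, it recomputes costTime and compares it with timeout; if the timeout has already passed, it calls the onException method of sendCallback directly instead of sending.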

```java
ExecutorService executor = this.getAsyncSenderExecutor();
try {
    executor.submit(new Runnable() {
        @Override
        public void run() {
            // When the task runs, compute the time spent so far and check whether the timeout has passed
            long costTime = System.currentTimeMillis() - beginStartTime;
            if (timeout > costTime) {
                try {
                    // Send logic, using the remaining budget as the timeout
                    sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
                } catch (Exception e) {
                    sendCallback.onException(e);
                }
            } else {
                // Timed out: call the callback's onException method directly
                sendCallback.onException(new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
            }
        }
    });
} catch (RejectedExecutionException e) {
    // The task was rejected: throw to the client directly
    throw new MQClientException("executor rejected ", e);
}
```
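The submission step can be reproduced with the JDK alone. This is a minimal sketch under stated assumptions: AsyncSendCallback is a hypothetical stand-in for SendCallback, IllegalStateException and TimeoutException replace MQClientException and RemotingTooMuchRequestException, and AsyncSubmitter.submit is an illustrative name. For brevity the send is modeled as a function that returns its result; in RocketMQ the result arrives later through the network callback. It keeps the two behaviors described above: a rejected task fails the caller immediately, and a task that starts after the deadline reports through onException instead of sending.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.LongFunction;

// Hypothetical stand-in for RocketMQ's SendCallback.
interface AsyncSendCallback {
    void onSuccess(String result);
    void onException(Throwable e);
}

final class AsyncSubmitter {
    /** Submits one async send; `send` receives the remaining timeout and returns a result or throws. */
    static void submit(Executor executor, long beginStartTime, long timeout,
                       LongFunction<String> send, AsyncSendCallback callback) {
        try {
            executor.execute(() -> {
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeout > costTime) {
                    try {
                        callback.onSuccess(send.apply(timeout - costTime));
                    } catch (Exception e) {
                        callback.onException(e);
                    }
                } else {
                    // The task started after the deadline: report instead of sending.
                    callback.onException(new TimeoutException("async send call timeout"));
                }
            });
        } catch (RejectedExecutionException e) {
            // Surfaces synchronously to the caller, like MQClientException("executor rejected ").
            throw new IllegalStateException("executor rejected", e);
        }
    }

    public static void main(String[] args) {
        AsyncSendCallback print = new AsyncSendCallback() {
            public void onSuccess(String r) { System.out.println("ok: " + r); }
            public void onException(Throwable e) { System.out.println("failed: " + e); }
        };
        Executor direct = Runnable::run; // runs the task on the calling thread
        submit(direct, System.currentTimeMillis(), 3000, remaining -> "sent", print);
        submit(direct, System.currentTimeMillis() - 5000, 3000, remaining -> "sent", print);
    }
}
```

A direct executor keeps the example deterministic; with a real thread pool the only change is that the callback fires on a pool thread.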

The send call in the code above, sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);, enters the same method used for synchronous sends, described earlier. Because the mode is ASYNC, the line int timesTotal = communicationMode == CommunicationMode.SYSNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; evaluates timesTotal to 1, so the for loop runs only once; the actual retries happen later, in sendMessageAsync.
When sendKernelImpl performs an asynchronous send, it runs the following logic:

```java
SendResult sendResult = null;
switch (communicationMode) {
    case ASYNC: // asynchronous send
        Message tmpMessage = msg;
        // As in the sync path, compute the time spent so far and check for timeout
        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeAsync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        // retryTimesWhenSendAsyncFailed is passed down here and later drives the async retries
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            tmpMessage,
            requestHeader,
            timeout - costTimeAsync,
            communicationMode,
            sendCallback,
            topicPublishInfo,
            this.mQClientFactory,
            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
            context,
            this);
        break;
    // The ONEWAY and SYNC branches are omitted
    default:
        assert false;
        break;
}
```

Inside the sendMessage method above, the ASYNC path reaches the following logic:

```java
private void sendMessageAsync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final AtomicInteger times,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
    // invokeAsync eventually writes the request with channel.writeAndFlush(request). Before that it also
    // checks the timeout and, if it has passed, throws RemotingTooMuchRequestException or RemotingTimeoutException.
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        // Result callback: invoked when the request completes, fails, or times out
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();
            // No SendCallback: only record the result and return
            if (null == sendCallback && response != null) {
                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    if (context != null && sendResult != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }
                } catch (Throwable e) {
                    // ignored
                }
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                return;
            }
            if (response != null) {
                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    assert sendResult != null;
                    if (context != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }
                    try {
                        sendCallback.onSuccess(sendResult);
                    } catch (Throwable e) {
                        // exceptions thrown by the user callback are ignored
                    }
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                } catch (Exception e) {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    // processSendResponse failed (e.g. MQBrokerException for an error response code):
                    // needRetry is false, so this failure is not retried
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, e, context, false, producer);
                }
            } else {
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                if (!responseFuture.isSendRequestOK()) {
                    // The request could not be sent (isSendRequestOK() is false): retry
                    MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else if (responseFuture.isTimeout()) {
                    // The request was sent but waiting for the response timed out: retry
                    MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else {
                    // No response for any other reason: retry as well
                    MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer);
                }
            }
        }
    });
}
```
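The branches of operationComplete reduce to a mapping from what the callback observes to the needRetry flag handed to onExceptionImpl. A sketch with hypothetical names (CallbackState, AsyncRetryFlag), covering only the failure branches; RESPONSE_ERROR stands for a response that processSendResponse rejects, for example with an MQBrokerException.

```java
// Hypothetical classification of an async request that did not succeed.
enum CallbackState {
    RESPONSE_ERROR,      // a response arrived but processSendResponse threw (e.g. MQBrokerException)
    SEND_REQUEST_FAILED, // no response: the request itself could not be sent
    WAIT_TIMEOUT,        // no response: the request was sent but the reply timed out
    UNKNOWN              // no response for some other reason
}

final class AsyncRetryFlag {
    /** The needRetry argument that sendMessageAsync passes to onExceptionImpl for each state. */
    static boolean needRetry(CallbackState state) {
        // Only a broker error response is final; every "no response" case may be retried.
        return state != CallbackState.RESPONSE_ERROR;
    }

    public static void main(String[] args) {
        for (CallbackState s : CallbackState.values()) {
            System.out.println(s + " -> needRetry=" + needRetry(s));
        }
    }
}
```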

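Asynchronous retry is implemented differently from synchronous retry. The synchronous path iterates over the total number of attempts with a for loop and uses the timeout to decide whether to run the next iteration, whereas the asynchronous path retries by calling onExceptionImpl.
onExceptionImpl tracks the number of attempts made so far in the AtomicInteger curTimes parameter and uses the boolean needRetry parameter to decide whether a retry is allowed. A retry calls sendMessageAsync again, and any failure of that attempt leads back into onExceptionImpl, either directly (when sendMessageAsync throws) or from the response callback. This recursive re-entry continues until the attempt count exceeds the configured total or a non-retryable error occurs.
The logic is as follows: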

```java
private void onExceptionImpl(final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int timesTotal,
    final AtomicInteger curTimes,
    final Exception e,
    final SendMessageContext context,
    final boolean needRetry,
    final DefaultMQProducerImpl producer
) {
    // Count this failure; retry only if allowed and the count is still within timesTotal
    int tmp = curTimes.incrementAndGet();
    if (needRetry && tmp <= timesTotal) {
        String retryBrokerName = brokerName;//by default, it will send to the same broker
        if (topicPublishInfo != null) { //select one message queue accordingly, in order to determine which broker to send
            MessageQueue mqChosen = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
            retryBrokerName = mqChosen.getBrokerName();
        }
        String addr = instance.findBrokerAddressInPublish(retryBrokerName);
        try {
            request.setOpaque(RemotingCommand.createNewRequestId());
            // If this attempt fails, sendMessageAsync calls onExceptionImpl again
            sendMessageAsync(addr, retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, context, producer);
        } catch (InterruptedException e1) {
            // InterruptedException: no retry
            onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer);
        } catch (RemotingConnectException e1) {
            // RemotingConnectException: may be retried
            producer.updateFaultItem(brokerName, 3000, true);
            onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, true, producer);
        } catch (RemotingTooMuchRequestException e1) {
            // Time budget exhausted: RemotingTooMuchRequestException is not retried
            // (it must be caught before its superclass RemotingException)
            onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, false, producer);
        } catch (RemotingException e1) {
            producer.updateFaultItem(brokerName, 3000, true);
            // Other RemotingExceptions: may be retried
            onExceptionImpl(retryBrokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, context, true, producer);
        }
    } else {
        // No more retries: record the exception for the hooks and notify the callback
        if (context != null) {
            context.setException(e);
            context.getProducer().executeSendMessageHookAfter(context);
        }
        try {
            sendCallback.onException(e);
        } catch (Exception ignored) {
        }
    }
}
```
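The counting in onExceptionImpl can be tried out in isolation. In this sketch (all names hypothetical) a scripted transport replaces sendMessageAsync: each send consumes the next scripted result, and a failure re-enters onFailure with the shared AtomicInteger, the way onExceptionImpl is re-entered. The scripted transport is synchronous, so the re-entry is plain recursion; in RocketMQ it may instead come from the response callback. It shows that timesTotal = 2 (the default retryTimesWhenSendAsyncFailed) allows at most three attempts, and that a failure with needRetry = false stops immediately.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class AsyncRetrySimulation {
    /** One scripted result: success, or a failure carrying its needRetry flag. */
    static final class Result {
        final boolean success;
        final boolean needRetry;

        private Result(boolean success, boolean needRetry) {
            this.success = success;
            this.needRetry = needRetry;
        }

        static Result ok() { return new Result(true, false); }
        static Result fail(boolean needRetry) { return new Result(false, needRetry); }
    }

    private final Iterator<Result> script;
    private final int timesTotal;                             // plays the role of retryTimesWhenSendAsyncFailed
    private final AtomicInteger curTimes = new AtomicInteger(); // starts at 0, like the counter in sendMessage
    int attempts;                                             // number of simulated sends
    String finalOutcome;                                      // "success" or "onException"

    AsyncRetrySimulation(List<Result> script, int timesTotal) {
        this.script = script.iterator();
        this.timesTotal = timesTotal;
    }

    /** Stand-in for sendMessageAsync: performs one attempt. */
    void send() {
        attempts++;
        Result r = script.next();
        if (r.success) {
            finalOutcome = "success";
        } else {
            onFailure(r.needRetry);
        }
    }

    /** Mirrors the check at the top of onExceptionImpl. */
    void onFailure(boolean needRetry) {
        int tmp = curTimes.incrementAndGet();
        if (needRetry && tmp <= timesTotal) {
            send(); // retry (RocketMQ may pick another broker here)
        } else {
            finalOutcome = "onException";
        }
    }

    public static void main(String[] args) {
        AsyncRetrySimulation sim = new AsyncRetrySimulation(
                Arrays.asList(Result.fail(true), Result.fail(true), Result.fail(true), Result.ok()), 2);
        sim.send();
        System.out.println(sim.attempts + " attempts, " + sim.finalOutcome);
    }
}
```

The main method prints "3 attempts, onException": the fourth scripted result is never reached because the third failure pushes the counter past timesTotal.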

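Summary:
Although asynchronous and synchronous retries are implemented differently, they work the same way in essence. The one difference worth noting is that synchronous sends retry an MQBrokerException for certain ResponseCodes, whereas in the asynchronous path a broker exception is never retried, because it reaches onExceptionImpl with needRetry set to false.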


http://yoursite.com/post/c17e13da.html/
Author: Chase Wang
Posted on: September 19, 2022