RocketMQ——消息重试原理分析
背景
RocketMQ中Producer在发送消息的时候,可能存在发送失败的情况(如网络通信、部分Broker异常等),RocketMQ默认支持了失败重试机制,本文基于上一篇文章 https://blog.5iwork.com/post/efbf9e99.html/ 展开对消息重试进行分析和说明
RocketMQ中对重试机制几个配置,可支持用户自定义配置:
retryTimesWhenSendFailed
同步发送重试次数,默认为2,最多可尝试 n + 1次
retryTimesWhenSendAsyncFailed
异步发送重试次数,默认为2,最多可尝试 n + 1 次
retryAnotherBrokerWhenNotStoreOK
默认为false,用于同步刷盘、异步复制场景,发送成功,复制状态异常时是否进行重试操作。
发送消息时间,SendResult有如下四种状态,分别发送成功、主刷盘超时、备刷盘超时、备机不可用,如果出现后面三种状态(FLUSH_DISK_TIMEOUT、FLUSH_SLAVE_TIMEOUT、SLAVE_NOT_AVAILABLE),它会认为可能会有消息丢失的可能,需不需要进行重试
1
2
3
4
5
6public enum SendStatus {
SEND_OK,
FLUSH_DISK_TIMEOUT,
FLUSH_SLAVE_TIMEOUT,
SLAVE_NOT_AVAILABLE,
}
上一篇文章中有说到,RocketMQ中对于消息发送模式支持有三种,分别是SYNC同步消息、ASYNC异步消息以及ONEWAY单向不可靠消息。而在RocketMQ中,重试机制只适用于同步消息和异步消息,不支持单向消息。在RocketMQ中,由于同步消息和异步消息的实现有差别,在进行消息重试的处理逻辑上也存在相应的差异,故本文会分别对同步消息和异步消息的重试机制进行对应的说明。
同步消息重试
同步消息重试是根据配置的重试次数+1计算出总共执行的次数,通过for循环实现的
本章节只介绍消息重试相关的逻辑,其余的逻辑可以参考上一篇消息发送相关的内容
对应源码如下(精简掉不相关逻辑):
1 |
|
总结
RocketMQ中,针对消息发送超时时间在新老版本中是有做改动的,在老版本中,超时时间指的是每次执行Send操作时的超时时间,即当设置的超时时间是3s的时候,配置的重试次数为2,总次数为2+1,这时候最多的超时时间为 3 * 3 = 9s,但是在新版本中做了调整,新版本中超时时间为总的超时时间(包含了重试次数在内),所以在配置超时时间的时候可以适当的配置大一些。
所以在每次执行比较重要的操作之前,会先计算一次costTime,然后timeout在每次调用的时候会减去计算的costTime,作为最新的timeout并将其作为参数传递到后续的逻辑中。如果在调用的时候发现 timeout < costTime, 则会抛出RemotingTooMuchRequestException 或RemotingTimeoutException,这两个异常在外面的逻辑中不会被catch掉,意味着超时,直接返回异常给客户端,不会再进行后续的重试操作。
对于同步消息而言,消息重试的处罚包含如下场景:
- RemotingException 异常
- MQClientException 异常
- MQBrokerException 异常
对于Broker异常这种场景中,还区分了返回的ResponseCode,如果是如下几种ResponseCode则会进行重试操作,其余场景不会触发重试动作
TOPIC_NOT_EXIST、SERVICE_NOT_AVAILABLE、SYSTEM_ERROR、NO_PERMISSION、NO_BUYER_ID、NOT_IN_CURRENT_UNIT
需要注意的是BrokerExcepiton中,比较常见的一种异常Code为SYSTEM_BUSY(broker busy),这种异常在实际的工作中会比较容易见到,该编码类型的异常不会进行重试,如果业务场景需要,可根据实际场景手动进行补偿处理。 - sendResult中,sendStatus结果不是SEND_OK,且isRetryAnotherBrokerWhenNotStoreOK设置为true的情况
该场景主要是针对主刷盘超时、备刷盘超时、备机不可用这种场景
异步消息重试
对于异步消息而言,在异步发送的逻辑中,是向线程池中提交任务处理的,如果线程池拒绝了该任务,则直接抛异常给Client,在提交任务的过程中,也会重新计算对应的costTime,然后同timeout进行比较,如果超时,则直接调用sendCallback的onException方法
1 |
|
上面逻辑中调用的发送逻辑sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);同样会进入到同步消息的发送逻辑中,上面同步消息逻辑中有介绍,因为消息模式为异步,所以在执行 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; 这一行逻辑的时候,计算出的 timesTotal 为1,所以在调用该方法中,指挥执行一次for循环,具体的重试逻辑会在后续调用 sendMessageAsync 逻辑中执行
在调用sendKernelImpl中,执行异步消息发送的时候,会执行到如下的逻辑:
1 |
|
在上面sendMessage方法中,会执行到如下逻辑:
1 |
|
异步消息的重试和同步消息的重试在实现上是有差异的,同步消息是通过for 来便利总共执行的次数,然后根据超时时间来判断是否需要执行后续的循环来进行重试操作的,而异步的重试逻辑是通过调用 onExceptionImpl 方法来实现的,
该方法中是通过AtomicInteger curTimes参数来记录当前的执行次数,以及是否需要重试 boolean needRetry,来判断后续的重试逻辑的,然后在执行onExceptionImpl逻辑中,通过递归的方式来判断当前重试次数和总共需要重试的次数进而确定是否需要重试。
具体逻辑如下:
1 |
|
总结:
异步消息的重试机制和同步消息的同步虽在实现逻辑上有差异,但是本质还是一样的,唯一需要注意的是,同步消息针对BrokerException的时候,针对部分ResponseCode的时候,是可以进行重试处理的,但是在异步消息处理逻辑中,BrokerException异常均不会进行重试处理。