RocketMQ: An Analysis of the Message Consumption Retry Mechanism

Background

Earlier posts analyzed message retries on the send path, and the previous post took a fairly deep look at the message consumption flow. During consumption, however, failures are common, and they fall into two broad categories: failures caused by the network, and failures returned by client-side business exceptions. This post analyzes how RocketMQ handles messages that fail to be consumed, or that receive no response after consumption (Push mode with CLUSTERING consumption).

Picking Up from the Previous Post

The previous post, "RocketMQ: The Message Consumption Flow", walked through RocketMQ's entire consumption flow: messages pulled from the Broker are grouped and wrapped into ConsumeRequest tasks, which are submitted to a dedicated thread pool. In each task's run method, the Listener registered by the Consumer is retrieved, and its consumeMessage method performs the actual consumption. When the client-side business logic finishes, it returns a status, chiefly CONSUME_SUCCESS or RECONSUME_LATER, and that result is then processed.

Code entry point: ConsumeMessageConcurrentlyService.java

// process the consume result
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);

In processConsumeResult, the method first checks whether the message list is empty, then initializes the ackIndex variable according to the status; this variable later determines whether the sendMessageBack logic needs to run. For the batch of messages that need to be retried, sendMessageBack is invoked; messages for which sendMessageBack itself fails are regrouped and resubmitted to the consumer thread pool as ConsumeRequest tasks (analyzed in an earlier post).
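The effect of ackIndex on the send-back loop can be shown with a minimal, self-contained sketch. The enum and method below are simplified stand-ins for the real RocketMQ types, not the actual API:

```java
import java.util.ArrayList;
import java.util.List;

public class AckIndexDemo {
    // Simplified stand-in for ConsumeConcurrentlyStatus
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Returns the indices of messages that would be sent back to the Broker,
    // mirroring the loop "for (i = ackIndex + 1; i < msgs.size(); i++)"
    static List<Integer> toSendBack(Status status, int batchSize) {
        int ackIndex = Integer.MAX_VALUE; // default value of context.getAckIndex()
        if (status == Status.CONSUME_SUCCESS && ackIndex >= batchSize) {
            ackIndex = batchSize - 1;     // clamp: the loop below becomes empty
        } else if (status == Status.RECONSUME_LATER) {
            ackIndex = -1;                // the loop below covers the whole batch
        }
        List<Integer> retry = new ArrayList<>();
        for (int i = ackIndex + 1; i < batchSize; i++) {
            retry.add(i);
        }
        return retry;
    }

    public static void main(String[] args) {
        System.out.println(toSendBack(Status.CONSUME_SUCCESS, 5)); // []
        System.out.println(toSendBack(Status.RECONSUME_LATER, 5)); // [0, 1, 2, 3, 4]
    }
}
```

So for a batch of five messages, CONSUME_SUCCESS sends nothing back, while RECONSUME_LATER sends the whole batch back for retry.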

The implementation, with inline notes:

public void processConsumeResult(final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest) {
    int ackIndex = context.getAckIndex(); // defaults to Integer.MAX_VALUE

    if (consumeRequest.getMsgs().isEmpty())
        return;

    switch (status) {
        case CONSUME_SUCCESS:
            // On success, clamp ackIndex so the retry loop below never runs
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING: // broadcast consumption: failed messages are simply dropped
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING: // cluster consumption
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());

            /**
             * On failure:  ackIndex = -1
             * On success:  ackIndex = consumeRequest.getMsgs().size() - 1
             */
            // On success, i starts at consumeRequest.getMsgs().size(), so i < size() never holds
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                // only reached on failure
                MessageExt msg = consumeRequest.getMsgs().get(i);
                // after a consume failure, send the message back to the Broker
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }
            // messages whose send-back failed are grouped and resubmitted to the consumer
            // thread pool as a ConsumeRequest (covered in the previous post)
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }
    // Remove the messages from the local ProcessQueue cache and return the key of the first
    // entry in the red-black tree after removal; since the tree is naturally ordered, that
    // first entry is the smallest remaining offset (the oldest message in the tree).
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        // Acknowledge the offset, telling the Broker it has been consumed.
        // Internally the result is put into a cache, which a scheduled thread reports later.
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}

Handling Retry Messages (Client)

This section analyzes the consumption flow and mechanics for messages that need to be retried. The entry point:

public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
    // delayLevel defaults to 0
    int delayLevel = context.getDelayLevelWhenNextConsume();
    // Wrap topic with namespace before sending back message.
    msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
    try {
        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
        return true;
    } catch (Exception e) {
        log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
    }
    return false;
}

Note the Broker selection in the subsequent consumerSendMessageBack call: brokerName = context.getMessageQueue().getBrokerName(), i.e. the Broker that held the original message. If the consumerSendMessageBack call throws, a new message is created from the original one and sent as a delayed message to the topic %RETRY%+ConsumerGroupName for later processing.

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    try {
        // send back to the same Broker that held the original message
        String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
            : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
            this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
    } catch (Exception e) {
        // fallback path:
        // create a new message with topic = %RETRY% + consumerGroup
        Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
        // back up the original message id
        String originMsgId = MessageAccessor.getOriginMessageId(msg);
        MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
        newMsg.setFlag(msg.getFlag());
        MessageAccessor.setProperties(newMsg, msg.getProperties());
        // back up the real topic
        MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
        MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
        MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
        MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
        newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
        this.mQClientFactory.getDefaultMQProducer().send(newMsg);
    } finally {
        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
    }
}

The consumerSendMessageBack logic does just one thing: it builds a RemotingCommand request with RequestCode.CONSUMER_SEND_MSG_BACK, carrying the original message's consumerGroup, topic, CommitLog offset, and msgId. On the Broker side, a dedicated method handles this request: org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack.
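The payload of that request can be sketched as a plain data holder. The field names below only approximate the real ConsumerSendMsgBackRequestHeader, and the defaults shown (delayLevel = 0, maxReconsumeTimes = 16) are stated assumptions, not guaranteed values:

```java
// A minimal sketch of the data a CONSUMER_SEND_MSG_BACK request carries.
// Field names approximate ConsumerSendMsgBackRequestHeader; treat them as illustrative.
public class SendMsgBackRequestSketch {
    String group;            // consumer group that failed to consume
    String originTopic;      // topic the message was originally consumed from
    long offset;             // CommitLog physical offset; the Broker uses it to look the message up
    int delayLevel;          // 0 means "let the Broker pick the retry delay level"
    String originMsgId;      // id of the original message
    int maxReconsumeTimes;   // client-side cap on retry attempts

    static SendMsgBackRequestSketch of(String group, String originTopic, long offset, String msgId) {
        SendMsgBackRequestSketch h = new SendMsgBackRequestSketch();
        h.group = group;
        h.originTopic = originTopic;
        h.offset = offset;
        h.originMsgId = msgId;
        h.delayLevel = 0;          // default: the Broker computes 3 + reconsumeTimes
        h.maxReconsumeTimes = 16;  // assumed common default for concurrent consumers
        return h;
    }

    public static void main(String[] args) {
        SendMsgBackRequestSketch h = of("demoGroup", "demoTopic", 123456L, "MSG-1");
        System.out.println(h.group + " / " + h.originTopic + " @ " + h.offset);
    }
}
```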

Handling Retry Messages (Broker)

The following is the Broker-side entry point for retry handling. Its core idea: using the details of the original message carried in the request, look up the full MessageExt on the Broker, then build a new message from it, a MessageExtBrokerInner (which extends MessageExt). The new message backs up the original message's details in its properties and is given a new topic (%RETRY% + consumerGroup). Once that is done, the new message is written to the CommitLog.

private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ConsumerSendMsgBackRequestHeader requestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

    SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
    String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
    int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
    // look up the original message
    MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
    final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
    if (null == retryTopic) {
        // back up the original topic into the message properties for later retries
        MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
    }
    msgExt.setWaitStoreMsgOK(false);

    int delayLevel = requestHeader.getDelayLevel();
    int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
        maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
    }
    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) {
        // retry budget exhausted: the message is routed to the dead-letter queue
        // (that code is omitted in this excerpt)
    } else {
        if (0 == delayLevel) {
            // The +3 likely exists because levels 1 and 2 map to retry intervals of 1s and 5s,
            // which are too short: with many such messages, retries would fire very frequently
            // and could cause a message storm, putting significant pressure on the Broker.
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);
    }
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(newTopic); // newTopic = %RETRY% + consumerGroup
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
    msgInner.setQueueId(queueIdInt);
    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(this.getStoreHost());
    // bump the retry count
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
    String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
    MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
    // the delayed-message handling happens later, in org.apache.rocketmq.store.CommitLog.putMessage
    PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    // response handling omitted...
    return response;
}

In MessageStore.putMessage, before the message is actually persisted, delayed messages receive the following special treatment:

  1. The topic is rewritten to SCHEDULE_TOPIC_XXXX, the fixed topic for delayed messages
  2. The queueId is rewritten; each delay level writes to its own queue, with queueId = delayLevel - 1
  3. The original topic and queueId are backed up; for a retry message the backed-up topic is %RETRY%+GroupName

The delayed-message handling in CommitLog.putMessage:

if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    // handle delayed/scheduled messages; for a delayed message DelayTimeLevel is always > 0
    if (msg.getDelayTimeLevel() > 0) {
        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
        }
        // rewrite the topic to SCHEDULE_TOPIC_XXXX, the fixed topic for delayed messages
        topic = ScheduleMessageService.SCHEDULE_TOPIC;
        // each delay level writes to its own queue: queueId = delayLevel - 1
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

        // Backup real topic, queueId
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
}

At this point in the retry flow, the retry message has been written to a queue under the topic SCHEDULE_TOPIC_XXXX.
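Combining the two rules seen so far (delayLevel = 3 + reconsumeTimes on the Broker, queueId = delayLevel - 1 in CommitLog.putMessage), the wait before each retry can be sketched as follows. The delay table here assumes the stock messageDelayLevel configuration ("1s 5s 10s 30s 1m ... 2h"); a customized Broker config would change the values:

```java
public class RetryDelaySketch {
    // Assumed default of the Broker's messageDelayLevel setting; levels are 1-based.
    static final String[] DELAY_LEVELS = {
        "1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m", "6m",
        "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h"
    };

    // Broker-side rule when the client passes delayLevel = 0:
    // delayLevel = 3 + reconsumeTimes, so retry #1 waits 10s, retry #2 waits 30s, ...
    static String delayForRetry(int reconsumeTimes) {
        int delayLevel = 3 + reconsumeTimes;
        if (delayLevel > DELAY_LEVELS.length) {
            delayLevel = DELAY_LEVELS.length; // clamped to the max level in CommitLog.putMessage
        }
        return DELAY_LEVELS[delayLevel - 1];
    }

    // The schedule queue a delayed message is written to: queueId = delayLevel - 1
    static int queueIdForLevel(int delayLevel) {
        return delayLevel - 1;
    }

    public static void main(String[] args) {
        System.out.println(delayForRetry(0));   // 10s (first retry)
        System.out.println(delayForRetry(1));   // 30s (second retry)
        System.out.println(queueIdForLevel(3)); // queue 2 of SCHEDULE_TOPIC_XXXX
    }
}
```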

The main flow:

01.png

At this point the retry message has been persisted to the CommitLog; the following sections cover how consumers receive and process it.

Binding the Retry Topic Subscription

When a Consumer starts in CLUSTERING consumption mode, it automatically registers a subscription to the group's retry topic so that retried messages can be picked up later:

switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        break;
    case CLUSTERING:
        // only CLUSTERING mode subscribes to the retry topic:
        // %RETRY% + this.defaultMQPushConsumer.getConsumerGroup()
        final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
            retryTopic, SubscriptionData.SUB_ALL);
        this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
        break;
    default:
        break;
}

ScheduleMessageService

When the Broker starts, ScheduleMessageService's start method runs. For every delay level it creates one task (DeliverDelayedMessageTimerTask), i.e. one per queue (queueId = delayLevel - 1, topic: SCHEDULE_TOPIC_XXXX), and schedules each task on a Timer with an initial delay of 1000 ms (FIRST_DELAY_TIME); each run then reschedules itself.

The relevant code:

this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
    Integer level = entry.getKey();
    Long timeDelay = entry.getValue();
    Long offset = this.offsetTable.get(level);
    if (null == offset) {
        offset = 0L;
    }
    if (timeDelay != null) {
        // create a task per delay level and schedule it on the timer ---> run
        this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
    }
}

DeliverDelayedMessageTimerTask

The core logic of each task:

  1. Resolve the ConsumeQueue for the current level (queueId = level - 1)
  2. From the offset, read the delayed messages to process along with their tagsCode (for a delayed message, the tagsCode holds the intended deliver timestamp)
  3. Compare that deliver timestamp with the current time; if the message is not yet due, compute the deliver time and reschedule the task accordingly
  4. If it is due, build a new Message from the stored message body; the new message's topic is the one backed up in the delayed message, msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC), i.e. the topic the consumer actually subscribes to
  5. Clean up the new message's scheduling properties (clearProperty) and write it back to the CommitLog so consumers can consume it
  6. Recompute and update the offset

The logic in detail (irrelevant parts omitted):

public void executeOnTimeup() {
    // resolve the ConsumeQueue for this level; e.g. level 3 maps to queue number 2
    ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
    long failScheduleOffset = offset;
    if (cq != null) {
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // the physical offset plus the physical size locates one complete message
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    // tagsCode
                    long tagsCode = bufferCQ.getByteBuffer().getLong();

                    if (cq.isExtAddr(tagsCode)) {
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        } else {
                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }
                    long now = System.currentTimeMillis();
                    // Normally tagsCode holds the hash code of the message tags, but here it is
                    // used as a time value to decide whether the message is due for delivery,
                    // i.e. for scheduled messages tagsCode is the intended deliver timestamp.
                    // It is written in org.apache.rocketmq.store.CommitLog.checkMessageAndReturnSize,
                    // invoked from the doReput logic.
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                    long countdown = deliverTimestamp - now;
                    if (countdown <= 0) {
                        // due for delivery: load the message that needs to be delivered
                        MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
                        if (msgExt != null) {
                            try {
                                // deliver the due message to its real topic ---> the Broker writes
                                // a new message whose topic is the backed-up real topic
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                if (MixAll.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                    continue;
                                }
                                PutMessageResult putMessageResult = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);
                                if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    continue;
                                } else {
                                    // XXX: warn and notify me
                                    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                            }
                        }
                    } else {
                        // not yet due: reschedule the task to fire when the countdown elapses
                        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                } // end of for
                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {
                bufferCQ.release();
            }
        } // end of if (bufferCQ != null)
        else {
            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
            }
        }
    } // end of if (cq != null)
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
}
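Stripped of the queue and buffer plumbing, the deliver-or-reschedule decision at the heart of executeOnTimeup reduces to a countdown check. The sketch below is a simplification for illustration, not the real method:

```java
public class DeliverDecisionSketch {
    enum Action { DELIVER_NOW, RESCHEDULE }

    // Simplified version of the countdown check in executeOnTimeup:
    // for a scheduled message, tagsCode stores its intended deliver timestamp.
    static Action decide(long deliverTimestamp, long now) {
        long countdown = deliverTimestamp - now;
        return (countdown <= 0) ? Action.DELIVER_NOW : Action.RESCHEDULE;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(decide(now - 1, now));    // DELIVER_NOW
        System.out.println(decide(now + 5000, now)); // RESCHEDULE, fires again after the countdown
    }
}
```

In the real service, a not-yet-due message reschedules the task with the remaining countdown as the Timer delay, which is why at most one lookahead message per level keeps the task ticking.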

The overall interaction:
02.png

Summary

The end-to-end consumer-side retry flow is roughly:

  1. The Consumer returns RECONSUME_LATER from message consumption
  2. The client sends a send-back request with RequestCode CONSUMER_SEND_MSG_BACK
  3. On receiving it, the Broker creates a new message from the original one (topic %RETRY%+group); for this delayed message, CommitLog.putMessage rewrites the topic to SCHEDULE_TOPIC_XXXX with QueueId = DelayLevel - 1 and persists it to the CommitLog
  4. When the Broker starts, it creates one task (DeliverDelayedMessageTimerTask) for each queue of the SCHEDULE_TOPIC_XXXX topic and hands them to a Timer
  5. When the Timer runs a task, the task first resolves its queue and the messages pending in it
  6. It checks whether each message is due for delivery (tagsCode compared against the current time)
  7. When a message is due, a new message is built from it whose topic is the RealTopic backed up in its properties; the new message is then written to the CommitLog like a normal message, so the Consumer can consume it again
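As a compact recap, the topic a failed message carries at each stage of the flow above can be traced with a small sketch (the helper name is illustrative; the topic strings follow the flow described in this post):

```java
import java.util.ArrayList;
import java.util.List;

public class RetryTopicLifecycle {
    // Topic names a failed message passes through, per the retry flow summarized above.
    static List<String> topicsSeen(String realTopic, String consumerGroup) {
        List<String> path = new ArrayList<>();
        path.add(realTopic);                  // original consumption
        path.add("%RETRY%" + consumerGroup);  // after consumerSendMsgBack on the Broker
        path.add("SCHEDULE_TOPIC_XXXX");      // rewritten in CommitLog.putMessage (delayed message)
        path.add("%RETRY%" + consumerGroup);  // restored by ScheduleMessageService on delivery
        return path;
    }

    public static void main(String[] args) {
        System.out.println(topicsSeen("orderTopic", "orderGroup"));
    }
}
```

The last hop is why the Consumer's automatic subscription to %RETRY%+group (see "Binding the Retry Topic Subscription") is what ultimately lets it receive its own retried messages.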

Author: Chase Wang
Posted on: April 22, 2023