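Background
Earlier articles analyzed how messages are retried on the send path, and the previous article took a closer look at the consumption flow. Consumption often fails, though, for two main reasons: network problems, and the client's business logic returning a failure. This article analyzes how messages are handled when consumption fails or when no response comes back after consumption (Push consumer, CLUSTERING mode).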
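Continuing from the previous article
The previous RocketMQ article, "RocketMQ: The Message Consumption Flow", described the whole consumption flow. After messages are pulled from the Broker, they are grouped and wrapped into ConsumeRequest tasks and handed to a thread pool. In the task's run method, the Listener registered by the Consumer is fetched, and its consumeMessage method performs the actual consumption. When the client's business logic finishes, it returns a processing status, mainly CONSUME_SUCCESS or RECONSUME_LATER. The result is then processed according to that status.
Code entry point: ConsumeMessageConcurrentlyService.java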
```java
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
```
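processConsumeResult first reads ackIndex from the context and returns early if the batch is empty. It then adjusts ackIndex according to status; this variable later decides which messages need sendMessageBack. For the messages in the batch that need a consumption retry, it calls sendMessageBack. Any message whose sendMessageBack fails is regrouped and resubmitted later to the consumer thread pool as a ConsumeRequest task (analyzed in an earlier article).
The implementation is as follows: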
```java
public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest) {
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;

    // Decide how many messages of the batch count as acknowledged.
    switch (status) {
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            // Nothing is acknowledged: the whole batch is retried.
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            // Broadcasting mode does not retry: failed messages are only logged.
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());

            // Send every unacknowledged message back to the Broker.
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }

            // Messages that could not be sent back are consumed again locally, later.
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }

    // Remove the handled messages from the ProcessQueue and advance the consume offset.
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}
```
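To make the role of ackIndex concrete, here is a minimal, self-contained sketch of the selection rule used above. It is not RocketMQ code: the class AckIndexSketch, its Status enum and the method messagesToRetry are hypothetical names introduced for illustration, and the batch is modeled as a plain list. It assumes the context's ackIndex is left at a large default (Integer.MAX_VALUE is used in the example) unless the listener changes it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of how processConsumeResult picks the messages to send back.
class AckIndexSketch {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Returns the part of the batch that still has to be retried.
    static <T> List<T> messagesToRetry(Status status, int ackIndex, List<T> batch) {
        if (batch.isEmpty()) {
            return new ArrayList<>();
        }
        if (status == Status.CONSUME_SUCCESS) {
            // Clamp: at most the whole batch counts as acknowledged.
            if (ackIndex >= batch.size()) {
                ackIndex = batch.size() - 1;
            }
        } else {
            // RECONSUME_LATER: nothing is acknowledged.
            ackIndex = -1;
        }
        // Everything after ackIndex is treated as failed.
        return new ArrayList<>(batch.subList(ackIndex + 1, batch.size()));
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("m0", "m1", "m2");
        System.out.println(messagesToRetry(Status.RECONSUME_LATER, Integer.MAX_VALUE, batch));
        System.out.println(messagesToRetry(Status.CONSUME_SUCCESS, 0, batch));
    }
}
```

With CONSUME_SUCCESS and an ackIndex of 0, only the first message counts as consumed and the rest of the batch is sent back, which is how a listener can acknowledge a batch partially.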
Consumption retry handling (Client)
This section analyzes the flow and mechanism for messages that need a consumption retry. The entry point is:
```java
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
    int delayLevel = context.getDelayLevelWhenNextConsume();

    // Wrap the topic with the namespace before sending the message back.
    msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
    try {
        // Target the Broker that holds the original message.
        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
        return true;
    } catch (Exception e) {
        log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
    }

    return false;
}
```
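One point to note in sendMessageBack is which Broker the subsequent consumerSendMessageBack call goes to: brokerName = context.getMessageQueue().getBrokerName(), that is, the Broker that holds the original message. If consumerSendMessageBack throws, the client builds a new message from the original one and sends it as a delayed message to the topic %RETRY% + consumerGroup, so that the later logic can still process it.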
```java
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    try {
        String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
            : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
            this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
    } catch (Exception e) {
        // Fallback: rebuild the message and send it to the retry topic as a delayed message.
        Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

        String originMsgId = MessageAccessor.getOriginMessageId(msg);
        MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

        newMsg.setFlag(msg.getFlag());
        MessageAccessor.setProperties(newMsg, msg.getProperties());
        MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
        MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
        MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
        MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
        // The delay level grows with the number of retries already made.
        newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

        this.mQClientFactory.getDefaultMQProducer().send(newMsg);
    } finally {
        // Restore the topic without the namespace.
        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
    }
}
```
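The expression 3 + msg.getReconsumeTimes() only yields a delay level; the actual wait depends on the Broker's delay-level table. The sketch below shows how a level could map to a duration. It assumes the Broker runs with the default messageDelayLevel setting ("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"); a Broker configured differently will give other values. RetryDelaySketch, parseLevels and delayMillisFor are hypothetical names, not RocketMQ APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: maps a retry count to a delay, given a delay-level table.
class RetryDelaySketch {
    // Assumption: the Broker's default messageDelayLevel configuration.
    static final String DEFAULT_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Parses "1s 5s ..." into level (starting at 1) -> delay in milliseconds.
    static Map<Integer, Long> parseLevels(String config) {
        Map<Integer, Long> table = new HashMap<>();
        String[] parts = config.split(" ");
        for (int i = 0; i < parts.length; i++) {
            String p = parts[i];
            long value = Long.parseLong(p.substring(0, p.length() - 1));
            char unit = p.charAt(p.length() - 1);
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                default: throw new IllegalArgumentException("unknown unit: " + unit);
            }
            table.put(i + 1, value * unitMillis);
        }
        return table;
    }

    // Delay before the next attempt for a message already retried reconsumeTimes times.
    static long delayMillisFor(int reconsumeTimes) {
        Map<Integer, Long> table = parseLevels(DEFAULT_LEVELS);
        // Levels above the highest configured level are capped at that level.
        int level = Math.min(3 + reconsumeTimes, table.size());
        return table.get(level);
    }

    public static void main(String[] args) {
        for (int times = 0; times < 4; times++) {
            System.out.println(times + " -> " + delayMillisFor(times) + " ms");
        }
    }
}
```

Under that assumption the first retry (reconsumeTimes = 0) uses level 3, so the wait before redelivery grows with every failed attempt until it reaches the highest level.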
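consumerSendMessageBack itself does only one thing: it wraps a RemotingCommand request whose RequestCode is RequestCode.CONSUMER_SEND_MSG_BACK. The request mainly carries the original message's consumerGroup, Topic, CommitLogOffset and MsgId. The Broker has a dedicated method for handling this request: org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack.
Consumption retry handling (Broker)
The code below is the Broker-side entry point for message retry. Its core idea is to use the information about the original message carried in the request to look up the full message (MessageExt) on the Broker, and then to create a new message (newMsg) from it: a MessageExtBrokerInner, which extends MessageExt. newMsg keeps a backup of the original message's information in its Properties and gets a new Topic (%RETRY% + consumerGroup). Once that is done, the new message is written to the CommitLog.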
```java
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ConsumerSendMsgBackRequestHeader requestHeader =
        (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

    SubscriptionGroupConfig subscriptionGroupConfig =
        this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());

    // Retry topic: %RETRY% + consumerGroup, with a randomly chosen retry queue.
    String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
    int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

    // Look up the original message by its CommitLog offset.
    MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());

    final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
    if (null == retryTopic) {
        MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
    }
    msgExt.setWaitStoreMsgOK(false);

    int delayLevel = requestHeader.getDelayLevel();

    int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
        maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
    }

    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) {
        // Handling for exhausted retries is omitted in this excerpt.
    } else {
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);
    }

    // Build the new message from the original one.
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(newTopic);
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

    msgInner.setQueueId(queueIdInt);
    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

    String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
    MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);

    // Write the new message to the CommitLog.
    PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    return response;
}
```
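Two small decisions are buried in the Broker code above: which delay level the retry gets, and which retry queue it lands in. The following sketch isolates both as pure functions so they can be tried out without a Broker. BrokerRetryDecisionSketch and its method names are hypothetical; only the arithmetic is taken from the listing.

```java
import java.util.OptionalInt;
import java.util.Random;

// Hypothetical extraction of two decisions made in consumerSendMsgBack.
class BrokerRetryDecisionSketch {

    // Returns the delay level for the retry, or empty when the message has
    // used up its retries or the client asked for a negative level
    // (the branch left empty in the excerpt).
    static OptionalInt resolveDelayLevel(int reconsumeTimes, int maxReconsumeTimes, int requestedLevel) {
        if (reconsumeTimes >= maxReconsumeTimes || requestedLevel < 0) {
            return OptionalInt.empty();
        }
        // Level 0 means "let the Broker decide": 3 + number of retries so far.
        return OptionalInt.of(requestedLevel == 0 ? 3 + reconsumeTimes : requestedLevel);
    }

    // Picks a queue of the retry topic at random, as the listing does.
    static int pickRetryQueue(Random random, int retryQueueNums) {
        return Math.abs(random.nextInt() % 99999999) % retryQueueNums;
    }

    public static void main(String[] args) {
        System.out.println(resolveDelayLevel(0, 16, 0));
        System.out.println(resolveDelayLevel(16, 16, 0));
        System.out.println(pickRetryQueue(new Random(), 4));
    }
}
```

Taking the remainder before Math.abs keeps the value away from Integer.MIN_VALUE, whose absolute value would still be negative, so the resulting queue id is always in the range 0 to retryQueueNums - 1.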
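Before MessageStore.putMessage actually writes the message to disk, it applies the following special handling to delayed messages:
- Change the Topic to SCHEDULE_TOPIC_XXXX, the fixed topic for delayed messages
- Change the QueueId: each delay level is written to its own queue, and the new queue is delayLevel - 1
- Back up the source Topic and QueueId; the source Topic here is %RETRY% + GroupName
The delayed-message logic in CommitLog.putMessage is as follows: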
```java
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    if (msg.getDelayTimeLevel() > 0) {
        // Cap the delay level at the highest configured level.
        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
        }

        topic = ScheduleMessageService.SCHEDULE_TOPIC;
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

        // Back up the real topic and queue id in the properties.
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

        // Redirect the message to the schedule topic.
        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
}
```
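The redirect performed by CommitLog can be modeled with a plain map of properties. The sketch below is a simplified stand-in, not the RocketMQ message classes: ScheduleRewriteSketch and Msg are hypothetical, and the property keys "REAL_TOPIC" and "REAL_QID" are illustrative placeholders for MessageConst.PROPERTY_REAL_TOPIC and MessageConst.PROPERTY_REAL_QUEUE_ID.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the delayed-message redirect in CommitLog.putMessage.
class ScheduleRewriteSketch {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

    // Minimal message: just the fields touched by the redirect.
    static class Msg {
        String topic;
        int queueId;
        int delayLevel;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId, int delayLevel) {
            this.topic = topic;
            this.queueId = queueId;
            this.delayLevel = delayLevel;
        }
    }

    // Redirects a delayed message to the schedule topic, backing up its real destination.
    static void redirectToScheduleTopic(Msg msg, int maxDelayLevel) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message, store it as is
        }
        if (msg.delayLevel > maxDelayLevel) {
            msg.delayLevel = maxDelayLevel;
        }
        // Illustrative keys; the real ones are constants in MessageConst.
        msg.properties.put("REAL_TOPIC", msg.topic);
        msg.properties.put("REAL_QID", String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = msg.delayLevel - 1; // one queue per delay level
    }

    public static void main(String[] args) {
        Msg retry = new Msg("%RETRY%groupA", 0, 3);
        redirectToScheduleTopic(retry, 18);
        System.out.println(retry.topic + " / queue " + retry.queueId + " / " + retry.properties);
    }
}
```

Because the real destination travels inside the message's own properties, the Broker needs no separate index to know where a delayed message belongs once its delay has elapsed.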
At this point in the retry flow, the retry message has been written to a queue of the topic SCHEDULE_TOPIC_XXXX.
The main flow is shown below:

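The retry message is now persisted in the CommitLog. The following sections describe how the consumer receives and processes it.
Binding the retry subscription
When a Consumer starts in CLUSTERING mode, it automatically subscribes its group to a retry Topic so that retried messages can be consumed later. The logic is as follows: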
```java
switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
        break;
    case CLUSTERING:
        // Subscribe the group to its own retry topic, without tag filtering.
        final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
            retryTopic, SubscriptionData.SUB_ALL);
        this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
        break;
    default:
        break;
}
```
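The effect of this startup step is that a CLUSTERING consumer ends up with one more subscription than the user declared. The sketch below illustrates that with plain maps. RetrySubscriptionSketch and its methods are hypothetical; it assumes the retry topic name is "%RETRY%" followed by the consumer group and that "*" stands for "all tags".

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the subscriptions a push consumer holds after startup.
class RetrySubscriptionSketch {
    enum MessageModel { BROADCASTING, CLUSTERING }

    // Assumption: retry topic name = "%RETRY%" + consumer group.
    static String retryTopic(String consumerGroup) {
        return "%RETRY%" + consumerGroup;
    }

    // Returns topic -> subscription expression, including the automatic retry subscription.
    static Map<String, String> effectiveSubscriptions(MessageModel model, String consumerGroup,
                                                      Map<String, String> userSubscriptions) {
        Map<String, String> all = new TreeMap<>(userSubscriptions);
        if (model == MessageModel.CLUSTERING) {
            all.put(retryTopic(consumerGroup), "*"); // subscribe to every tag
        }
        return all;
    }

    public static void main(String[] args) {
        Map<String, String> user = Collections.singletonMap("TopicA", "TagA || TagB");
        System.out.println(effectiveSubscriptions(MessageModel.CLUSTERING, "groupA", user));
        System.out.println(effectiveSubscriptions(MessageModel.BROADCASTING, "groupA", user));
    }
}
```

Since the retry topic is derived from the group name, retried messages are only redelivered to the group that failed to consume them, not to other groups subscribed to the original topic.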
ScheduleMessageService
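When the Broker starts, it calls the start method of ScheduleMessageService. This method creates one Task (DeliverDelayedMessageTimerTask) per delay level, that is, one per queue of the topic SCHEDULE_TOPIC_XXXX (QueueId = DelayLevel - 1), and puts each Task into a Timer with an initial delay of 1000 ms (FIRST_DELAY_TIME). Each Task schedules its own successor when it runs.
Relevant code: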
```java
this.timer = new Timer("ScheduleMessageTimerThread", true);
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
    Integer level = entry.getKey();
    Long timeDelay = entry.getValue();
    // Resume from the offset already processed for this level, if any.
    Long offset = this.offsetTable.get(level);
    if (null == offset) {
        offset = 0L;
    }

    if (timeDelay != null) {
        // One task per delay level.
        this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
    }
}
```
DeliverDelayedMessageTimerTask
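The core logic of each Task is:
- Get the ConsumeQueue for the current Level (QueueId = Level - 1)
- Starting from offset, read each DelayMsg to process together with its tagsCode (for a delayed message, tagsCode holds the delivery timestamp)
- Compare the delivery timestamp with the current time to decide whether the message is due; if it is not, compute the remaining time and schedule a new task for it
- If it is due, create a new Message (NewMsg) from the stored message. Its Topic is the one backed up in the DelayMsg, msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC), which for a retry message is the %RETRY% topic the consumer group subscribes to
- Clean up NewMsg (clearProperty) and write it to the CommitLog again so that the consumer can consume it
- Recompute and update the offset
The detailed logic is as follows (irrelevant parts omitted):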
```java
public void executeOnTimeup() {
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));

    long failScheduleOffset = offset;

    if (cq != null) {
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    // One ConsumeQueue entry: CommitLog offset, size, tagsCode.
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    long tagsCode = bufferCQ.getByteBuffer().getLong();

                    if (cq.isExtAddr(tagsCode)) {
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        } else {
                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }

                    // For delayed messages, tagsCode holds the delivery timestamp.
                    long now = System.currentTimeMillis();
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    long countdown = deliverTimestamp - now;

                    if (countdown <= 0) {
                        // Due: restore the real topic and write the message back to the store.
                        MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);

                        if (msgExt != null) {
                            try {
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                if (MixAll.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                    continue;
                                }
                                PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);

                                if (putMessageResult != null
                                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    continue;
                                } else {
                                    // Write failed: try again from this offset after a while.
                                    ScheduleMessageService.this.timer.schedule(
                                        new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                // Error handling omitted in this excerpt.
                            }
                        }
                    } else {
                        // Not due yet: come back when the countdown has elapsed.
                        ScheduleMessageService.this.timer.schedule(
                            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                }

                // Every entry in the buffer was handled: continue from the next offset.
                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(
                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {
                bufferCQ.release();
            }
        } else {
            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
            }
        }
    }

    ScheduleMessageService.this.timer.schedule(
        new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE);
}
```
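Stripped of storage details, the task above is a scan that delivers every due entry and stops at the first one that is not due. The sketch below reproduces that control flow over an in-memory list with an injected clock value, which makes it easy to test. DelayQueueScanSketch, Entry and ScanResult are hypothetical types, and the 100 ms value of DELAY_FOR_A_WHILE is an assumption made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model of one pass of DeliverDelayedMessageTimerTask.
class DelayQueueScanSketch {
    // Assumption for this sketch: pause before re-checking a fully processed queue.
    static final long DELAY_FOR_A_WHILE = 100L;

    // One queue entry: a payload and the time at which it becomes deliverable.
    static class Entry {
        final String body;
        final long deliverTimestamp;

        Entry(String body, long deliverTimestamp) {
            this.body = body;
            this.deliverTimestamp = deliverTimestamp;
        }
    }

    // Outcome of one pass: what was delivered and when/where to resume.
    static class ScanResult {
        final List<String> delivered = new ArrayList<>();
        int nextOffset;
        long nextDelayMillis;
    }

    // Delivers every due entry from offset onward; stops at the first entry not yet due.
    static ScanResult scan(List<Entry> queue, int offset, long now) {
        ScanResult result = new ScanResult();
        int i = offset;
        for (; i < queue.size(); i++) {
            Entry entry = queue.get(i);
            long countdown = entry.deliverTimestamp - now;
            if (countdown > 0) {
                // Not due: resume at this entry once the countdown has elapsed.
                result.nextOffset = i;
                result.nextDelayMillis = countdown;
                return result;
            }
            result.delivered.add(entry.body);
        }
        // Queue drained: check again shortly from the end.
        result.nextOffset = i;
        result.nextDelayMillis = DELAY_FOR_A_WHILE;
        return result;
    }

    public static void main(String[] args) {
        List<Entry> queue = Arrays.asList(new Entry("a", 1000L), new Entry("b", 2000L), new Entry("c", 5000L));
        ScanResult first = scan(queue, 0, 2500L);
        System.out.println(first.delivered + " next=" + first.nextOffset + " in " + first.nextDelayMillis + " ms");
    }
}
```

Stopping at the first entry that is not due works because all messages in one queue share the same delay level, so entries become due in the order in which they were written.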
The overall interaction is roughly as follows:

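Summary
The whole consumer-side retry process is roughly as follows:
- The Consumer's message consumption returns RECONSUME_LATER
- The Client sends a SendBack request whose RequestCode is CONSUMER_SEND_MSG_BACK
- After receiving the request, the Broker creates a new message from the original one with the Topic %RETRY% + consumerGroup. Because it is a delayed message, the Topic is rewritten to SCHEDULE_TOPIC_XXXX and the QueueId to DelayLevel - 1 before the message is written to the CommitLog
- When the Broker starts, it creates a Task (DeliverDelayedMessageTimerTask) for each queue of the topic SCHEDULE_TOPIC_XXXX and hands it to a Timer
- When the Timer runs a Task, the Task first gets its Queue and the messages in that Queue that need processing
- It checks whether each message is due for delivery (tagsCode compared with the current time)
- If the message is due, a new message is created from it. The Topic of this new message is the RealTopic backed up in the Properties of the stored message. The new message is then written to the CommitLog as a normal message so that the Consumer can consume it again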
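The last step of the summary, turning a due schedule-topic message back into a normal message, can be sketched as follows. This is a simplified model rather than the Broker's messageTimeup method: DeliverRestoreSketch and Msg are hypothetical, and the property keys "REAL_TOPIC", "REAL_QID" and "DELAY" are illustrative placeholders for the corresponding MessageConst constants.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of restoring a due delayed message to its real destination.
class DeliverRestoreSketch {
    // Minimal message: destination plus properties.
    static class Msg {
        String topic;
        int queueId;
        final Map<String, String> properties = new HashMap<>();
    }

    // Builds the message that is written back to the CommitLog once the delay has elapsed.
    static Msg restore(Msg scheduled) {
        Msg out = new Msg();
        out.properties.putAll(scheduled.properties);
        // Drop the delay marker so the new message is stored as a normal message
        // instead of being redirected to the schedule topic again.
        out.properties.remove("DELAY");
        // The real destination was backed up when the message was redirected.
        out.topic = out.properties.get("REAL_TOPIC");
        out.queueId = Integer.parseInt(out.properties.get("REAL_QID"));
        return out;
    }

    public static void main(String[] args) {
        Msg scheduled = new Msg();
        scheduled.topic = "SCHEDULE_TOPIC_XXXX";
        scheduled.queueId = 2;
        scheduled.properties.put("REAL_TOPIC", "%RETRY%groupA");
        scheduled.properties.put("REAL_QID", "0");
        scheduled.properties.put("DELAY", "3");
        Msg restored = restore(scheduled);
        System.out.println(restored.topic + " / queue " + restored.queueId);
    }
}
```

For a retry message the restored topic is the group's %RETRY% topic, which is why the automatic retry subscription made at consumer startup is what brings the message back to the consumer.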