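Message Consumption
The previous articles analyzed message sending in some depth. This article walks through the main flow of message consumption in RocketMQ. It focuses on the Consumer's Push mode and uses clustering consumption as the example.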
To understand how a Consumer consumes messages, start with client startup. Below is the simplest possible push consumer:
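```java
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        // Unless set explicitly (e.g. consumer.setNamesrvAddr("127.0.0.1:9876")),
        // the NameServer address is read from the NAMESRV_ADDR environment variable.
        consumer.subscribe("Chase_Topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
```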
In the code above, the Consumer registers a MessageListener at startup. This listener is the entry point that actually consumes messages once they have been pulled (covered later).
Once this setup is done, consumer.start() starts the Consumer. The actual consumption, however, happens in a thread that is started during this call; that thread is analyzed in detail below.
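Consumer Startup
consumer.start() delegates to this.defaultMQPushConsumerImpl.start(). When a new Consumer starts, this method performs the following important steps:
this.checkConfig(); validates the configuration parameters.
this.copySubscription(); copies the subscription data, mainly to prepare for retry handling (explained in a later article).
Initialization of consumption-related state, such as loading the consumer offset store.
this.consumeMessageService.start(); for a concurrent listener, this mainly schedules the periodic cleanup of expired messages.
mQClientFactory.start(); initializes and starts mQClientFactory.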
The logic of mQClientFactory.start() is as follows:
```java
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                this.mQClientAPIImpl.start();
                this.startScheduledTask();
                this.pullMessageService.start();
                this.rebalanceService.start();
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
```
For message consumption, the most important call in the logic above is this.pullMessageService.start();
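The switch in start() is a start-once state machine: the state is set to START_FAILED before any sub-service starts and flipped to RUNNING only after all of them succeed, so a half-started factory cannot be started again. The sketch below models just that pattern; SketchServiceState, SketchClientFactory and the simulated failure flag are hypothetical names, not RocketMQ classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified states; RocketMQ's ServiceState has more values.
enum SketchServiceState { CREATE_JUST, RUNNING, START_FAILED }

// Hypothetical model of the start-once pattern in mQClientFactory.start().
class SketchClientFactory {
    private SketchServiceState serviceState = SketchServiceState.CREATE_JUST;
    // Records which sub-services were started, in order.
    final List<String> started = new ArrayList<>();
    private final boolean failDuringStart;

    SketchClientFactory(boolean failDuringStart) {
        this.failDuringStart = failDuringStart;
    }

    synchronized void start() {
        switch (serviceState) {
            case CREATE_JUST:
                // Pessimistically mark as failed; only a fully successful start reaches RUNNING.
                serviceState = SketchServiceState.START_FAILED;
                started.add("remotingClient");
                started.add("scheduledTasks");
                if (failDuringStart) {
                    throw new IllegalStateException("simulated startup failure");
                }
                started.add("pullMessageService");
                started.add("rebalanceService");
                serviceState = SketchServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new IllegalStateException("factory was created before and failed to start");
            default:
                // Already RUNNING: a repeated start() is a no-op.
                break;
        }
    }

    synchronized SketchServiceState state() {
        return serviceState;
    }
}
```

Calling start() twice on a healthy instance is harmless, while a failed instance keeps rejecting start() instead of running with half of its services up.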
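Pulling Messages in the PullMessageService Thread
PullMessageService is a thread. Its run method keeps taking requests from a request queue and pulling messages from the Broker. The pull is asynchronous: when messages arrive, they are put into a local cache (a TreeMap) and handed over to separate consumption threads (thread isolation: each thread does one job). When the pull result is FOUND, the current batch is processed and the next pull is issued, so pulling continues in a loop.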
When the Consumer starts, mQClientFactory.start() calls pullMessageService.start(). PullMessageService extends ServiceThread and is therefore a Thread; its run method is:
```java
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}
```
The thread holds a LinkedBlockingQueue named pullRequestQueue that stores the pull request objects.
In run, the thread keeps calling take() on pullRequestQueue to obtain a PullRequest and then pulls messages for it. Where the elements of pullRequestQueue come from is covered later; first, here is what pullMessage(pullRequest) does:
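The take() loop is a classic single-consumer blocking-queue worker. The sketch below reproduces only that shape, assuming a trivial request type; SketchPullRequest, SketchPullService and its helper methods are hypothetical, and the real service would look up the consumer by group and issue an asynchronous pull where this one merely records the request.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for PullRequest: a queue id and the next offset to pull.
class SketchPullRequest {
    final String queueId;
    final long nextOffset;

    SketchPullRequest(String queueId, long nextOffset) {
        this.queueId = queueId;
        this.nextOffset = nextOffset;
    }
}

// Hypothetical model of PullMessageService: one thread blocks on take() and handles requests in order.
class SketchPullService extends Thread {
    private final LinkedBlockingQueue<SketchPullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
    private volatile boolean stopped = false;
    final List<String> handled = Collections.synchronizedList(new ArrayList<>());

    void executePullRequestImmediately(SketchPullRequest request) {
        pullRequestQueue.add(request);
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                SketchPullRequest request = pullRequestQueue.take(); // blocks until a request arrives
                handled.add(request.queueId + "@" + request.nextOffset); // placeholder for the real pull
            } catch (InterruptedException ignored) {
                // Woken up by shutdownAndJoin(); the loop re-checks the stop flag.
            }
        }
    }

    // Polls until at least n requests were handled or the timeout expires.
    boolean awaitHandled(int n, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (handled.size() < n && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return handled.size() >= n;
    }

    void shutdownAndJoin() {
        stopped = true;
        interrupt(); // unblock take() if the thread is waiting
        try {
            join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because take() blocks, the thread consumes no CPU while no PullRequest is queued; whoever puts a request into the queue decides when the next pull happens.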
```java
private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}
```
The main logic of DefaultMQPushConsumerImpl.pullMessage follows. To keep the listing short, unimportant and unrelated parts are omitted:
```java
public void pullMessage(final PullRequest pullRequest) {
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    if (processQueue.isDropped()) {
        return;
    }
    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
    try {
        this.makeSureStateOK();
    } catch (MQClientException e) {
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        return;
    }
    if (this.isPause()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }

    // Flow control: number of cached messages
    long cachedMessageCount = processQueue.getMsgCount().get();
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        return;
    }
    // Flow control: total size of cached messages
    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        return;
    }
    if (!this.consumeOrderly) {
        // Flow control: offset span between the first and last cached message
        if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            return;
        }
    } else {
        // Orderly consumption: only pull while the queue lock is held
        if (processQueue.isLocked()) {
            if (!pullRequest.isLockedFirst()) {
                final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                pullRequest.setLockedFirst(true);
                pullRequest.setNextOffset(offset);
            }
        } else {
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            return;
        }
    }

    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (null == subscriptionData) {
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        return;
    }

    final long beginTimestamp = System.currentTimeMillis();
    // Anonymous PullCallback; its body is shown in the next section
    PullCallback pullCallback = new PullCallback() { ... };

    boolean commitOffsetEnable = false;
    long commitOffsetValue = 0L;
    if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
        commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
        if (commitOffsetValue > 0) {
            commitOffsetEnable = true;
        }
    }

    String subExpression = null;
    boolean classFilter = false;
    SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (sd != null) {
        if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
            subExpression = sd.getSubString();
        }
        classFilter = sd.isClassFilterMode();
    }

    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable,     // commit offset
        true,                   // suspend (long polling)
        subExpression != null,  // subscription
        classFilter             // class filter
    );
    try {
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
}
```
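Before pulling, the code checks the service state and applies flow control based on the locally cached messages (message count, total size, and offset span). If any of these checks fails, it calls executePullRequestLater to retry later: after the given delay, the PullRequest is put back into pullRequestQueue and waits for the next pull.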
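The three flow-control checks can be isolated into one pure decision function. The sketch below is hypothetical (SketchFlowControl and its Decision enum are illustrative names); it mirrors the order and the strict "greater than" comparisons of the code above, with the thresholds passed in rather than read from DefaultMQPushConsumer.

```java
// Hypothetical decision function mirroring the flow-control checks in pullMessage().
class SketchFlowControl {
    enum Decision { PULL_NOW, DELAY_COUNT, DELAY_SIZE, DELAY_SPAN }

    // countThreshold ~ pullThresholdForQueue, sizeThresholdMiB ~ pullThresholdSizeForQueue,
    // spanThreshold ~ consumeConcurrentlyMaxSpan (the span check applies to concurrent consumption only).
    static Decision check(long cachedMsgCount, long cachedMsgBytes, long maxSpan,
                          long countThreshold, long sizeThresholdMiB, long spanThreshold) {
        long cachedMiB = cachedMsgBytes / (1024 * 1024); // integer MiB, as in the original code
        if (cachedMsgCount > countThreshold) {
            return Decision.DELAY_COUNT;
        }
        if (cachedMiB > sizeThresholdMiB) {
            return Decision.DELAY_SIZE;
        }
        if (maxSpan > spanThreshold) {
            return Decision.DELAY_SPAN;
        }
        return Decision.PULL_NOW;
    }
}
```

Every non-PULL_NOW result corresponds to an executePullRequestLater call with PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL; the pull is postponed rather than dropped, so a slow listener simply slows down pulling for that queue.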
This partly answers the earlier question of where the entries in pullRequestQueue come from.
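A delayed retry can be modeled as a scheduled task whose only job is to put the request back into the queue the pull thread blocks on. The sketch below assumes a ScheduledExecutorService for the delay; SketchRequeue and the String request type are hypothetical simplifications.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the two ways a request re-enters pullRequestQueue.
class SketchRequeue {
    final LinkedBlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    void executePullRequestImmediately(String request) {
        pullRequestQueue.add(request);
    }

    // Wait first, then enqueue into the same queue the pull thread is blocked on.
    void executePullRequestLater(String request, long delayMillis) {
        scheduler.schedule(() -> executePullRequestImmediately(request), delayMillis, TimeUnit.MILLISECONDS);
    }

    // Waits up to timeoutMillis for the next request; returns null on timeout.
    String poll(long timeoutMillis) {
        try {
            return pullRequestQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

The pull thread itself never sleeps: the delay lives in the scheduler, so one slow or throttled queue does not hold up pulls for other queues.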
The pull itself is performed by pullKernelImpl, which builds a pull request with the code RequestCode.PULL_MESSAGE and sends it to the Broker.
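The sysFlag passed to pullKernelImpl packs several booleans into one int. The sketch below re-implements that packing under the assumption that the bit positions match RocketMQ 4.x's PullSysFlag (commit offset = bit 0, suspend = bit 1, subscription = bit 2, class filter = bit 3); SketchPullSysFlag is a hypothetical name, so check the real class before relying on these values.

```java
// Hypothetical re-implementation of the packing done by PullSysFlag.buildSysFlag (bit values assumed).
class SketchPullSysFlag {
    static final int FLAG_COMMIT_OFFSET = 0x1;
    static final int FLAG_SUSPEND = 0x1 << 1;
    static final int FLAG_SUBSCRIPTION = 0x1 << 2;
    static final int FLAG_CLASS_FILTER = 0x1 << 3;

    static int buildSysFlag(boolean commitOffset, boolean suspend, boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) flag |= FLAG_COMMIT_OFFSET;
        if (suspend) flag |= FLAG_SUSPEND;
        if (subscription) flag |= FLAG_SUBSCRIPTION;
        if (classFilter) flag |= FLAG_CLASS_FILTER;
        return flag;
    }

    // The broker side tests individual bits like this.
    static boolean hasSuspendFlag(int sysFlag) {
        return (sysFlag & FLAG_SUSPEND) == FLAG_SUSPEND;
    }
}
```

Since pullMessage always passes true for the suspend argument, the Broker may hold a pull that finds no messages (long polling) for up to BROKER_SUSPEND_MAX_TIME_MILLIS instead of answering immediately.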
Pull Callback
Taking the asynchronous pull as an example, when the pull response arrives, the onSuccess or onException method of pullCallback is called.
```java
PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);
            switch (pullResult.getPullStatus()) {
                case FOUND:
                    long prevRequestOffset = pullRequest.getNextOffset();
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    long pullRT = System.currentTimeMillis() - beginTimestamp;
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                        pullRequest.getMessageQueue().getTopic(), pullRT);
                    long firstMsgOffset = Long.MAX_VALUE;
                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    } else {
                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                        // Cache the messages in the ProcessQueue's TreeMap
                        boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                        // Hand the messages over to the consume thread pool
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispatchToConsume);
                        // Schedule the next pull for this queue
                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                        } else {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        }
                    }
                    break;
                case NO_NEW_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case NO_MATCHED_MSG:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case OFFSET_ILLEGAL:
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    pullRequest.getProcessQueue().setDropped(true);
                    DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                    pullRequest.getNextOffset(), false);
                                DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                            } catch (Throwable e) {
                            }
                        }
                    }, 10000);
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void onException(Throwable e) {
        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
};
```
The callback is triggered from MQClientAPIImpl.pullMessageAsync, which wraps the remoting client's invokeAsync:
```java
private void pullMessageAsync(
    final String addr,
    final RemotingCommand request,
    final long timeoutMillis,
    final PullCallback pullCallback
) throws RemotingException, InterruptedException {
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();
            if (response != null) {
                try {
                    PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                    assert pullResult != null;
                    pullCallback.onSuccess(pullResult);
                } catch (Exception e) {
                    pullCallback.onException(e);
                }
            } else {
                if (!responseFuture.isSendRequestOK()) {
                    pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request,
                        responseFuture.getCause()));
                } else if (responseFuture.isTimeout()) {
                    pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :"
                        + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request, responseFuture.getCause()));
                } else {
                    pullCallback.onException(new MQClientException("unknown reason. addr: " + addr
                        + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
                }
            }
        }
    });
}
```
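Stripped of statistics and RocketMQ types, the onSuccess switch is a small state transition on the PullRequest. The hypothetical sketch below (SketchPullCallback and its nested types are illustrative names) records each decision as a string: "submit:n" stands for submitConsumeRequest, "pullAgain" for executePullRequestImmediately (or executePullRequestLater when pullInterval > 0), and "dropQueue" for the OFFSET_ILLEGAL handling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down model of the onSuccess switch in the pull callback.
class SketchPullCallback {
    enum PullStatus { FOUND, NO_NEW_MSG, NO_MATCHED_MSG, OFFSET_ILLEGAL }

    static final class Request {
        long nextOffset;
        boolean dropped;

        Request(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    // Decisions taken by the callback, in order.
    final List<String> actions = new ArrayList<>();

    void onSuccess(Request request, PullStatus status, long nextBeginOffset, int foundCount) {
        switch (status) {
            case FOUND:
                request.nextOffset = nextBeginOffset;
                if (foundCount > 0) {
                    actions.add("submit:" + foundCount); // cache in ProcessQueue and submit for consumption
                }
                actions.add("pullAgain"); // also taken when client-side filtering left nothing
                break;
            case NO_NEW_MSG:
            case NO_MATCHED_MSG:
                request.nextOffset = nextBeginOffset;
                actions.add("pullAgain");
                break;
            case OFFSET_ILLEGAL:
                request.nextOffset = nextBeginOffset;
                request.dropped = true; // the real code later persists the offset and removes the queue
                actions.add("dropQueue");
                break;
        }
    }
}
```

In every non-error branch nextOffset is advanced to the Broker's nextBeginOffset before the request is re-queued, which is what keeps successive pulls for a queue moving forward.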
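In short: PullMessageService takes a PullRequest from pullRequestQueue and issues an asynchronous pull to the Broker. If no messages are found, the PullRequest is put back into the queue for another pull. If messages are found, they are first stored in the TreeMap of the ProcessQueue, then split into groups of at most consumeMessageBatchMaxSize messages; each group is wrapped in a ConsumeRequest (a Runnable) and submitted to the consume thread pool (consumeExecutor), where it is consumed. The consumption logic is shown below.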
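The two data-structure ideas in this summary, an offset-ordered cache and fixed-size batching, fit in a few lines. The sketch below is hypothetical (SketchProcessQueue is not the real ProcessQueue, and messages are reduced to their offsets). Note that consumeMessageBatchMaxSize defaults to 1 in DefaultMQPushConsumer, so by default each ConsumeRequest carries a single message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch: an offset-ordered cache plus batching into consume tasks.
class SketchProcessQueue {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();

    // Caches pulled messages keyed by queue offset; the TreeMap keeps them sorted.
    synchronized void putMessage(List<Long> offsets) {
        for (long offset : offsets) {
            msgTreeMap.put(offset, "msg-" + offset);
        }
    }

    // Distance between the smallest and largest cached offset (what the span flow control checks).
    synchronized long maxSpan() {
        return msgTreeMap.isEmpty() ? 0 : msgTreeMap.lastKey() - msgTreeMap.firstKey();
    }

    // Splits one pull result into consecutive batches of at most batchSize messages,
    // one batch per ConsumeRequest.
    static <T> List<List<T>> splitIntoBatches(List<T> msgs, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < msgs.size(); i += batchSize) {
            batches.add(new ArrayList<>(msgs.subList(i, Math.min(i + batchSize, msgs.size()))));
        }
        return batches;
    }
}
```

A TreeMap is used instead of a plain list because the span check and the offset commit both need the smallest and largest cached offset cheaply.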
Consuming Messages
The following is the core logic of ConsumeRequest, with non-essential code omitted.
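Note: ConsumeRequest is an inner class of ConsumeMessageConcurrentlyService (the concurrent implementation of ConsumeMessageService), so it has direct access to the MessageListener that was registered when the consumer was started.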
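When the consumeExecutor of ConsumeMessageService runs a ConsumeRequest, its run method obtains the listener and calls the client's consumeMessage method, which executes the user's own consumption logic and returns a consumption status. Based on that status, the result is processed and the consume offset is updated.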
```java
class ConsumeRequest implements Runnable {
    private final List<MessageExt> msgs;
    private final ProcessQueue processQueue;
    private final MessageQueue messageQueue;

    // constructor omitted

    @Override
    public void run() {
        MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
        ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
        ConsumeConcurrentlyStatus status = null;
        defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
        long beginTimestamp = System.currentTimeMillis();
        boolean hasException = false;
        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
        try {
            if (msgs != null && !msgs.isEmpty()) {
                for (MessageExt msg : msgs) {
                    MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                }
            }
            // Invoke the user's listener
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } catch (Throwable e) {
            hasException = true;
        }
        long consumeRT = System.currentTimeMillis() - beginTimestamp;
        if (null == status) {
            if (hasException) {
                returnType = ConsumeReturnType.EXCEPTION;
            } else {
                returnType = ConsumeReturnType.RETURNNULL;
            }
        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
            returnType = ConsumeReturnType.TIME_OUT;
        } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
            returnType = ConsumeReturnType.FAILED;
        } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
            returnType = ConsumeReturnType.SUCCESS;
        }
        // No status (exception or null return) is treated as "consume later"
        if (null == status) {
            status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
            .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
        if (!processQueue.isDropped()) {
            // Update the offset, or send failed messages back for retry
            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
        } else {
            log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
        }
    }
}
```
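The result handling in run() boils down to two small mappings: a classification used for statistics and hooks (ConsumeReturnType), and the effective status that processConsumeResult acts on. The sketch below restates them as pure functions; SketchConsumeOutcome and its enums are hypothetical mirrors of the RocketMQ types, and the timeout is in minutes as in getConsumeTimeout().

```java
// Hypothetical restatement of the outcome logic in ConsumeRequest.run().
class SketchConsumeOutcome {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }
    enum ReturnType { SUCCESS, FAILED, EXCEPTION, RETURNNULL, TIME_OUT }

    // Same branch order as the original: missing status first, then timeout, then the listener's answer.
    static ReturnType classify(Status status, boolean hasException, long consumeRTMillis, long consumeTimeoutMinutes) {
        if (status == null) {
            return hasException ? ReturnType.EXCEPTION : ReturnType.RETURNNULL;
        }
        if (consumeRTMillis >= consumeTimeoutMinutes * 60 * 1000) {
            return ReturnType.TIME_OUT;
        }
        return status == Status.RECONSUME_LATER ? ReturnType.FAILED : ReturnType.SUCCESS;
    }

    // A missing status (exception or null return) is retried, never silently acknowledged.
    static Status effectiveStatus(Status status) {
        return status == null ? Status.RECONSUME_LATER : status;
    }
}
```

TIME_OUT only affects the recorded return type: a listener that returned CONSUME_SUCCESS after the timeout still has CONSUME_SUCCESS passed to processConsumeResult.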
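Open Question
The question of where pullRequestQueue's entries come from was partly answered in the section on pulling messages in the PullMessageService thread; that source covers re-pulling after a delay when a pull cannot proceed. The following explains the other, and more important, source.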
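The other source is RebalanceService. When it calls rebalanceByTopic, the MessageQueues of each Topic are distributed among the consumers of the group. For every newly assigned MessageQueue, a PullRequest is created and collected in pullRequestList; dispatchPullRequest then passes each PullRequest to executePullRequestImmediately, which puts it into pullRequestQueue. The relevant code: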
```java
for (MessageQueue mq : mqSet) {
    if (!this.processQueueTable.containsKey(mq)) {
        this.removeDirtyOffset(mq);
        ProcessQueue pq = new ProcessQueue();
        long nextOffset = this.computePullFromWhere(mq);
        if (nextOffset >= 0) {
            ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
            if (pre != null) {
                // the MessageQueue is already being processed; nothing to do
            } else {
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(consumerGroup);
                pullRequest.setNextOffset(nextOffset);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(pq);
                pullRequestList.add(pullRequest);
                changed = true;
            }
        }
    }
}
this.dispatchPullRequest(pullRequestList);

// RebalancePushImpl
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (PullRequest pullRequest : pullRequestList) {
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
    }
}
```
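The essential rule in the rebalance loop is "only queues not already in processQueueTable get a new PullRequest", with putIfAbsent guarding against a concurrent insert. The hypothetical sketch below keeps just that rule; SketchRebalance, the String queue ids and the caller-supplied start-offset function (standing in for computePullFromWhere, with a negative value meaning "skip") are all illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.ToLongFunction;

// Hypothetical sketch of the "new queue -> new PullRequest" step of rebalancing.
class SketchRebalance {
    static final class PullRequest {
        final String mq;
        final long nextOffset;

        PullRequest(String mq, long nextOffset) {
            this.mq = mq;
            this.nextOffset = nextOffset;
        }
    }

    // Stands in for processQueueTable: one entry per queue this client is already pulling.
    final ConcurrentHashMap<String, Object> processQueueTable = new ConcurrentHashMap<>();

    List<PullRequest> addNewQueues(Set<String> assigned, ToLongFunction<String> startOffset) {
        List<PullRequest> pullRequestList = new ArrayList<>();
        for (String mq : assigned) {
            if (processQueueTable.containsKey(mq)) {
                continue; // already pulling this queue
            }
            long nextOffset = startOffset.applyAsLong(mq);
            if (nextOffset < 0) {
                continue; // start offset unknown: skip this round
            }
            // putIfAbsent returns null only for the caller that actually inserted the entry.
            if (processQueueTable.putIfAbsent(mq, new Object()) == null) {
                pullRequestList.add(new PullRequest(mq, nextOffset));
            }
        }
        return pullRequestList;
    }
}
```

Because a queue that is already owned never gets a second PullRequest, each MessageQueue has at most one request circulating through pullRequestQueue at any time.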
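Consumption Flow Summary
After the client starts, RebalanceService assigns MessageQueues for each Topic, creates a PullRequest for each newly assigned MessageQueue, and adds these PullRequests to pullRequestQueue. The PullMessageService thread continuously takes PullRequests from this queue and pulls messages from the Broker (pulling has its own dedicated thread). Pulled messages are added to a local cache (the TreeMap inside the ProcessQueue), split into groups of consumeMessageBatchMaxSize messages, and each group is wrapped in a ConsumeRequest (a Runnable) and submitted to a thread pool of ConsumeMessageService. ConsumeRequest is an inner class of the ConsumeMessageService implementation; when it runs, it obtains the Listener and calls the client's custom consumeMessage logic. This also shows that the size of the List<MessageExt> msgs our listener receives is fixed when the ConsumeRequest is built. After the client logic finishes and returns a consumption status, the result is used to update the consume offset or, on failure, to send the messages back to the Broker, which writes them to disk again for a later retry.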
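One detail behind "update the consume offset" matters for concurrent consumption: batches finish out of order, so the offset that is safe to commit is the smallest offset still in flight, not the largest one finished. The sketch below models that rule, assuming the same behavior as ProcessQueue.removeMessage in RocketMQ 4.x (return the first remaining key, or max offset + 1 once the cache is empty); SketchOffsetProgress is a hypothetical name.

```java
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch of how concurrent consumption picks the offset to commit.
class SketchOffsetProgress {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private long queueOffsetMax = -1;

    synchronized void put(List<Long> offsets) {
        for (long offset : offsets) {
            msgTreeMap.put(offset, "msg-" + offset);
            queueOffsetMax = Math.max(queueOffsetMax, offset);
        }
    }

    // Removes finished messages and returns the offset that is safe to commit:
    // the smallest offset still in flight, or max + 1 once everything is done; -1 if nothing was cached.
    synchronized long remove(List<Long> finished) {
        if (msgTreeMap.isEmpty()) {
            return -1;
        }
        for (long offset : finished) {
            msgTreeMap.remove(offset);
        }
        return msgTreeMap.isEmpty() ? queueOffsetMax + 1 : msgTreeMap.firstKey();
    }
}
```

If the consumer crashes while offset 10 is still in flight, consumption resumes from 10 and offsets 11 and 12 are delivered again; this is one reason RocketMQ consumers are expected to be idempotent.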