RocketMQ: How Message Consumption Works

Message Consumption

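The previous articles analyzed message sending in some depth. This article walks through the main flow of message consumption in RocketMQ, focusing on the Push consumer and using clustering consumption as the example.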

To understand how a Consumer consumes messages, start with client startup. Below is about the simplest possible MQConsumer:

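import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        // Subscribe to every tag ("*") of Chase_Topic
        consumer.subscribe("Chase_Topic", "*");
        // The listener is the entry point that receives the pulled messages
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}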
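  1. In the code above, the Consumer registers a MessageListener before it starts. This listener is the entry point that actually consumes messages once they have been pulled (covered later).
  2. With the preparation done, consumer.start() starts the Consumer. The actual consumption work is done in a thread started by this call; most of this article analyzes that thread in detail.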

Consumer Startup

consumer.start() delegates to this.defaultMQPushConsumerImpl.start(). When a new Consumer starts, this method performs the following important steps:

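  1. this.checkConfig(); validates the configuration parameters.

  2. this.copySubscription(); copies the subscription data, mainly to prepare for the retry logic (covered in a later article).

  3. Initializes the consumption-related snapshot state.

  4. this.consumeMessageService.start(); for a concurrent listener, this mainly starts the cleanup of expired messages.

  5. mQClientFactory.start(); initializes and starts mQClientFactory.

  6. mQClientFactory.start() is implemented as follows: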

    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified, looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

    For message consumption, the most important call in the logic above is this.pullMessageService.start();
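The serviceState switch is what makes start() safe to call more than once. Here is a minimal sketch of that pattern without RocketMQ: ServiceState copies the three states used above, while StartOnceService and its startupSteps parameter are hypothetical. The state is set to START_FAILED before the startup steps run, so if any step throws, the object stays in START_FAILED and a later start() fails fast instead of half-starting again.

```java
import java.util.concurrent.atomic.AtomicInteger;

// The subset of states used in the start() excerpt above.
enum ServiceState { CREATE_JUST, RUNNING, START_FAILED }

// Hypothetical class illustrating the start-once state machine.
class StartOnceService {
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    // Stands in for starting the remoting channel, scheduled tasks, pull and rebalance services.
    private final Runnable startupSteps;
    final AtomicInteger startCount = new AtomicInteger();

    StartOnceService(Runnable startupSteps) {
        this.startupSteps = startupSteps;
    }

    void start() {
        synchronized (this) {
            switch (serviceState) {
                case CREATE_JUST:
                    // Pessimistically mark as failed; only a clean run reaches RUNNING.
                    serviceState = ServiceState.START_FAILED;
                    startupSteps.run();
                    startCount.incrementAndGet();
                    serviceState = ServiceState.RUNNING;
                    break;
                case START_FAILED:
                    throw new IllegalStateException("has been created before, and failed");
                default:
                    // RUNNING: a repeated start() is a no-op.
                    break;
            }
        }
    }

    synchronized ServiceState state() {
        return serviceState;
    }
}
```

A second start() on a running instance does nothing, which is why several consumers can share one client factory and each call start() on it.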

Pulling Messages in the PullMessageService Thread

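PullMessageService is a thread. Its run method keeps taking requests from a request queue and pulling messages from the Broker for them. The pull itself is asynchronous: once messages arrive, they are put into a local TreeMap cache and then handed over to separate consumer threads (thread isolation: each thread does one job). After the FOUND case has been handled, the request is queued again, so pulling continues in a loop.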

When the Consumer starts, mQClientFactory.start() calls pullMessageService.start(). PullMessageService is a thread that extends ServiceThread; its run method is:

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}

The thread holds a LinkedBlockingQueue named pullRequestQueue that stores the pull request objects.
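The take-and-dispatch loop can be reproduced with java.util.concurrent alone. This is a sketch, not RocketMQ's class: PullRequest here is a hypothetical two-field holder, and the pull function stands in for pullMessage. Because take() blocks, the thread idles while the queue is empty, and a request is only pulled again after someone puts it back, which is the job of executePullRequestImmediately and executePullRequestLater in the real client.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical, stripped-down PullRequest: which queue to pull and from which offset.
final class PullRequest {
    final int queueId;
    long nextOffset;

    PullRequest(int queueId, long nextOffset) {
        this.queueId = queueId;
        this.nextOffset = nextOffset;
    }
}

class PullService implements Runnable {
    private final BlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
    private final Consumer<PullRequest> pull; // stands in for pullMessage(pullRequest)
    private volatile boolean stopped = false;

    PullService(Consumer<PullRequest> pull) {
        this.pull = pull;
    }

    // Counterpart of executePullRequestImmediately: (re)queue a request.
    void executePullRequestImmediately(PullRequest request) {
        pullRequestQueue.add(request);
    }

    void stop(Thread worker) {
        stopped = true;
        worker.interrupt(); // wakes the worker if it is blocked in take()
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                PullRequest request = pullRequestQueue.take(); // blocks while the queue is empty
                pull.accept(request);
            } catch (InterruptedException ignored) {
                // Woken up by stop(); the loop condition decides whether to exit.
            } catch (Exception e) {
                // Counterpart of the log.error branch: keep the loop alive.
                System.err.println("pull failed: " + e);
            }
        }
    }
}
```

In use, the pull function requeues its own request, just as the pull callback does after FOUND or NO_NEW_MSG, so a single request keeps cycling through the queue.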

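In run, the thread repeatedly takes a PullRequest from pullRequestQueue (blocking while the queue is empty) and pulls messages for it. Where the elements of pullRequestQueue come from is explained later; first, here is pullMessage(pullRequest):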

private void pullMessage(final PullRequest pullRequest) {
    // Look up the MQConsumerInner registered for this consumer group
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        // Delegate to DefaultMQPushConsumerImpl.pullMessage
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}

The main logic of DefaultMQPushConsumerImpl.pullMessage follows; to keep the article short, unimportant and loosely related code is omitted.

public void pullMessage(final PullRequest pullRequest) {
    // The local process queue (message cache) for this message queue
    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    if (processQueue.isDropped()) {
        return;
    }

    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
    // Make sure the consumer state is RUNNING
    try {
        this.makeSureStateOK();
    } catch (MQClientException e) {
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        return;
    }

    if (this.isPause()) {
        // Consumer is paused: pull later
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }

    // Flow control
    long cachedMessageCount = processQueue.getMsgCount().get();
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
    // More cached messages than pullThresholdForQueue (default 1000): flow control
    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        // Delay the pull
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        return;
    }

    // Cached size above pullThresholdSizeForQueue (default 100 MiB): flow control
    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
        // Delay the pull
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        return;
    }

    if (!this.consumeOrderly) {
        // Concurrent mode: the span between the largest and smallest cached offsets
        // exceeds consumeConcurrentlyMaxSpan (default 2000): flow control
        if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            // Delay the pull
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            return;
        }
    } else {
        if (processQueue.isLocked()) {
            if (!pullRequest.isLockedFirst()) {
                // First pull after locking: compute the offset to pull from
                final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                pullRequest.setLockedFirst(true);
                pullRequest.setNextOffset(offset);
            }
        } else {
            // Orderly mode: the processQueue must be locked before consuming
            // Delay the pull
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
            return;
        }
    }

    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (null == subscriptionData) {
        // No subscription data (not online): pull later
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        return;
    }

    // Used by the callback to measure the pull latency
    final long beginTimestamp = System.currentTimeMillis();
    // Callback definition (omitted here; analyzed in the callback section)
    PullCallback pullCallback;

    boolean commitOffsetEnable = false;
    long commitOffsetValue = 0L;
    if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
        // Clustering: send the locally stored offset to the Broker along with the pull
        commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
        if (commitOffsetValue > 0) {
            commitOffsetEnable = true;
        }
    }

    String subExpression = null;
    boolean classFilter = false;
    SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (sd != null) {
        if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
            subExpression = sd.getSubString();
        }
        classFilter = sd.isClassFilterMode();
    }

    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commitOffset
        true, // suspend
        subExpression != null, // subscription
        classFilter // class filter
    );
    try {
        // Pull messages asynchronously
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
        // Pull later
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
}

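Before pulling, the code checks the service state and applies flow control based on the cached data (message count, message size and offset span). When any of these conditions blocks the pull, it calls executePullRequestLater to pull later, which puts the pullRequest back into pullRequestQueue after a delay, where it waits for the next pull.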

This partly answers the open question above: where the elements of pullRequestQueue come from.
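The three flow-control checks described above can be reproduced outside RocketMQ to see how they interact. This is a sketch, not the client's code: CachedQueue is a hypothetical stand-in for ProcessQueue that keeps an offset-to-size TreeMap, and the thresholds are the defaults given in the code comments above. Whenever decide returns anything other than PULL_NOW, the real client would call executePullRequestLater instead of pulling.

```java
import java.util.TreeMap;

// Hypothetical, simplified ProcessQueue: queue offset -> message body size in bytes.
class CachedQueue {
    final TreeMap<Long, Integer> msgTreeMap = new TreeMap<>();
    long msgSizeBytes = 0;

    void putMessage(long offset, int bodySize) {
        // Count each offset only once.
        if (msgTreeMap.put(offset, bodySize) == null) {
            msgSizeBytes += bodySize;
        }
    }

    long msgCount() {
        return msgTreeMap.size();
    }

    // Span between the largest and smallest cached offsets (0 when empty).
    long maxSpan() {
        return msgTreeMap.isEmpty() ? 0 : msgTreeMap.lastKey() - msgTreeMap.firstKey();
    }
}

enum PullDecision { PULL_NOW, FLOW_CONTROL_COUNT, FLOW_CONTROL_SIZE, FLOW_CONTROL_SPAN }

final class FlowControl {
    // Defaults as stated in the comments above: 1000 messages, 100 MiB, span of 2000.
    static final long PULL_THRESHOLD_FOR_QUEUE = 1000;
    static final long PULL_THRESHOLD_SIZE_FOR_QUEUE_MIB = 100;
    static final long CONSUME_CONCURRENTLY_MAX_SPAN = 2000;

    static PullDecision decide(CachedQueue pq, boolean consumeOrderly) {
        if (pq.msgCount() > PULL_THRESHOLD_FOR_QUEUE) {
            return PullDecision.FLOW_CONTROL_COUNT;
        }
        if (pq.msgSizeBytes / (1024 * 1024) > PULL_THRESHOLD_SIZE_FOR_QUEUE_MIB) {
            return PullDecision.FLOW_CONTROL_SIZE;
        }
        // The span check only applies to concurrent consumption.
        if (!consumeOrderly && pq.maxSpan() > CONSUME_CONCURRENTLY_MAX_SPAN) {
            return PullDecision.FLOW_CONTROL_SPAN;
        }
        return PullDecision.PULL_NOW;
    }
}
```

The span check matters because one slow message at a low offset keeps the TreeMap's first key pinned while newer messages pile up behind it; skipping it in orderly mode matches the `!this.consumeOrderly` branch, where the lock check takes its place.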

To pull, the consumer calls pullKernelImpl, which wraps a PullMessage request (request code RequestCode.PULL_MESSAGE) and sends it to the Broker.
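The sysFlag argument in the code above combines four booleans into one int through PullSysFlag.buildSysFlag. The sketch below shows that bit-flag technique with a hypothetical PullFlags class; the bit positions (commit offset = bit 0, suspend = bit 1, subscription = bit 2, class filter = bit 3) are my assumption of PullSysFlag's layout and should be checked against the RocketMQ source.

```java
// Hypothetical illustration of packing pull options into one int; bit positions are assumed.
final class PullFlags {
    static final int FLAG_COMMIT_OFFSET = 1;     // bit 0
    static final int FLAG_SUSPEND = 1 << 1;      // bit 1
    static final int FLAG_SUBSCRIPTION = 1 << 2; // bit 2
    static final int FLAG_CLASS_FILTER = 1 << 3; // bit 3

    // Same argument order as buildSysFlag in the excerpt above.
    static int build(boolean commitOffset, boolean suspend, boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) flag |= FLAG_COMMIT_OFFSET;
        if (suspend) flag |= FLAG_SUSPEND;
        if (subscription) flag |= FLAG_SUBSCRIPTION;
        if (classFilter) flag |= FLAG_CLASS_FILTER;
        return flag;
    }

    static boolean has(int sysFlag, int mask) {
        return (sysFlag & mask) != 0;
    }

    // Clears a single flag, e.g. to stop asking the Broker to commit the offset.
    static int clear(int sysFlag, int mask) {
        return sysFlag & ~mask;
    }
}
```

Packing the options this way keeps the request header to a single integer field, and either side can test an option with one AND.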

Pull Callback

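Taking the asynchronous pull as the example: when the pull response arrives, either onSuccess or onException of pullCallback is called. The callback, and MQClientAPIImpl.pullMessageAsync that invokes it, look like this: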

// Callback definition (inside DefaultMQPushConsumerImpl.pullMessage)
PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData);

            switch (pullResult.getPullStatus()) {
                case FOUND: // Messages found
                    long prevRequestOffset = pullRequest.getNextOffset();
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    long pullRT = System.currentTimeMillis() - beginTimestamp;
                    DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT);

                    long firstMsgOffset = Long.MAX_VALUE;
                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                        // Empty list (e.g. everything filtered out): pull again right away
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    } else {
                        // There are messages
                        firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                        // Put the message list into the ProcessQueue's TreeMap
                        boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                        // Split the messages into groups and hand them to the consume threads
                        /**
                         * If the number of pulled messages is at most consumeBatchSize, they are wrapped in a single
                         * ConsumeRequest (a Runnable) and submitted to the consumeExecutor thread pool.
                         * Otherwise they are split into groups of consumeBatchSize, and each group is submitted separately.
                         */
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);

                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                        } else {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        }
                    }
                    break;
                case NO_NEW_MSG: // No new messages
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                    // Put the pullRequest back into the queue immediately
                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case NO_MATCHED_MSG: // No matching messages
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                    // Put the pullRequest back into the queue immediately
                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case OFFSET_ILLEGAL:
                    // Illegal offset: drop the process queue; after 10 s, persist the corrected offset and remove the queue
                    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                    pullRequest.getProcessQueue().setDropped(true);
                    DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false);
                                DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                            } catch (Throwable e) {
                                // logging omitted
                            }
                        }
                    }, 10000);
                    break;
                default:
                    break;
            }
        }
    }

    @Override
    public void onException(Throwable e) {
        // Pull later
        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    }
};

// MQClientAPIImpl.pullMessageAsync: send the request and route the response to the callback
private void pullMessageAsync(
    final String addr,
    final RemotingCommand request,
    final long timeoutMillis,
    final PullCallback pullCallback
) throws RemotingException, InterruptedException {
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();
            if (response != null) {
                try {
                    // Convert the response into a PullResult
                    PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                    assert pullResult != null;
                    // Success: run the onSuccess callback above
                    pullCallback.onSuccess(pullResult);
                } catch (Exception e) {
                    // Failure while handling the response
                    pullCallback.onException(e);
                }
            } else {
                if (!responseFuture.isSendRequestOK()) {
                    pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
                } else if (responseFuture.isTimeout()) {
                    pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request, responseFuture.getCause()));
                } else {
                    pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
                }
            }
        }
    });
}
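The invokeAsync-plus-callback shape above can be sketched with CompletableFuture as a stand-in for the Netty-based remoting client. PullStatus, PullResult and PullCallback are redeclared locally so the code compiles without RocketMQ, and AsyncPuller.pullAsync is a hypothetical helper: it guarantees that each pull ends in exactly one of onSuccess or onException, including failures and timeouts. orTimeout requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Local stand-ins named after the RocketMQ types used above.
enum PullStatus { FOUND, NO_NEW_MSG, NO_MATCHED_MSG, OFFSET_ILLEGAL }

final class PullResult {
    final PullStatus pullStatus;
    final long nextBeginOffset;

    PullResult(PullStatus pullStatus, long nextBeginOffset) {
        this.pullStatus = pullStatus;
        this.nextBeginOffset = nextBeginOffset;
    }
}

interface PullCallback {
    void onSuccess(PullResult pullResult);

    void onException(Throwable e);
}

final class AsyncPuller {
    // Runs remoteCall on ioPool and routes the outcome to exactly one callback method.
    static CompletableFuture<Void> pullAsync(Supplier<PullResult> remoteCall, long timeoutMillis,
                                             PullCallback callback, ExecutorService ioPool) {
        return CompletableFuture.supplyAsync(remoteCall, ioPool)
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .handle((pullResult, error) -> {
                    if (error != null) {
                        // Unwrap the CompletionException that supplyAsync adds around failures
                        Throwable cause = error instanceof CompletionException && error.getCause() != null
                                ? error.getCause() : error;
                        callback.onException(cause);
                    } else if (pullResult == null) {
                        callback.onException(new IllegalStateException("empty pull response"));
                    } else {
                        try {
                            callback.onSuccess(pullResult);
                        } catch (RuntimeException e) {
                            // As in pullMessageAsync, a failure while handling the result goes to onException
                            callback.onException(e);
                        }
                    }
                    return null;
                });
    }
}
```

In the real client, onSuccess then switches on the status: FOUND, NO_NEW_MSG and NO_MATCHED_MSG all advance nextOffset and requeue the request, while OFFSET_ILLEGAL drops the queue.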

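In short: PullMessageService takes a PullRequest from pullRequestQueue and pulls messages from the Broker for it. If no messages come back, the PullRequest is put back into the queue and pulled again later. If messages do come back, they are first put into the TreeMap of the ProcessQueue, then split into groups of consumeBatchSize messages; each group is wrapped in a ConsumeRequest (a Runnable) and submitted to the consume thread pool (consumeExecutor), where the consumption happens. In both cases the PullRequest is requeued, so pulling continues. The consumption logic itself is described next.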
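The grouping step can be sketched as follows. BatchDispatcher is hypothetical, and the lambda handed to the executor plays the role of ConsumeRequest. In the real client the batch size comes from DefaultMQPushConsumer's consumeMessageBatchMaxSize (default 1, as far as I know), which is why a listener normally receives one message per call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

final class BatchDispatcher {
    // Splits msgs into consecutive chunks of at most consumeBatchSize elements.
    static <T> List<List<T>> split(List<T> msgs, int consumeBatchSize) {
        if (consumeBatchSize <= 0) {
            throw new IllegalArgumentException("consumeBatchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < msgs.size(); from += consumeBatchSize) {
            int to = Math.min(from + consumeBatchSize, msgs.size());
            // Copy so a batch does not depend on the source list afterwards.
            batches.add(new ArrayList<>(msgs.subList(from, to)));
        }
        return batches;
    }

    // Each batch becomes one Runnable on the consume pool; returns the number of batches submitted.
    static <T> int submit(List<T> msgs, int consumeBatchSize, ExecutorService consumeExecutor,
                          Consumer<List<T>> listener) {
        List<List<T>> batches = split(msgs, consumeBatchSize);
        for (List<T> batch : batches) {
            consumeExecutor.submit(() -> listener.accept(batch));
        }
        return batches.size();
    }
}
```

Because every batch is an independent task, batches from the same queue may finish in any order; this is what the offset bookkeeping in ProcessQueue has to cope with.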

Consuming Messages

Below is the core logic of ConsumeRequest, with non-essential code omitted.

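Note: ConsumeRequest is an inner class of ConsumeMessageConcurrentlyService (the concurrent implementation of ConsumeMessageService), so the listener registered at startup is directly accessible inside ConsumeRequest.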

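When the consumeExecutor picks up a ConsumeRequest task, it calls run, which gets the listener and calls the client's consumeMessage method to execute the user's own consumption logic. The returned consumption status then determines how the result is processed and how the consumption progress is updated.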

class ConsumeRequest implements Runnable {
    private final List<MessageExt> msgs;
    private final ProcessQueue processQueue;
    private final MessageQueue messageQueue;

    // Constructor and getters omitted

    @Override
    public void run() {
        MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
        ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
        ConsumeConcurrentlyStatus status = null;
        defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
        long beginTimestamp = System.currentTimeMillis();
        boolean hasException = false;
        // The ReturnType is derived from the consumption status below
        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
        try {
            if (msgs != null && !msgs.isEmpty()) {
                // Record the consume start time on each message
                for (MessageExt msg : msgs) {
                    MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                }
            }
            // Status returned by the user's listener
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } catch (Throwable e) {
            // The user's listener threw
            hasException = true;
        }
        long consumeRT = System.currentTimeMillis() - beginTimestamp;
        if (null == status) {
            // Exception, or the listener returned no status
            if (hasException) {
                returnType = ConsumeReturnType.EXCEPTION;
            } else {
                returnType = ConsumeReturnType.RETURNNULL;
            }
        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
            // consumeTimeout is in minutes
            returnType = ConsumeReturnType.TIME_OUT;
        } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
            returnType = ConsumeReturnType.FAILED;
        } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
            returnType = ConsumeReturnType.SUCCESS;
        }
        if (null == status) {
            // No status is treated as a failure: consume again later
            status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
        if (!processQueue.isDropped()) {
            // Process the result and update the consumption progress
            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
        } else {
            log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
        }
    }

}
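The status handling at the end of run can be isolated into a small pure function. The enums are local copies named after the RocketMQ types (so the sketch compiles without the client jar), and ConsumeOutcome is hypothetical. Two details from the code above are easier to see in isolation: a missing status always becomes RECONSUME_LATER, and a timeout only changes the ReturnType used for statistics, not the status itself.

```java
import java.util.function.Supplier;

// Local copies named after the RocketMQ enums used above.
enum ConsumeConcurrentlyStatus { CONSUME_SUCCESS, RECONSUME_LATER }

enum ConsumeReturnType { SUCCESS, TIME_OUT, EXCEPTION, RETURNNULL, FAILED }

final class ConsumeOutcome {
    final ConsumeConcurrentlyStatus status;
    final ConsumeReturnType returnType;

    ConsumeOutcome(ConsumeConcurrentlyStatus status, ConsumeReturnType returnType) {
        this.status = status;
        this.returnType = returnType;
    }

    // Same branch order as run(): null status first, then the timeout, then the returned status.
    static ConsumeOutcome classify(ConsumeConcurrentlyStatus status, boolean hasException,
                                   long consumeRtMillis, long consumeTimeoutMinutes) {
        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
        if (status == null) {
            returnType = hasException ? ConsumeReturnType.EXCEPTION : ConsumeReturnType.RETURNNULL;
        } else if (consumeRtMillis >= consumeTimeoutMinutes * 60 * 1000) {
            returnType = ConsumeReturnType.TIME_OUT;
        } else if (status == ConsumeConcurrentlyStatus.RECONSUME_LATER) {
            returnType = ConsumeReturnType.FAILED;
        }
        // A missing status is treated as a failure, so the batch will be retried.
        ConsumeConcurrentlyStatus effective = status == null ? ConsumeConcurrentlyStatus.RECONSUME_LATER : status;
        return new ConsumeOutcome(effective, returnType);
    }

    // Runs a listener the way run() does: any Throwable counts as an exception with no status.
    static ConsumeOutcome runListener(Supplier<ConsumeConcurrentlyStatus> listener, long consumeTimeoutMinutes) {
        long begin = System.currentTimeMillis();
        ConsumeConcurrentlyStatus status = null;
        boolean hasException = false;
        try {
            status = listener.get();
        } catch (Throwable t) {
            hasException = true;
        }
        return classify(status, hasException, System.currentTimeMillis() - begin, consumeTimeoutMinutes);
    }
}
```

So a listener that throws, or returns null, has its messages retried exactly as if it had returned RECONSUME_LATER; only the statistics tell the cases apart.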

Open Question

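The question of where the elements of pullRequestQueue come from was partly answered in the section on pulling messages in the PullMessageService thread; that case covers re-queuing a request for a delayed pull when a pull cannot proceed or fails. The following explains the other (important) source.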

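The other source is RebalanceService, which adds requests in dispatchPullRequest. When rebalanceByTopic runs, it assigns the MessageQueues of each Topic; before dispatchPullRequest is called, a PullRequest is created for each newly assigned MessageQueue, and dispatchPullRequest then adds these requests to pullRequestQueue. The relevant logic is: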

// Excerpt from RebalanceImpl.updateProcessQueueTableInRebalance: mqSet holds the queues assigned to this consumer
for (MessageQueue mq : mqSet) {
    if (!this.processQueueTable.containsKey(mq)) {
        this.removeDirtyOffset(mq);
        ProcessQueue pq = new ProcessQueue();
        long nextOffset = this.computePullFromWhere(mq);
        if (nextOffset >= 0) {
            ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
            if (pre != null) {
                // The queue is already being processed: nothing to do
            } else {
                // Create a PullRequest for the newly assigned queue
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(consumerGroup);
                pullRequest.setNextOffset(nextOffset);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(pq);
                pullRequestList.add(pullRequest);
                changed = true;
            }
        }
    }
}

// RebalancePushImpl.dispatchPullRequest: hand the new requests over to pullRequestQueue
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
    for (PullRequest pullRequest : pullRequestList) {
        // Ends up as this.pullRequestQueue.put(pullRequest) in PullMessageService
        this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);
    }
}
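The key property of this loop is that it is idempotent: a queue that already has a ProcessQueue gets no second PullRequest, so running rebalance periodically does not flood pullRequestQueue. A minimal sketch with hypothetical types (queues as String, process queues as plain Object, computePullFromWhere passed in as a function):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.ToLongFunction;

// Hypothetical pull request for a newly assigned queue.
final class NewPullRequest {
    final String messageQueue;
    final long nextOffset;
    final Object processQueue;

    NewPullRequest(String messageQueue, long nextOffset, Object processQueue) {
        this.messageQueue = messageQueue;
        this.nextOffset = nextOffset;
        this.processQueue = processQueue;
    }
}

final class RebalanceSketch {
    final ConcurrentMap<String, Object> processQueueTable = new ConcurrentHashMap<>();

    // Returns one request per newly assigned queue; queues already in the table are skipped.
    List<NewPullRequest> onAssigned(Set<String> mqSet, ToLongFunction<String> computePullFromWhere) {
        List<NewPullRequest> pullRequestList = new ArrayList<>();
        for (String mq : mqSet) {
            if (processQueueTable.containsKey(mq)) {
                continue;
            }
            long nextOffset = computePullFromWhere.applyAsLong(mq);
            if (nextOffset < 0) {
                // Offset unknown: skipped, so a later rebalance sees the queue as new again.
                continue;
            }
            Object pq = new Object();
            // putIfAbsent guards against a concurrent rebalance adding the same queue.
            if (processQueueTable.putIfAbsent(mq, pq) == null) {
                pullRequestList.add(new NewPullRequest(mq, nextOffset, pq));
            }
        }
        return pullRequestList;
    }
}
```

The returned list is what dispatchPullRequest would then push into pullRequestQueue; after that, each request keeps itself alive through the requeue calls in the pull callback.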

Summary of the Consumption Flow

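After the client starts, RebalanceService assigns the MessageQueues of each Topic, creates a PullRequest for each newly assigned MessageQueue, and adds these PullRequests to pullRequestQueue. The PullMessageService thread keeps taking PullRequests from this queue and pulls messages from the Broker (pulling has its own dedicated thread). Pulled messages are added to the local cache (the TreeMap in processQueue) and split into groups of consumeBatchSize messages; each group is wrapped in a ConsumeRequest (a Runnable) and submitted to the thread pool of ConsumeMessageService. ConsumeRequest is an inner class of ConsumeMessageConcurrentlyService, so when the Runnable runs it can get the listener and call the client's custom consumeMessage logic. This also shows that the number of messages in the List<MessageExt> msgs our listener receives is already fixed when the ConsumeRequest is built. When the client code finishes, it returns a consumption status, and the consumer uses the result to update the consumption progress or, in the failure case, to send the messages back to the Broker so they are stored again for retry.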
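Updating the consumption progress in concurrent mode has one subtlety: batches from the same queue can finish out of order, so the committed offset cannot simply be the last one consumed. The sketch below shows the TreeMap-based approach, based on my reading of ProcessQueue.removeMessage: remove the finished offsets and commit the smallest offset still cached, or the highest offset seen plus one once the map is empty. OffsetTracker is a hypothetical name, and the real method differs in details (locking, message counters, the return value when nothing is cached).

```java
import java.util.List;
import java.util.TreeMap;

// Hypothetical tracker for the offset that is safe to commit for one message queue.
final class OffsetTracker {
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();
    private long queueOffsetMax = -1;

    synchronized void put(long offset, String body) {
        msgTreeMap.put(offset, body);
        queueOffsetMax = Math.max(queueOffsetMax, offset);
    }

    // Removes consumed offsets and returns the offset to commit.
    synchronized long remove(List<Long> consumedOffsets) {
        for (long offset : consumedOffsets) {
            msgTreeMap.remove(offset);
        }
        // Everything below the smallest cached offset is done; an empty map means everything is done.
        return msgTreeMap.isEmpty() ? queueOffsetMax + 1 : msgTreeMap.firstKey();
    }
}
```

With offsets 100 to 104 cached and 101 and 102 finishing first, the committed offset stays at 100. If the process stopped at that point, 101 and 102 would be delivered again after a restart, which is one reason consumers should be idempotent.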

Note: the figure is from the Internet.


RocketMQ: How Message Consumption Works
http://yoursite.com/post/1ae954e8.html/
Author
Chase Wang
Posted on
March 31, 2023
Licensed under