RocketMQ: Source Code Analysis of the Long Polling Mechanism

Long polling

Long polling compared with conventional Push and Pull

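  • Push: The main advantage of Push is low latency, because the server actively pushes messages to the client. It also simplifies client design. However, because the server holds the initiative, there is an inherent drawback: when the server pushes a large volume of messages and the client's consumption capacity is limited, the client can be overwhelmed.

  • Pull: Pull is the opposite of Push. The client holds the initiative and actively pulls messages from the server. The client then has to maintain consumption progress and related state itself, and some timeliness is lost because delivery depends on the client's pull interval. When the server has no messages, the client's pulls also produce many wasted requests.

  • Long polling: Long polling combines the strengths of Push and Pull. It keeps delivery as timely as possible while making each request as useful as possible, which avoids wasted resources.

    How RocketMQ implements long polling: when a client pulls messages, the request reaches the Broker, which checks whether any matching messages exist. If so, they are returned to the client and consumed normally. If the pull result is PULL_NOT_FOUND, the Broker holds the request instead. When a new message arrives within the configured time window, the Broker returns it to the client over the original channel. This lets the client consume messages with low latency while using a limited number of requests.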
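The hold-then-answer behaviour described above can be illustrated with a small toy model. This is only a sketch and not RocketMQ code: ToyLongPollingBroker, its fields, and its methods are hypothetical names, it tracks at most one held pull per queue, and it ignores the races a real broker must handle. It assumes Java 9 or later for CompletableFuture.completeOnTimeout.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical toy broker: a pull is answered at once if a message exists,
// otherwise it is held until a message arrives or the hold time expires.
final class ToyLongPollingBroker {
    static final String NOT_FOUND = "PULL_NOT_FOUND";

    // queue name -> one pending message (toy storage)
    private final Map<String, String> store = new ConcurrentHashMap<>();
    // queue name -> the single held pull for that queue
    private final Map<String, CompletableFuture<String>> held = new ConcurrentHashMap<>();

    CompletableFuture<String> pull(String queue, long holdMillis) {
        String msg = store.remove(queue);
        if (msg != null) {
            return CompletableFuture.completedFuture(msg); // message found: reply immediately
        }
        CompletableFuture<String> pending = new CompletableFuture<>();
        held.put(queue, pending);
        // If nothing arrives in time, the held pull completes with NOT_FOUND.
        return pending.completeOnTimeout(NOT_FOUND, holdMillis, TimeUnit.MILLISECONDS);
    }

    void put(String queue, String msg) {
        CompletableFuture<String> pending = held.remove(queue);
        // Answer the held pull if there is one; otherwise keep the message for the next pull.
        if (pending == null || !pending.complete(msg)) {
            store.put(queue, msg);
        }
    }
}
```

A pull on an empty queue returns an incomplete future; a later put on the same queue completes it without the client having to send a second request.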

Source analysis of the client-side pull entry point

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

DefaultMQPushConsumerImpl#pullMessage was analyzed in detail in a previous article, so it is not repeated here. The core pull logic at the end of that method is as follows:

// buildSysFlag combines the individual flag bits with bitwise OR
int sysFlag = PullSysFlag.buildSysFlag(
    commitOffsetEnable, // commitOffset
    true, // suspend
    subExpression != null, // subscription
    classFilter // class filter
);
try {
    this.pullAPIWrapper.pullKernelImpl(
        pullRequest.getMessageQueue(),
        subExpression,
        subscriptionData.getExpressionType(),
        subscriptionData.getSubVersion(),
        pullRequest.getNextOffset(),
        this.defaultMQPushConsumer.getPullBatchSize(),
        sysFlag,
        commitOffsetValue,
        // Long polling parameter: when the broker has no messages it does not return
        // immediately but holds the request; the timeout defaults to 15s
        BROKER_SUSPEND_MAX_TIME_MILLIS,
        CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
        CommunicationMode.ASYNC,
        pullCallback
    );
} catch (Exception e) {
    log.error("pullKernelImpl exception", e);
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}

First, note how the sysFlag parameter is built: the suspend attribute is hard-coded to true here, which lets the Broker decide whether the request should be long-polled.
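To make the role of the suspend bit concrete, here is a minimal sketch of how such a flag word can be built and tested. ToySysFlag is a simplified stand-in written for this article, not the real PullSysFlag class, and the bit positions it uses are assumptions chosen for illustration.

```java
// Simplified stand-in for a sysFlag helper; bit positions are assumed for illustration.
final class ToySysFlag {
    static final int FLAG_COMMIT_OFFSET = 0x1;
    static final int FLAG_SUSPEND = 0x1 << 1;
    static final int FLAG_SUBSCRIPTION = 0x1 << 2;
    static final int FLAG_CLASS_FILTER = 0x1 << 3;

    // Each enabled option sets its own bit via bitwise OR.
    static int build(boolean commitOffset, boolean suspend,
                     boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) flag |= FLAG_COMMIT_OFFSET;
        if (suspend) flag |= FLAG_SUSPEND;
        if (subscription) flag |= FLAG_SUBSCRIPTION;
        if (classFilter) flag |= FLAG_CLASS_FILTER;
        return flag;
    }

    // The receiving side masks the word to see whether the client allows a hold.
    static boolean hasSuspendFlag(int sysFlag) {
        return (sysFlag & FLAG_SUSPEND) == FLAG_SUSPEND;
    }
}
```

Because the client always passes true for suspend, the corresponding bit is always set on the wire, and the decision to actually hold the request is left to the Broker.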

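Second, two arguments in the pull call above deserve attention:

  1. BROKER_SUSPEND_MAX_TIME_MILLIS: when the Broker has no messages, it does not return immediately but holds the request; the timeout defaults to 15s
  2. CommunicationMode.ASYNC: the pull mode, here asynchronous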

Broker-side message handling source

client request: RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);

The Broker handles the request as follows:

public RemotingCommand processRequest(final ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    return this.processRequest(ctx.channel(), request, true);
}

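In processRequest, pay particular attention to the last argument: brokerAllowSuspend = true.

In processRequest, after a series of permission and validity checks (whether the Topic, Queue, and Broker are readable, and so on), the Broker uses the information in the request to query its MessageStore (also covered in detail in a previous article) for matching messages. It then checks whether the resulting status is ResponseCode.PULL_NOT_FOUND. Long polling applies only to this PULL_NOT_FOUND state, where no message was found. The relevant logic is: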

case ResponseCode.PULL_NOT_FOUND:
    // brokerAllowSuspend is passed in as true; hasSuspendFlag is set to true by the client
    if (brokerAllowSuspend && hasSuspendFlag) {
        // suspendTimeout set by the client
        long pollingTimeMills = suspendTimeoutMillisLong;
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }

        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        // Wrap the relevant parameters in a PullRequest
        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
        // Long polling: hold the request
        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
        response = null;
        break;
    }
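The branch above reduces to a small decision: whether to hold at all, and for how long. The sketch below restates it as a pure function so the cases are easy to see. ToySuspendDecision and its parameter names are hypothetical, and returning -1 for "reply immediately" is a convention of this sketch only.

```java
// Hypothetical restatement of the PULL_NOT_FOUND branch as a pure function.
final class ToySuspendDecision {
    // Returns how long to hold the request in ms, or -1 when the broker should reply at once.
    static long holdMillis(boolean brokerAllowSuspend, boolean hasSuspendFlag,
                           boolean longPollingEnable, long suspendTimeoutMillis,
                           long shortPollingTimeMills) {
        if (!(brokerAllowSuspend && hasSuspendFlag)) {
            return -1L; // either side disallows holding
        }
        // Long polling uses the client's suspend timeout; otherwise the broker's short interval.
        return longPollingEnable ? suspendTimeoutMillis : shortPollingTimeMills;
    }
}
```

For example, with a client timeout of 15000 ms and an assumed short polling interval of 1000 ms, the hold time is 15000 ms when long polling is enabled and 1000 ms when it is not.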

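A PULL_NOT_FOUND result means no matching message was found. The relevant parameters are then wrapped in a PullRequest and handed to PullRequestHoldService, which performs the long polling. In that service's suspendPullRequest method, a key is built from topic and queueId, the key identifies a ManyPullRequest object in pullRequestTable, and the PullRequest is appended to that object's pullRequestList container (an ArrayList) for later processing.

PullRequestHoldService

PullRequestHoldService is also a service thread. In its run method, this.checkHoldRequest() executes every 5s when long polling is enabled (otherwise the wait is shortPollingTimeMills). checkHoldRequest iterates over all entries added to pullRequestTable above and eventually calls notifyMessageArriving.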
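A minimal sketch of this keyed hold table, under stated assumptions: ToyHoldTable is a hypothetical class, held requests are represented as plain strings, and the "@" separator is an assumption of this sketch. It only illustrates the shape of the data structure, not the actual service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical hold table: one list of held requests per (topic, queueId) key.
final class ToyHoldTable {
    static final String SEPARATOR = "@"; // assumed separator between topic and queueId

    private final ConcurrentMap<String, List<String>> table = new ConcurrentHashMap<>();

    static String buildKey(String topic, int queueId) {
        return topic + SEPARATOR + queueId;
    }

    // Adds a held request under its queue's key, creating the list on first use.
    void suspend(String topic, int queueId, String request) {
        List<String> list = table.computeIfAbsent(buildKey(topic, queueId), k -> new ArrayList<>());
        synchronized (list) {
            list.add(request);
        }
    }

    // Takes a snapshot of the held requests and empties the list, so each request
    // is either answered or explicitly re-added by the caller.
    List<String> cloneListAndClear(String topic, int queueId) {
        List<String> list = table.get(buildKey(topic, queueId));
        if (list == null) {
            return new ArrayList<>();
        }
        synchronized (list) {
            List<String> copy = new ArrayList<>(list);
            list.clear();
            return copy;
        }
    }
}
```

Keying by topic and queue means that when a message lands in one queue, only the requests waiting on that queue need to be examined.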

// run method
public void run() {
    log.info("{} service started", this.getServiceName());
    while (!this.isStopped()) {
        try {
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }
            long beginLockTimestamp = this.systemClock.now();
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    log.info("{} service end", this.getServiceName());
}

// checkHoldRequest
private void checkHoldRequest() {
    // Iterate over the held requests in the container
    for (String key : this.pullRequestTable.keySet()) {
        String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
        if (2 == kArray.length) {
            String topic = kArray[0];
            int queueId = Integer.parseInt(kArray[1]);
            final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
            try {
                // Call notifyMessageArriving
                this.notifyMessageArriving(topic, queueId, offset);
            } catch (Throwable e) {
                log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
            }
        }
    }
}

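notifyMessageArriving builds a key from the topic and queueId arguments, fetches the corresponding ManyPullRequest from pullRequestTable, and iterates over its requests. For each PullRequest it checks whether a new matching message exists; if so, it calls executeRequestWhenWakeup. The implementation is as follows: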

public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset,
    final Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    // Build the key used to look up the held requests
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (mpr != null) {
        List<PullRequest> requestList = mpr.cloneListAndClear();
        if (requestList != null) {
            List<PullRequest> replayList = new ArrayList<PullRequest>();
            // Iterate over every PullRequest
            for (PullRequest request : requestList) {
                long newestOffset = maxOffset;
                if (newestOffset <= request.getPullFromThisOffset()) {
                    newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
                }

                if (newestOffset > request.getPullFromThisOffset()) {
                    boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
                        new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
                    // match by bit map, need eval again when properties is not null.
                    if (match && properties != null) {
                        match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
                    }
                    if (match) {
                        // A new message matched: call PullMessageProcessor's executeRequestWhenWakeup
                        // to run the pull logic again. This time brokerAllowSuspend is false inside
                        // executeRequestWhenWakeup, so the request will not be held again.
                        try {
                            this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
                                request.getClientChannel(), request.getRequestCommand());
                        } catch (Throwable e) {
                            log.error("execute request when wakeup failed.", e);
                        }
                        continue;
                    }
                }
                // Timeout handling
                if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                    try {
                        this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
                            request.getClientChannel(), request.getRequestCommand());
                    } catch (Throwable e) {
                        log.error("execute request when wakeup failed.", e);
                    }
                    continue;
                }
                // Neither matched nor timed out: keep holding the request
                replayList.add(request);
            }
            if (!replayList.isEmpty()) {
                mpr.addPullRequest(replayList);
            }
        }
    }
}

Note: when executeRequestWhenWakeup is called in the loop above, it processes the request with brokerAllowSuspend = false, so the Broker will not hold a request taken from the container a second time. The relevant code is: RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
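The per-request outcome in notifyMessageArriving can be summarised as a three-way decision. The sketch below is a hypothetical condensation (ToyWakeupCheck and the Action names are not RocketMQ identifiers), and it takes the filter result as a plain boolean instead of evaluating a real message filter.

```java
// Hypothetical condensation of the per-request check in notifyMessageArriving.
final class ToyWakeupCheck {
    enum Action { WAKE_WITH_MESSAGE, WAKE_ON_TIMEOUT, KEEP_HOLDING }

    static Action decide(long newestOffset, long pullFromThisOffset, boolean filterMatches,
                         long nowMillis, long suspendTimestamp, long timeoutMillis) {
        // A message beyond the requested offset exists and passes the filter.
        if (newestOffset > pullFromThisOffset && filterMatches) {
            return Action.WAKE_WITH_MESSAGE;
        }
        // The hold window has elapsed: re-run the pull, which replies without holding again.
        if (nowMillis >= suspendTimestamp + timeoutMillis) {
            return Action.WAKE_ON_TIMEOUT;
        }
        // Otherwise the request goes back into the hold list.
        return Action.KEEP_HOLDING;
    }
}
```

In both wake cases the request is re-processed with suspension disabled, so a request that finds nothing after a timeout is answered instead of being held indefinitely.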

ReputMessageService#doReput

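The checkHoldRequest logic above runs on a 5s cycle, which introduces a delay and hurts the timeliness of message delivery. To avoid this and keep consumption as prompt as possible, RocketMQ adds a second entry point for handling these PullRequests in the logic that builds the ConsumeQueue. The entry point is org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput, and the logic is as follows: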

// Build the ConsumeQueue / Index
DefaultMessageStore.this.doDispatch(dispatchRequest);
// Check the Broker role and whether long polling is enabled
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
    // arriving
    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}


// messageArrivingListener.arriving logic
public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
    long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
    // notifyMessageArriving is the same logic as above
    this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode,
        msgStoreTime, filterBitMap, properties);
}
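The listener hand-off above can be sketched in isolation. ToyArrivingListener and ToyStore are hypothetical names, and the store is reduced to a single offset counter. The point of the sketch is the guard (not a slave, long polling enabled) and the + 1: a message dispatched at logical offset n makes the queue's max offset n + 1, which is what a request waiting at offset n needs to see.

```java
// Hypothetical listener: receives the new max offset of a queue right after dispatch.
interface ToyArrivingListener {
    void arriving(String topic, int queueId, long logicOffset);
}

// Hypothetical store reduced to one offset counter for one queue.
final class ToyStore {
    private final boolean slave;
    private final boolean longPollingEnable;
    private final ToyArrivingListener listener;
    private long nextConsumeQueueOffset = 0;

    ToyStore(boolean slave, boolean longPollingEnable, ToyArrivingListener listener) {
        this.slave = slave;
        this.longPollingEnable = longPollingEnable;
        this.listener = listener;
    }

    // Dispatches one message and, when allowed, notifies held pulls immediately
    // instead of leaving them to the periodic check.
    void dispatch(String topic, int queueId) {
        long consumeQueueOffset = nextConsumeQueueOffset++;
        if (!slave && longPollingEnable) {
            listener.arriving(topic, queueId, consumeQueueOffset + 1);
        }
    }
}
```

With this push-style notification, the periodic scan acts mainly as a safety net for timeouts and for any requests that were not woken by an arrival.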

Long polling flow chart

[Figure: long polling flow]


http://yoursite.com/post/7584da62.html/
Author: Chase Wang
Posted on: May 28, 2023