RocketMQ——事务消息原理分析之消息回查

背景

基于上一篇文章对事务消息发送的原理分析,本文继续对事务消息原理做进一步的梳理说明

本文主要针对事务消息的回查机制做对应的分析和说明

TransactionalMessageCheckService

Broker在启动的时候(BrokerController#start),会在start方法中调用startProcessorByHa逻辑,在该逻辑中会判断当前Broker的角色是否是Master,如果是Master,会启动 TransactionalMessageCheckService 服务

相关源码如下:

1
2
3
4
5
6
7
8
9
10
private void startProcessorByHa(BrokerRole role) {
if (BrokerRole.SLAVE != role) {
if (this.transactionalMessageCheckService != null) {
// 事务消息回查线程
// 调用start,执行对应的run
// public class TransactionalMessageCheckService extends ServiceThread {}
this.transactionalMessageCheckService.start();
}
}
}
JAVA

this.transactionalMessageCheckService.start() 后执行run方法逻辑:

在如下逻辑中,会先获取到事务消息回查的默认间隔(默认间隔为60s),该时间间隔会在进行回查的时候,作为阻塞线程超时时间在后续的逻辑中使用,通过CAS机制,保证事务回查核心逻辑只有一个线程在执行

具体逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Override
public void run() {
log.info("Start transaction check service thread!");
// 事务消息的回查间隔
// 默认 private long transactionCheckInterval = 60 * 1000;
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
// 循环调用
while (!this.isStopped()) {
// 调用 waitForRunning
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}

/**
* this.waitForRunning(checkInterval); 逻辑如下:
*/
protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
// 事务回查核心逻辑
this.onWaitEnd();
return;
}

//entry to wait
waitPoint.reset();

try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}
JAVA

在 onWaitEnd() 逻辑中,先会获取到Broker 配置的两个参数,然后调用 check 方法,进行事务回查对应的逻辑

两个参数:

  1. transactionTimeOut

    事务超时时间,默认 6s

  2. transactionCheckMax

    事务最大回查次数,默认 15

相关逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
@Override
protected void onWaitEnd() {
// 事务超时时间 private long transactionTimeOut = 6 * 1000;
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
// 事务最大回查次数 private int transactionCheckMax = 15;
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
// 事务回查核心逻辑
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
JAVA

下面章节主要介绍 check 回查核心逻辑

TransactionalMessageServiceImpl#check

如下逻辑为事务回查的主要核心逻辑,在该逻辑中主要做了如下几件事:

  1. 获取到 HalfMsg 对应的Topic(RMQ_SYS_TRANS_HALF_TOPIC)的 List (size() = 1)
  2. 获取到 MessageQueue 对应的 offset:halfOffset
  3. 根据 HalfMsg 对应的 MessageQueue 获取到对应的 opQueue
  4. 根据 MessageQueue 和 opQueue 分别获取对应的 offset:halfOffset 和 opOffset
  5. 调用 fillOpRemoveMap 填充两个集合,分别找到已经回查过的 offset 和进一步需要回查的 offset (后续详细说明)
  6. 判断是否超过回查次数或达到被清理的时间,如果满足这种情况,将这类消息写入 TRANS_CHECK_MAX_TIME_TOPIC 中,后续不再对这种情况的 HalfMsg 进行回查操作
  7. 调用 listener.resolveHalfMsg(msgExt) 逻辑,完成消息回查(通过向线程池提交 sendCheckMessage 任务的方式实现)
  8. 更新处理后的 halfOffset 和 opOffset

调用 check 逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
@Override
public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) {
try {
String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
// 获取 topic 对应的所有MessageQueue( RMQ_SYS_TRANS_HALF_TOPIC 实际只有一个QueueId)
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
//根据 RMQ_SYS_TRANS_HALF_TOPIC 中的 MessageQueue 获取 RMQ_SYS_TRANS_OP_HALF_TOPIC 中的MessageQueue,根据QueueId对应
MessageQueue opQueue = getOpQueue(messageQueue);
// 分别获取不同MessageQueue的当前消费进度
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
if (halfOffset < 0 || opOffset < 0) {
log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
halfOffset, opOffset);
continue;
}

// 回查逻辑已经完成的 opHalfMsg 的 offset
List<Long> doneOpOffset = new ArrayList<>();
// 还不能确定halfMs还需不需要进行回查,需要进一步检查
HashMap<Long /* halfOffset */, Long /* opHalfOffset */> removeMap = new HashMap<>();
/**
* fillOpRemoveMap : opOffset对应的消息体内容(halfMsg对应的Offset)与 halfMsg消费偏移量进行对比
* 1. 如果当前opHalfMsg的currentOffset 对应的消息体的值 < halfMsg currentOffset
* 说明该opHalfMsg对应的halfMsg已经回查过,将该opHalfMsg对应的offset 放入 doneOpOffset
* 2. 如果当前opHalfMsg的currentOffset 对应的消息体的值 >= halfMsg currentOffset:
* 将 halfMsg 的offset 作为key ,opHalfMsg 的oppset 作为value 填充到 removeMap
*/
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
// 当前逻辑中没有新写入的OP消息,进行下一个队列检查
if (null == pullResult) {
log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
messageQueue, halfOffset, opOffset);
continue;
}
// single thread
int getMessageNullCount = 1;
long newOffset = halfOffset;
long i = halfOffset;
while (true) {
// 对于 HALF_TOPIC 中的每一个MessageQueue执行的擦好像逻辑不能超过 60s
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
}
// 如果 removeMap 中包含当前MessageQueue的 offset,说明该 offset对应的half MSG 已经被committed 或者 rollback
// 不需要再进行回查了,后面会 i++ ,进行下一个offset的检查
// OP_HALF_TOPIC 中的Msg 是 halfMsg commit/rollback后 生成对应的 OP Msg(状态已经确定了的halfMsg)
if (removeMap.containsKey(i)) {
log.info("Half offset {} has been committed/rolled back", i);
Long removedOpOffset = removeMap.remove(i);
doneOpOffset.add(removedOpOffset);
} else {
// 获取当前 offset 对应的 halfMsg
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
if (msgExt == null) {
// 当拿到的HalfMsg为空时,根据 MAX_RETRY_COUNT_WHEN_HALF_NULL(默认为1) 进行判断,如果为null的次数超过配置,则break
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
break;
}
// 没有新消息,直接返回
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
messageQueue, getMessageNullCount, getResult.getPullResult());
break;
} else {
log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
i, messageQueue, getMessageNullCount, getResult.getPullResult());
// 为null 获取下一个offset
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
}
}
// 如果回查次数超过设定的最大次数(默认:15)或者当前half消息的落盘时间超过设定的 commitlog文件保存时间(72h)
// 则放弃该half msg 的回查,在该队列的检查逻辑结束后更新其offset
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
// 在当前 MessageQueue 的检查逻辑开始后,如果有halfMsg 写入,会发生这种情况
// 因为ConsumeQueue 是按照顺序构建的,所以该MessageQueue 后面的消息也都不用回查了
if (msgExt.getStoreTimestamp() >= startTime) {
log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
new Date(msgExt.getStoreTimestamp()));
break;
}

long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
long checkImmunityTime = transactionTimeout;
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
// 如果生产者侧对特定的消息设置了首次回查免疫时间
if (null != checkImmunityTimeStr) {
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) {
// 对于非首次回查的消息,免疫时间不起作用,所以即使还没有到免疫时间,也要进行if判断
/**
* 返回true的情况:
* 1.已经回查过
* 2.backToHalfQueue
*/
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1;
i++;
continue;
}
}
} else {
// 如果生产者侧没有设置首次回查免疫时间:如果在免疫时间内,则跳过该halfMsg以及后续所有的halfMsg的回查(因为同一个ConsumeQueue中消息是顺序的)
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break;
}
}
List<MessageExt> opMsg = pullResult.getMsgFoundList();
// 为什么要加条件判断
// 依据:消息已存储的时间大于事务超时时间
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
|| (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
|| (valueOfCurrentMinusBorn <= -1);
/**
* 如果需要回查
* 1.Write messageExt to Half topic again
* 2.向线程池添加回查任务(sendCheckMessage(msgExt);)
*/
if (isNeedCheck) {
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
listener.resolveHalfMsg(msgExt);
} else {
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
messageQueue, pullResult);
continue;
}
}
newOffset = i + 1;
i++;
}
// 更新 halfMsg 和 opHalfMsg 的 offset
if (newOffset != halfOffset) {
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
// 计算新的 opOffset
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
}
} catch (Exception e) {
e.printStackTrace();
log.error("Check error", e);
}

}
JAVA

fillOpRemoveMap:

要想理清fillOpRemoveMap里面的逻辑,得先理解事务消息的消息流转以及消息存储机制

回顾事务消息存储机制:

  1. 发送事务消息,起始Topic为用户设置Topic
  2. prepareMsg:将用户Topic退避到Msg对应的Property中,替换为系统Topic:RMQ_SYS_TRANS_HALF_TOPIC,保存HalfMsg
  3. 执行完本地事务/消息回查成功后,针对对应的HalfMsg向OpTopic(RMQ_SYS_TRANS_OP_HALF_TOPIC)保存一条对应的opMsg
  4. 完成上述步骤后,将真实的Msg退避到真实的Topic,以供Consumer消费

对应关系图如下:

  • 来源官网

对应源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* Read op message, parse op message, and fill removeMap
*
* @param removeMap Half message to be remove, key:halfOffset, value: opOffset.
* @param opQueue Op message queue.
* @param pullOffsetOfOp The begin offset of op message queue.
* @param miniOffset The current minimum offset of half message queue.
* @param doneOpOffset Stored op messages that have been processed.
* @return Op message result.
*/
private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap,
MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {
// 根据pullOffsetOfOp 从 opQueue 拉取 OpMsg (32个)
PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
if (null == pullResult) {
return null;
}
if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL
|| pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {
log.warn("The miss op offset={} in queue={} is illegal, pullResult={}", pullOffsetOfOp, opQueue,
pullResult);
transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());
return pullResult;
} else if (pullResult.getPullStatus() == PullStatus.NO_NEW_MSG) {
log.warn("The miss op offset={} in queue={} is NO_NEW_MSG, pullResult={}", pullOffsetOfOp, opQueue,
pullResult);
return pullResult;
}
List<MessageExt> opMsg = pullResult.getMsgFoundList();
if (opMsg == null) {
log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult);
return pullResult;
}
for (MessageExt opMessageExt : opMsg) {
Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
log.debug("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(),
opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);
// 填充 doneOpOffset 和 removeMap
if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
if (queueOffset < miniOffset) {
doneOpOffset.add(opMessageExt.getQueueOffset());
} else {
removeMap.put(queueOffset, opMessageExt.getQueueOffset());
}
} else {
log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);
}
}
log.debug("Remove map: {}", removeMap);
log.debug("Done op list: {}", doneOpOffset);
return pullResult;
}
JAVA

针对以上的逻辑,简单整理了两个集合的填充逻辑

  • 具体如下:

RocketMQ——事务消息原理分析之消息回查
http://yoursite.com/post/3e4c93fa.html/
Author
Chase Wang
Posted on
October 2, 2022
Licensed under