RocketMQ——事务消息原理分析

事务消息交互流程

  • 图片摘自官网
  • 事务消息发送步骤如下(摘抄自官网):

    1、生产者将半事务消息发送至 RocketMQ Broker

    2、RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。

    3、生产者开始执行本地事务逻辑。

    4、生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

      二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者

      二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

    5、在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查

    6、需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置

    7、(回查)生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。

    8、(回查)生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理

事务消息使用上的限制

  1. 事务消息不支持延时消息和批量消息。
  2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。
  3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
  4. 事务性消息可能不止一次被检查或消费。
  5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

事务消息发送原理分析

事务消息发送入口:DefaultMQProducerImpl#sendMessageInTransaction()
在该逻辑中,首先会对发送的消息做一层封装,做一些property属性的填充
然后调用同步发送逻辑(参考前面文章)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException {
TransactionListener transactionListener = getCheckListener();
/**
* 事务实现的两种方式
* 1. 实现 TransactionListener
* 2. 实现 LocalTransactionExecuter 和 TransactionCheckListener 不建议使用,5.0.0 版本移除
*/
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}

// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}

Validators.checkMessage(msg, this.defaultMQProducer);

SendResult sendResult = null;
// 标明是事务消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
// 设置 PGROUP 为 producerGroup(用于回查逻辑)
/**
* 1. broker 向生产者发送请求时,通过PGROUP来寻找channel
* 2. 生产者端通过 PGROUP 属性从producerTabel 中找生产者实例,用来执行本地事务状态逻辑
*/
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
// 执行本地事务
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}

try {
// 发起endTransaction(oneway)
// commit 或 rollback
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}

TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}

以上逻辑中主要做了如下几件事情:

  1. 校验事务实现 TransactionListener 和 LocalTransactionExecuter、TransactionCheckListener
    LocalTransactionExecuter、TransactionCheckListener 这种方式不建议使用(官网:5.0.0 版本移除)
  2. 清理DELAY属性,事务消息不支持延迟消息
  3. 消息检查,包括Topic、消息体等
  4. 设置Msg的Property:TRAN_MSG = true,标明是事务消息
  5. 设置Msg的Property:PGROUP 为 this.defaultMQProducer.getProducerGroup()
    1. broker 向生产者发送请求时,通过PGROUP来寻找channel
    2. 生产者端通过 PGROUP 属性从producerTabel 中找生产者实例,用来执行本地事务状态逻辑
  6. 发送同步消息(前面文章已分析),事务消息为prepare消息(第一阶段提交),prepare 消息后面详细分析:
  7. 根据返回的结果执行相应的逻辑
    1. 返回SEND_OK,执行本地事务,得到本地事务执行结果 LocalTransactionState(COMMIT_MESSAGE/ROLLBACK_MESSAGE/UNKNOW)
    2. FLUSH_DISK_TIMEOUT/FLUSH_SLAVE_TIMEOUT/SLAVE_NOT_AVAILABLE时 返回 ROLLBACK_MESSAGE
    3. 其余情况为UNKNOW
  8. endTransaction(第二阶段提交):获取到对应的LocalTransactionState后,会发起 END_TRANSACTION 请求,后面详细分析。

事务消息一阶段提交:prepare消息(HalfMessage)

HalfMessage: 事务消息在第一阶段提交后会向系统队列中写入一条HalfMsg,该队列中的Msg不会被消费者消费。
Client在发送消息是,会封装一个 RequestCode.SEND_MESSAGE的Request,Broker端会对应有一个 SendMessageProcessor 来处理对应的请求逻辑,在SendMessageProcessor中,processRequest是处理请求的核心逻辑,在该逻辑中,会调用到如下的处理逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private RemotingCommand sendMessage(final ChannelHandlerContext ctx, final RemotingCommand request,final SendMessageContext sendMessageContext,final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
// 省略和事务逻辑无关的逻辑代码
// ...........
// 判断是否是事务消息 TRAN_MSG = true
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)
&& !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
// broker配置:rejectTransactionMessage 默认为false
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
return response;
}
// prepare,落盘halfMsg
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
}

说明:
当发送一条事务消息后,Broker在处理消息的Processor中,会根据发送消息时设置的TRAN_MSG属性来判断是否是事务消息,并且Broker允许事务消息的情况下会执行this.brokerController.getTransactionalMessageService().prepareMessage(msgInner)逻辑,在该逻辑中,会对HalfMessage进行落盘操作
具体调用如下(TransactionalMessageBridge#putHalfMessage):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
// parseHalfMessageInner:对mssageInner,设置Topic 和 queueId
return store.putMessage(parseHalfMessageInner(messageInner));
}

// parseHalfMessageInner 逻辑如下:
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// 将真实的 topic 和 queueId 设置到properties中,用于后续做消息Topic和QueueId的还原操作
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId()));
/**
* 将消息的sysFlag重置为0,因为CommitLogDispatcherBuildConsumeQueue再更新consumerQueue时会跳过TRANSACTION_PREPARED_TYPE 和
* TRANSACTION_ROLLBACK_TYPE的消息,这里重置为0后,就不会影响对 RMQ_SYS_TRANS_HALF_TOPIC 的ConsumerQueue的更新
*/
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// 设置Topoic 为 RMQ_SYS_TRANS_HALF_TOPIC queueId 为 0(该topic只有一个queue)
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}

说明:
在上述逻辑中,将该消息的真实Topic和QueueId会退避到Msg对应的Proerty中,将对应的Topic设置为系统Topic:RMQ_SYS_TRANS_HALF_TOPIC,将对应的QueueId设置为 0 ,因为该Topic只有一个Queue
完成message的修改封装后,调用store.putMessage(msg),对当前prepare消息(HalfMsg)进行落盘(不可以被消费)。

事务消息二阶段提交:endTransaction

通过上面发送prepare消息返回的结果,如果是sendOk,执行本地事务逻辑,根据返回的LocalTransactionState,通过调用endTransaction,进行对halfMsg的commit操作或者rollback操作。
具体调用逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());
}

在上面 endTransaction 逻辑中,会先创建一个 EndTransactionRequestHeader 的header,然后根据localTransactionState的状态,设置对应的CommitOrRollback状态(分别是TRANSACTION_COMMIT_TYPE、TRANSACTION_ROLLBACK_TYPE、TRANSACTION_NOT_TYPE),然后设置好相应属性和对应的brokerAddr后,会调用endTransactionOneway,向Broker发起一个 RequestCode.END_TRANSACTION 的oneway请求。
具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
public void endTransactionOneway(
final String addr,
final EndTransactionRequestHeader requestHeader,
final String remark,
final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
// END_TRANSACTION 请求
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
request.setRemark(remark);
// oneway
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
}

同样,Broker端也会对应有一个处理 END_TRANSACTION 请求的 RequestProcessor(EndTransactionProcessor),在该Processor中的processRequest处理请求逻辑中,会对该请求逻辑进行如下处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
// ...... 省略非核心逻辑
// 判断END_TRANSACTION请求是通过回查后发起的还是执行完本地事务发起的 (省略) ....
// 省略后续的日志打印逻辑 .....
OperationResult result = new OperationResult();
// 根据COMMIT_TYPE 或 ROLLBACK_TYPE 分别处理相应的逻辑
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
// 根据 requestHeader.getCommitLogOffset() 从CommitLog 获取到 prepare 消息
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// 检查合法性
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 如果commit 还原真是的 topic 和 queueId
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
// sysFlag 设置为 TRANSACTION_COMMIT_TYPE
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
// 将还原后的Message重新落盘,消息可被消费
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
// 删除halfMsg
// 具体逻辑是构建一个消息放入 RMQ_SYS_TRANS_OP_HALF_TOPIC 队列,queueId为对应half消息的queueId
// 消息内容为对应half消息的queueOffset,并将TAGS属性设置为“d”
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
// 根据 requestHeader.getCommitLogOffset() 从CommitLog 获取到 prepare 消息
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
// 校验合法性
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
// 删除halfMsg
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}

说明:
在上述处理二阶段提交的逻辑中,会分别对 TRANSACTION_COMMIT_TYPE 和 TRANSACTION_ROLLBACK_TYPE 进行不同的逻辑处理:

TRANSACTION_COMMIT_TYPE

  1. transactionalMessageService().commitMessage(requestHeader) 逻辑中调用getHalfMessageByOffset(requestHeader.getCommitLogOffset()),拿到对应的 HalfMessage
  2. 校验消息合法性(group、transaction state、commit log offset、prepared transaction 等)
  3. endMessageTransaction(result.getPrepareMessage()):还原真实的 Topic 和 QueueId,封装真实的Msg
  4. sendFinalMessage:将还原后的Msg进行落盘
  5. deletePrepareMessage:删除halfMessage(后续详细介绍)

TRANSACTION_ROLLBACK_TYPE

  1. 和上面CommitType逻辑一模一样,获取到对应的HalfMessage
  2. 同上面一样,校验消息合法性
  3. deletePrepareMessage:删除halfMessage(后续详细介绍)

总结:
broker在处理二阶段提交的过程中,TRANSACTION_COMMIT_TYPE 和 TRANSACTION_ROLLBACK_TYPE 的处理逻辑基本一样,唯一不同的是在进行commit的操作中,需要重新针对HalfMsg封装一个RealMsg(还原真实的Topic和QueueId等),将该消息进行落盘保存,供消费者消费。

deletePrepareMessage 删除halfMsg

由于rocketMQ中是消息落盘是一个顺序写的过程,并且在ConsumeQueue中已经维护了消息的真实地址,如果对prepareMessage进行删除是一个很繁琐,并且开销很大的过程,所以这个地方的删除不是真正的删除halfMsg。
RocketMQ针对已经commit或者rollback这两种已经确定的状态的消息,会对应的向 RMQ_SYS_TRANS_OP_HALF_TOPIC 这个Topic中新增一条对应的op Message,用来标识对应的prepare消息已经被提交或回滚,这样在后面Broker进行消息回查的时候,就可以通过比对OP_HALF_MESSAGE来判断是否需要对某一条消息进行回查了。

deletePrepareMessage 逻辑如下:

1
2
3
4
5
6
7
8
// 在该逻辑中,调用 transactionalMessageBridge.putOpMessage 完成opMessage的写入
if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
log.debug("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
return true;
} else {
log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
return false;
}

putOpMessage 逻辑如下:

1
2
3
4
5
6
7
8
public boolean putOpMessage(MessageExt messageExt, String opType) {
MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(), this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
// 上面逻辑中,入参为固定的 TransactionalMessageUtil.REMOVETAG,所以会进入addRemoveTagInTransactionOp逻辑
if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
return addRemoveTagInTransactionOp(messageExt, messageQueue);
}
return true;
}

addRemoveTagInTransactionOp 逻辑:

1
2
3
4
5
6
7
// TransactionalMessageUtil.buildOpTopic() = RMQ_SYS_TRANS_OP_HALF_TOPIC
private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
writeOp(message, messageQueue);
return true;
}

从该逻辑中可以看到,设置的Topic为RMQ_SYS_TRANS_OP_HALF_TOPIC,tags为 TransactionalMessageUtil.REMOVETAG(”d”),消息体为halfMsg对应的offset
然后调用 writeOp 方法,在该方法内部完成对Op消息的封装,最终调用 store.putMessage(messageInner) 完成对 opMsg 的落盘操作。


RocketMQ——事务消息原理分析
http://yoursite.com/post/181138b3.html/
Author
Chase Wang
Posted on
September 27, 2022
Licensed under