RocketMQ: Analysis of Message Storage Principles

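Message Storage

Message storage is one of the most important parts of RocketMQ's architecture, and its design and ideas are well worth studying. This article briefly explains and analyzes part of the storage logic as an introduction. Storage is a core component of this middleware and embodies many design ideas; its finer details will be explained and analyzed in later articles.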

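When Message Storage Is Triggered

The previous article analyzed the message sending logic in detail. After the Producer sends a message to the Broker, what does the Broker do with it?

Before a Message store request reaches the storage layer, it goes through the following steps: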

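  1. When the Producer sends a Message, MQClientAPIImpl#sendMessage creates a RequestCode.SEND_MESSAGE request (using ordinary messages as the example), which is finally sent to the server through the NettyRemoting component (using synchronous sending as the example). This logic was also covered in the previous article.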

    // Create the request
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
    // Send the request (synchronous sending as the example)
    public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        // Unique request id, used to match the response to this request
        final int opaque = request.getOpaque();
        try {
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture);
            final SocketAddress addr = channel.remoteAddress();
            // writeAndFlush
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    // Handling of the send result (omitted)
                }
            });
            // Block until the response arrives or the timeout expires
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
            // Response assembly (omitted)
            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }
  2. When the Broker starts, BrokerStartup creates and initializes the BrokerController. During initialize(), registerProcessor() registers a series of NettyRequestProcessor instances with the corresponding RemotingServer, one Processor per request type:

    public void registerProcessor() {
        /**
         * SendMessageProcessor
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        // Processors for other request types omitted

        // ......

    }
  3. Each Processor handles its requests in NettyRequestProcessor#processRequest. For ordinary messages, the logic is as follows:

    public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
        @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
            SendMessageContext mqtraceContext;
            switch (request.getCode()) {
                case RequestCode.CONSUMER_SEND_MSG_BACK:
                    return this.consumerSendMsgBack(ctx, request);
                default:
                    SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                    mqtraceContext = buildMsgContext(ctx, requestHeader);
                    this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
                    // Unrelated logic omitted; this section only needs to follow sendMessage
                    RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
                    this.executeSendMessageHookAfter(response, mqtraceContext);
                    return response;
            }
        }
    }
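To make steps 1 to 3 more concrete, here is a minimal, self-contained Java sketch of the same two ideas: the client tags each request with an opaque id and blocks on a future stored in a response table, while the server looks up a processor by request code. Command, Processor, PendingResponse and MiniRemoting are hypothetical names invented for illustration, and a thread pool stands in for Netty, so this is not RocketMQ's remoting module.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified stand-ins for illustration; these are NOT RocketMQ classes.
final class Command {
    final int code;    // request code, plays the role of RequestCode.SEND_MESSAGE etc.
    final int opaque;  // unique id that ties a response to its request
    final String body;

    Command(int code, int opaque, String body) {
        this.code = code;
        this.opaque = opaque;
        this.body = body;
    }
}

interface Processor {
    Command process(Command request);
}

// Plays the role of ResponseFuture: the caller blocks on it until the response arrives.
final class PendingResponse {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile Command response;

    void complete(Command r) {
        response = r;
        latch.countDown();
    }

    // Returns null if no response arrived within the timeout.
    Command await(long timeoutMillis) throws InterruptedException {
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return response;
    }
}

final class MiniRemoting implements AutoCloseable {
    private final Map<Integer, Processor> processorTable = new ConcurrentHashMap<>();
    private final Map<Integer, PendingResponse> responseTable = new ConcurrentHashMap<>();
    private final AtomicInteger nextOpaque = new AtomicInteger();
    private final ExecutorService server = Executors.newFixedThreadPool(2); // stands in for network + Broker

    // Server side: one processor per request code.
    void registerProcessor(int code, Processor processor) {
        processorTable.put(code, processor);
    }

    // Client side: register a pending response under the opaque, "send", then block.
    Command invokeSync(int code, String body, long timeoutMillis) throws InterruptedException {
        Command request = new Command(code, nextOpaque.incrementAndGet(), body);
        PendingResponse pending = new PendingResponse();
        responseTable.put(request.opaque, pending);
        try {
            server.execute(() -> serverReceive(request));
            return pending.await(timeoutMillis);
        } finally {
            responseTable.remove(request.opaque);
        }
    }

    // Server dispatches on the request code; the reply always carries the request's opaque.
    private void serverReceive(Command request) {
        Processor p = processorTable.get(request.code);
        Command reply = (p == null)
                ? new Command(-1, request.opaque, "REQUEST_CODE_NOT_SUPPORTED")
                : p.process(request);
        clientReceive(new Command(reply.code, request.opaque, reply.body));
    }

    // Client matches the response to the waiting caller by opaque.
    private void clientReceive(Command response) {
        PendingResponse pending = responseTable.get(response.opaque);
        if (pending != null) {
            pending.complete(response);
        }
    }

    @Override
    public void close() {
        server.shutdownNow();
    }
}
```

In the real code the reply travels back over the Netty channel and the client's response handler completes the ResponseFuture; the in-process call here only mimics that hand-off.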

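Checks Before Writing the Message

Before a Message is written to the CommitLog, the following checks are performed:

  1. Check whether the service is available (i.e. not shut down)
  2. Check the node role: only the Master accepts writes; Slave nodes do not
  3. Check whether the store is writable
  4. Validate the Topic, mainly that its length is within limits
  5. Validate the Message properties, mainly their length
  6. Check whether the OS PageCache is busy before writing (a Broker protection mechanism: another thread is currently writing and has held the write lock longer than the threshold, 1s)

The related source code is as follows: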

// For brevity, some non-essential logic such as logging is omitted
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // Is the service available?
    if (this.shutdown) {
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

    // Slave nodes do not accept writes
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }
    // Check write permission: if the store is not writable, messages from clients cannot be handled
    if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    } else {
        this.printTimes.set(0);
    }
    // Topic length limit
    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }
    // Message properties length limit
    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }
    // Before writing, check whether the OS PageCache is busy
    // true means a message is being written and that write has taken too long (over 1s),
    // so this message is rejected to protect the Broker
    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    long beginTime = this.getSystemClock().now();
    // Write the message
    PutMessageResult result = this.commitLog.putMessage(msg);
    long elapsedTime = this.getSystemClock().now() - beginTime;
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
    if (null == result || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }
    return result;
}
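The order of these checks can be modeled in a few lines. The sketch below is a simplified, hypothetical model: PutStatus, PreWriteChecker and the injectable clock are assumptions, not RocketMQ classes. The busy check assumes, as described above, that beginTimeInLock is 0 when no write is in progress and that the threshold is 1000 ms.

```java
import java.util.function.LongSupplier;

// Hypothetical, simplified model of the pre-write checks; not RocketMQ's DefaultMessageStore.
enum PutStatus { PUT_OK, SERVICE_NOT_AVAILABLE, MESSAGE_ILLEGAL, PROPERTIES_SIZE_EXCEEDED, OS_PAGECACHE_BUSY }

final class PreWriteChecker {
    private final LongSupplier clock;      // current time in ms; injectable so the logic can be tested
    private final long busyTimeoutMillis;  // 1000 ms in the description above

    volatile boolean shutdown;
    volatile boolean slave;
    volatile boolean writeable = true;
    // When the current writer took the put lock; 0 means no write is in progress.
    volatile long beginTimeInLock;

    PreWriteChecker(LongSupplier clock, long busyTimeoutMillis) {
        this.clock = clock;
        this.busyTimeoutMillis = busyTimeoutMillis;
    }

    // Busy only if a write is in progress AND it has held the lock longer than the threshold.
    boolean isOSPageCacheBusy() {
        long begin = beginTimeInLock;
        return begin != 0 && clock.getAsLong() - begin > busyTimeoutMillis;
    }

    // Same order as putMessage above: availability, role, writability, lengths, page cache.
    PutStatus check(String topic, String properties) {
        if (shutdown || slave || !writeable) {
            return PutStatus.SERVICE_NOT_AVAILABLE;
        }
        if (topic.length() > Byte.MAX_VALUE) {                             // more than 127 characters
            return PutStatus.MESSAGE_ILLEGAL;
        }
        if (properties != null && properties.length() > Short.MAX_VALUE) { // more than 32767 characters
            return PutStatus.PROPERTIES_SIZE_EXCEEDED;
        }
        if (isOSPageCacheBusy()) {
            return PutStatus.OS_PAGECACHE_BUSY;
        }
        return PutStatus.PUT_OK;
    }
}
```

Rejecting with OS_PAGECACHE_BUSY instead of queuing behind a slow writer lets the client fail fast and retry elsewhere, which is the protective intent described in item 6.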

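Work Before Flushing the Message to Disk

A Message is actually written into a CommitLog file through MappedFile.

MappedFile is the lowest-level class in RocketMQ's storage module. It wraps MappedByteBuffer (mmap) so that mmap-based zero-copy can be used more conveniently for efficient reads and writes. For the CommitLog, each file is mapped to one MappedFile: a message is written into the mapped file's in-memory buffer, and the flush mechanism later persists the in-memory data to disk. MappedFile will be covered in detail in a separate article.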
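The core mmap idea behind MappedFile can be shown with plain Java NIO: map a fixed-size file with FileChannel.map, append bytes into the mapped buffer (the data lands in the page cache), and call force() to flush it to disk. TinyMappedFile is a hypothetical minimal class for illustration only; it omits everything the real MappedFile adds, such as writeBuffer, reference counting and warm-up.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;

// Hypothetical minimal illustration of the mmap idea; NOT RocketMQ's MappedFile.
final class TinyMappedFile implements AutoCloseable {
    private final RandomAccessFile raf;
    private final FileChannel channel;
    private final MappedByteBuffer buffer;
    private int wrotePosition;

    TinyMappedFile(Path path, int fileSize) throws IOException {
        raf = new RandomAccessFile(path.toFile(), "rw");
        channel = raf.getChannel();
        // READ_WRITE mapping; the file is extended to fileSize if it is shorter.
        buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
    }

    // Appends into mapped memory (page cache); returns false if the file has no room left.
    boolean append(byte[] data) {
        if (wrotePosition + data.length > buffer.capacity()) {
            return false;
        }
        buffer.position(wrotePosition);
        buffer.put(data);
        wrotePosition += data.length;
        return true;
    }

    // The "flush" step: force dirty pages of the mapping to the storage device.
    void flush() {
        buffer.force();
    }

    int wrotePosition() {
        return wrotePosition;
    }

    @Override
    public void close() throws IOException {
        channel.close();
        raf.close();
    }
}
```

The append itself is a memory write, which is why it is cheap; durability only comes with force(), and that split is exactly the separation between appendMessage and the flush discussed below.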

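This article focuses on what happens before the actual flush:

  1. Set the message store timestamp
  2. Compute and set the CRC of the message body
  3. Obtain the store statistics service (StoreStatsService), which tracks detailed message flow metrics
  4. For delayed (scheduled) messages: back up the real Topic and queueId into the message properties, then set the system Topic and the matching QueueId so the system can handle the delay later
  5. Handle the born (sender) and store (receiver) addresses (ip:port), setting the IPv6 flags where needed
  6. Get or create the MappedFile (covered separately later)
  7. Append the Message to the MappedFile's memory (writeBuffer/mappedByteBuffer)
  8. Check whether the append succeeded: on success, continue with the flush; on failure, return the corresponding error status to the client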
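Steps 2 and 4 are easy to isolate. The following sketch assumes a simplified message class (SimpleMsg) and illustrative property keys; it is not RocketMQ code. The CRC helper masks off the sign bit so the result is a non-negative int, which is my understanding of how UtilAll.crc32 behaves, and the delay handling clamps the level and maps it to queueId = level - 1, as the source below does. The transaction-type check from the real code is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.zip.CRC32;

// Hypothetical, simplified message; field and property names are illustrative only.
final class SimpleMsg {
    String topic;
    int queueId;
    int delayLevel;
    byte[] body;
    int bodyCrc;
    final Map<String, String> properties = new HashMap<>();
}

final class PreStoreSteps {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    static final String REAL_TOPIC = "REAL_TOPIC"; // illustrative property keys
    static final String REAL_QID = "REAL_QID";

    // CRC32 of the body with the sign bit masked off, so the value fits a non-negative int.
    static int crc32(byte[] body) {
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        return (int) (crc.getValue() & 0x7FFFFFFF);
    }

    static void prepare(SimpleMsg msg, int maxDelayLevel) {
        msg.bodyCrc = crc32(msg.body);
        if (msg.delayLevel > 0) {
            // Clamp to the highest configured delay level.
            if (msg.delayLevel > maxDelayLevel) {
                msg.delayLevel = maxDelayLevel;
            }
            // Back up the real destination so it can be restored when the delay expires.
            msg.properties.put(REAL_TOPIC, msg.topic);
            msg.properties.put(REAL_QID, String.valueOf(msg.queueId));
            // Redirect to the system topic; one queue per delay level.
            msg.topic = SCHEDULE_TOPIC;
            msg.queueId = msg.delayLevel - 1;
        }
    }
}
```

Keeping one queue per delay level means every queue holds messages with the same delay, so within a queue they become due in the order they were written.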

The code below shows this pre-flush logic. Again, to keep the article short, unimportant parts are trimmed or omitted; the core logic is kept intact.

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the store timestamp
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Body CRC, used later to detect corrupted data
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    AppendMessageResult result = null;
    // Store statistics service
    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Handle delayed (scheduled) messages
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // Switch to the fixed Topic for delayed messages (SCHEDULE_TOPIC_XXXX)
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            // Each delay level gets its own queue: queueId = delay level - 1
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            // Back up the real Topic and queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
    // Born (sender) host address
    InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
    if (bornSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setBornHostV6Flag();
    }
    // Store (Broker) host address
    InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
    if (storeSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setStoreHostAddressV6Flag();
    }
    long elapsedTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    // Get the memory-mapped file of the latest CommitLog file
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    // putMessage may run on several threads concurrently; lock to prevent interleaved writes
    // The Broker config decides whether a spin lock or a ReentrantLock is used
    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Setting the store timestamp after acquiring the lock keeps timestamps globally ordered
        msg.setStoreTimestamp(beginLockTimestamp);
        // If no MappedFile was obtained, or it is already full, create a new one
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        // Creation failed: return the corresponding status
        if (null == mappedFile) {
            // Reset beginTimeInLock to 0 before releasing the lock.
            // It measures how long the current write holds the lock: before a new message is written,
            // it is used to check whether OS page cache writes are busy; if the write in progress
            // has held the lock for more than 1s, the new message is rejected
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }
        // Append the Message to the MappedFile's memory (writeBuffer/mappedByteBuffer); nothing is on disk yet
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // On END_OF_FILE, switch to a new MappedFile and write the message again
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED: // message size exceeds the maximum
            case PROPERTIES_SIZE_EXCEEDED: // properties size exceeds the maximum
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR: // unknown error
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }

        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        putMessageLock.unlock(); // release the lock
    }
    // mark1
    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }
    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
    // The steps above only put the message in memory (PageCache); handleDiskFlush does the actual flush
    handleDiskFlush(result, putMessageResult, msg);
    // handleHA handles master-slave replication
    handleHA(result, putMessageResult, msg);
    return putMessageResult;
}
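The locking and END_OF_FILE rollover can be reproduced with a toy model. In the sketch below, MiniCommitLog, FixedFile and SpinLock are hypothetical classes: files are heap ByteBuffers instead of mapped files, and when a message does not fit the remaining space the tail of the file is simply skipped (the real CommitLog writes an end-of-file blank marker there). SpinLock uses an AtomicBoolean compare-and-set loop to illustrate the spin-lock option mentioned in the comments above.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal CAS spin lock, illustrating the "spin" option (hypothetical class).
final class SpinLock {
    private final AtomicBoolean locked = new AtomicBoolean(false);

    void lock() {
        while (!locked.compareAndSet(false, true)) {
            // busy-wait until the holder releases the lock
        }
    }

    void unlock() {
        locked.set(false);
    }
}

enum AppendStatus { PUT_OK, END_OF_FILE, MESSAGE_SIZE_EXCEEDED }

// A fixed-size "file" backed by a heap buffer instead of a mapped file.
final class FixedFile {
    final ByteBuffer buf;

    FixedFile(int size) {
        buf = ByteBuffer.allocate(size);
    }

    boolean isFull() {
        return !buf.hasRemaining();
    }

    AppendStatus append(byte[] msg) {
        if (msg.length > buf.remaining()) {
            buf.position(buf.limit()); // skip the tail; the real CommitLog writes a blank marker here
            return AppendStatus.END_OF_FILE;
        }
        buf.put(msg);
        return AppendStatus.PUT_OK;
    }
}

final class MiniCommitLog {
    private final int fileSize;
    private final List<FixedFile> files = new ArrayList<>();
    private final SpinLock putLock = new SpinLock();

    MiniCommitLog(int fileSize) {
        this.fileSize = fileSize;
    }

    private FixedFile newFile() {
        FixedFile f = new FixedFile(fileSize);
        files.add(f);
        return f;
    }

    AppendStatus put(byte[] msg) {
        if (msg.length > fileSize) {
            return AppendStatus.MESSAGE_SIZE_EXCEEDED; // can never fit, even in an empty file
        }
        putLock.lock();
        try {
            FixedFile f = files.isEmpty() ? null : files.get(files.size() - 1);
            if (f == null || f.isFull()) {
                f = newFile();
            }
            AppendStatus status = f.append(msg);
            if (status == AppendStatus.END_OF_FILE) {
                // Roll over to a new file and write the message again.
                status = newFile().append(msg);
            }
            return status;
        } finally {
            putLock.unlock();
        }
    }

    int fileCount() {
        putLock.lock();
        try {
            return files.size();
        } finally {
            putLock.unlock();
        }
    }
}
```

Because only one thread at a time holds putLock, the "last file is full, so roll over" decision and the append itself cannot interleave between threads, which is why putMessage takes the lock before touching the MappedFile.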

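About mark1 (a brief note; MappedFile will be covered in detail later)

The mark1 step in the code above works as follows.

mark1 only takes effect when appendMessage returns END_OF_FILE in result.getStatus(). END_OF_FILE means the current MappedFile is full, so the message must be written again to a new MappedFile. What needs to happen to the old MappedFile?

When a MappedFile is created (analyzed in a later article) with warm-up enabled, the warm-up performs mlock, which locks the allocated memory so that the operating system cannot swap it out when memory is tight. Once the old file is full and no longer written to, that lock is no longer needed, so it is released with munlock: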

/**
 * END_OF_FILE case:
 * the message was written to a newly created MappedFile; unlockMappedFile is the file that is already full
 */
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
    // A new MappedFile is in use and warm-up is enabled: munlock the old, full file
    this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}

/**
 * unlockMappedFile schedules a delayed task (6s)
 * that performs munlock
 */
public void unlockMappedFile(final MappedFile mappedFile) {
    this.scheduledExecutorService.schedule(new Runnable() {
        @Override
        public void run() {
            mappedFile.munlock(); // unlock the pages so the OS may swap them out again
        }
    }, 6, TimeUnit.SECONDS);
}
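Deferred munlock is just a delayed task on a scheduler. Java cannot call mlock/munlock without native code, so in the sketch below WarmedFile is a hypothetical stand-in whose flag models the locked state, and Unlocker (also hypothetical) shows the scheduling pattern of unlockMappedFile with a configurable delay; the real code uses 6 seconds.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in: real mlock/munlock require native calls, so a flag models the locked state.
final class WarmedFile {
    final AtomicBoolean memoryLocked = new AtomicBoolean(true); // warm-up would have called mlock

    void munlock() {
        memoryLocked.set(false);
    }
}

// Schedules munlock on a background thread instead of doing it on the write path.
final class Unlocker implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long delayMillis;

    Unlocker(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    ScheduledFuture<?> unlockLater(WarmedFile file) {
        return scheduler.schedule(file::munlock, delayMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdown(); // already-scheduled delayed tasks still run by default
    }
}
```

Moving munlock off the putMessage path means the thread that holds the write lock never pays for the system call.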

http://yoursite.com/post/d89fbee.html/
Author: Chase Wang
Posted on: November 27, 2022