消息存储
消息的存储做为RocketMQ设计中极为重要的一个环节和架构设计,非常值得去学习它的设计和思想,本文主要针对消息存储的部分逻辑做简要的说明和分析,做一个简单的入门学习。存储本身是该中间件的一个核心组件,包含了诸多的设计理念和设计思想,关于它的一些细节设计和思想会到后续的文章中进行细致的说明和分析。
消息存储触发时机
前面的文章中对消息发送的逻辑做了细致的分析和说明,Producer将消息发送到Broker后,Broker会做怎样的处理呢?
Broker收到Message存储请求,首先会进行如下步骤的处理操作:
- 首先Producer发送一个Message,会在MQClientAPIImpl#sendMessage逻辑中创建一个 RequestCode.SEND_MESSAGE 请求(以普通消息为例), 最后通过NettyRemoting组件将该请求发送到服务端(以同步消息为例),如下逻辑之前的文章中也有介绍。 | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 
 | RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
 
 public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
 final long timeoutMillis)
 throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
 
 final int opaque = request.getOpaque();
 try {
 final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
 this.responseTable.put(opaque, responseFuture);
 final SocketAddress addr = channel.remoteAddress();
 
 channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
 @Override
 public void operationComplete(ChannelFuture f) throws Exception {
 
 }
 });
 
 RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
 
 return responseCommand;
 } finally {
 this.responseTable.remove(opaque);
 }
 }
 
 |  
 
- Broker服务在启动的时候,会在BrokerStartup#start方法中初始化BrokerController,然后在调用initialize的时候,会通过registerProcessor() 向对应的RemotingServer中注册一系列的 NettyRequestProcessor ,分别对应不同请求的Processor | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 
 | public void registerProcessor() {
 
 
 SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
 sendProcessor.registerSendMessageHook(sendMessageHookList);
 sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
 
 this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
 this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
 this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
 
 
 
 
 }
 
 |  
 
- 每个Processor会通过NettyRequestProcessor中的processRequest中处理相应的请求,对于普通消息的处理逻辑如下: | 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 
 | public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {@Override
 public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
 SendMessageContext mqtraceContext;
 switch (request.getCode()) {
 case RequestCode.CONSUMER_SEND_MSG_BACK:
 return this.consumerSendMsgBack(ctx, request);
 default:
 SendMessageRequestHeader requestHeader = parseRequestHeader(request);
 mqtraceContext = buildMsgContext(ctx, requestHeader);
 this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
 
 RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
 this.executeSendMessageHookAfter(response, mqtraceContext);
 return response;
 }
 }
 }
 
 |  
 
写消息前的一些校验工作
Message在写入到CommitLog之前,会做如下的验证操作
- 判断服务是否可用
- 判断节点角色,是否时Master节点,因为Slave节点不允许写入
- 判断是否有写权限
- 校验Topic合法性,长度是否合理
- Message相关属性校验,主要是长度限制校验
- 写消息前判断OSPageCache是否可用(Broker保护机制:可能有其它线程在处理,或者写入消息耗时超过阈值:1s)
如下逻辑为上述工作相关源码
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 
 | public PutMessageResult putMessage(MessageExtBrokerInner msg) {
 
 if (this.shutdown) {
 return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
 }
 
 
 if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
 long value = this.printTimes.getAndIncrement();
 return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
 }
 
 if (!this.runningFlags.isWriteable()) {
 long value = this.printTimes.getAndIncrement();
 return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
 } else {
 this.printTimes.set(0);
 }
 
 if (msg.getTopic().length() > Byte.MAX_VALUE) {
 return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
 }
 
 if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
 return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
 }
 
 
 if (this.isOSPageCacheBusy()) {
 return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
 }
 
 long beginTime = this.getSystemClock().now();
 
 PutMessageResult result = this.commitLog.putMessage(msg);
 long elapsedTime = this.getSystemClock().now() - beginTime;
 this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
 if (null == result || !result.isOk()) {
 this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
 }
 return result;
 }
 
 | 
消息刷盘前的工作
Message在真正写入CommitLog文件的时候,都是通过MappedFile来完成的
MappedFile类是RocketMQ消息存储模块中最底层的类, 它是对MappedByteBuffer(mmap)的进一步封装,能够更方便的去操作和使用 mmap零拷贝进而高效的完成读写操作。对于CommitLog而言,每一个文件都会映射成一个MappedFile文件,通过向映射文件中写入消息将消息写入内存buffer,最后通过刷盘机制将内存中的消息刷到此片。关于MappedFile,后续会单独做详细的说明
本文主要分析真正刷盘之前做了哪些操作?
- 消息存储时间设置
- 消息体CRC校验设置
- 实例化存储指标统计Service,便于统计消息流转详细信息
- 如果是延时消息、定时消息做一些相关处理:退避真实Topic、queueId到Msg的property,设置系统Topic和QueueId,便于后续系统做延时处理
- 设置发、收的ip:port
- 获取或创建MappedFile(后续单独介绍)
- 将Message信息刷新到MappedFile内存(writeBuffer/mappedByteBuffer)
- 判断是否处理成功,成功做后续的刷盘操作,失败则返回对应的错误标识到客户端
如下逻辑为消息真正刷盘前的处理逻辑。同样,为了避免过长的篇幅,会对不重要的部分做精简处理和省略处理(不对核心逻辑做省略精简)
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 
 | public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
 msg.setStoreTimestamp(System.currentTimeMillis());
 
 msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
 AppendMessageResult result = null;
 
 StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
 
 String topic = msg.getTopic();
 int queueId = msg.getQueueId();
 
 final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
 if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
 || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
 
 if (msg.getDelayTimeLevel() > 0) {
 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
 msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
 }
 
 topic = ScheduleMessageService.SCHEDULE_TOPIC;
 
 queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
 
 
 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
 msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
 msg.setTopic(topic);
 msg.setQueueId(queueId);
 }
 }
 
 InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
 if (bornSocketAddress.getAddress() instanceof Inet6Address) {
 msg.setBornHostV6Flag();
 }
 
 InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
 if (storeSocketAddress.getAddress() instanceof Inet6Address) {
 msg.setStoreHostAddressV6Flag();
 }
 long elapsedTimeInLock = 0;
 MappedFile unlockMappedFile = null;
 
 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
 
 
 putMessageLock.lock();
 try {
 long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
 this.beginTimeInLock = beginLockTimestamp;
 
 
 
 
 msg.setStoreTimestamp(beginLockTimestamp);
 
 if (null == mappedFile || mappedFile.isFull()) {
 mappedFile = this.mappedFileQueue.getLastMappedFile(0);
 }
 
 if (null == mappedFile) {
 
 
 beginTimeInLock = 0;
 return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
 }
 
 result = mappedFile.appendMessage(msg, this.appendMessageCallback);
 switch (result.getStatus()) {
 case PUT_OK:
 break;
 case END_OF_FILE:
 unlockMappedFile = mappedFile;
 
 
 mappedFile = this.mappedFileQueue.getLastMappedFile(0);
 if (null == mappedFile) {
 
 log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
 beginTimeInLock = 0;
 return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
 }
 result = mappedFile.appendMessage(msg, this.appendMessageCallback);
 break;
 case MESSAGE_SIZE_EXCEEDED:
 case PROPERTIES_SIZE_EXCEEDED:
 beginTimeInLock = 0;
 return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
 case UNKNOWN_ERROR:
 beginTimeInLock = 0;
 return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
 default:
 beginTimeInLock = 0;
 return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
 }
 
 elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
 beginTimeInLock = 0;
 } finally {
 putMessageLock.unlock();
 }
 
 if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
 this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
 }
 PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
 
 storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
 storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
 
 handleDiskFlush(result, putMessageResult, msg);
 handleHA(result, putMessageResult, msg);
 return putMessageResult;
 }
 
 | 
mark1 介绍(简要说明,具体MappedFile后续会详细说明)
关于上述逻辑中mark1 的说明如下:
上述逻辑mark1中,只有当appendMessage返回的result.getStatus()为END_OF_FILE时才会执行,返回END_OF_FILE标识意味着原来的Mapped文件已经写完,要换一个新的MappedFile重新写。对于原来的MappedFile需要做怎样的处理呢?
原因是,再创建MappedFile(后续文章分析说明)的时候,如果开启了预热,再进行预热的时候,会做mlock操作(防止由于内存不够,操作系统将已经分配的内存空间回收)
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 
 | 
 
 
 if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
 
 this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
 }
 
 
 
 
 
 public void unlockMappedFile(final MappedFile mappedFile) {
 this.scheduledExecutorService.schedule(new Runnable() {
 @Override
 public void run() {
 mappedFile.munlock();
 }
 }, 6, TimeUnit.SECONDS);
 }
 
 |