内存映射文件概念 ——摘自百度百科
内存映射文件,是由一个文件到一块内存的映射。内存映射文件与虚拟内存有些类似,通过内存映射文件可以保留一个地址空间的区域,同时将物理存储器提交给此区域,内存文件映射的物理存储器来自一个已经存在于磁盘上的文件,而且在对该文件进行操作之前必须首先对文件进行映射。使用内存映射文件处理存储于磁盘上的文件时,将不必再对文件执行I/O操作,使得内存映射文件在处理大数据量的文件时能起到相当重要的作用。
mmap (memory map) 是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存,而系统会自动回写脏页面到对应的文件磁盘上,即完成了对文件的操作而不必再调用 read、write 等系统调用函数。相反,内核空间对这段区域的修改也直接反映用户空间,从而可以实现不同进程间的文件共享。
MappedFile RocketMq中,CommitLog 对应的映射文件为MappedFile,MappedFile继承了ReferenceResource,ReferenceResource的作用是为了更好的管理MappedFile对象的回收。对于MappedFile对象,主要有如下属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class MappedFile extends ReferenceResource { public static final int OS_PAGE_SIZE = 1024 * 4 ; private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0 ); private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0 ); protected final AtomicInteger wrotePosition = new AtomicInteger(0 ); protected final AtomicInteger committedPosition = new AtomicInteger(0 ); private final AtomicInteger flushedPosition = new AtomicInteger(0 ); protected int fileSize; protected FileChannel fileChannel; protected ByteBuffer writeBuffer = null ; protected TransientStorePool transientStorePool = null ; private String fileName; private long fileFromOffset; private File file; private MappedByteBuffer mappedByteBuffer; private volatile long storeTimestamp = 0 ; private boolean firstCreateInQueue = false ; }
偏移量:
再RocketMQ中每个CommitLog的默认文件大小为1G,默认第一个文件的起始偏移量为0,即文件名为00000000000000000000,第二个文件的起始偏移量则时1G的字节数大小(1073742827),所以第二个文件的文件名为00000000001073742827。
在RocketMQ中为了方便计算和寻址,有物理偏移量和相对偏移量之分,物理偏移量也称为全局偏移量,参考点是当前文件和前面的文件。而相对偏移量的参考点则指的时当前文件,即相对偏移量 = 当前文件的物理偏移量 - 当前文件的起始偏移量。
MappedFile文件获取(CommitLog) 上篇文章中主要分析了文件文件刷盘前的一些准备工作,遗留了MappedFile相关细节的分析,该篇中主要针对MappedFile相关逻辑进行分析,首先了解一下MappedFile 文件获取的细节时怎样的。
入口: org.apache.rocketmq.store.CommitLog#putMessage
在进行写Message到CommitLog时,会先获取一个对应的映射文件MappedFile
1 2 MappedFile mappedFile = this .mappedFileQueue.getLastMappedFile();
MappedFile文件获取
getLastMappedFile逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public MappedFile getLastMappedFile () { MappedFile mappedFileLast = null ; while (!this .mappedFiles.isEmpty()) { try { mappedFileLast = this .mappedFiles.get(this .mappedFiles.size() - 1 ); break ; } catch (IndexOutOfBoundsException e) { } catch (Exception e) { log.error("getLastMappedFile has exception." , e); break ; } } return mappedFileLast; }
如果获取到的MappedFile为空或者此MappedFile已经写满,则创建一个新的写入
调用逻辑:mappedFile = this.mappedFileQueue.getLastMappedFile(0);
相关源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 public MappedFile getLastMappedFile (final long startOffset, boolean needCreate) { long createOffset = -1 ; MappedFile mappedFileLast = getLastMappedFile(); if (mappedFileLast == null ) { createOffset = startOffset - (startOffset % this .mappedFileSize); } if (mappedFileLast != null && mappedFileLast.isFull()) { createOffset = mappedFileLast.getFileFromOffset() + this .mappedFileSize; } if (createOffset != -1 && needCreate) { String nextFilePath = this .storePath + File.separator + UtilAll.offset2FileName(createOffset); String nextNextFilePath = this .storePath + File.separator + UtilAll.offset2FileName(createOffset + this .mappedFileSize); MappedFile mappedFile = null ; if (this .allocateMappedFileService != null ) { mappedFile = this .allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath, nextNextFilePath, this .mappedFileSize); } else { try { mappedFile = new MappedFile(nextFilePath, this .mappedFileSize); } catch (IOException e) { log.error("create mappedFile exception" , e); } } if (mappedFile != null ) { if (this .mappedFiles.isEmpty()) { mappedFile.setFirstCreateInQueue(true ); } this .mappedFiles.add(mappedFile); } return mappedFile; } return mappedFileLast; }
在该逻辑中主要做了如下几件事:
调用getLastMappedFile逻辑,获取最后一个MappedFile
a). 获取结果为空:说明mappedFiles是空的,一般为初始状态,需要创建第一个映射文件
b). 如果不为空 :判断是否写满,如果写满,需要计算下一个MappedFile的起始偏移量,即上一个起始偏移量+1G的字节大小
创建MappedFile——创建MappedFile逻辑下文单独分析
后续在调用mappedFile.appendMessage(msg, this.appendMessageCallback);向mappedFile中写数据的时候,如果返回END_OF_FILE标识,说明已经写满(因为是多线程工作),则再次执行步骤二重新换一个新的MappedFile文件重新写即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 switch (result.getStatus()) { case PUT_OK: break ; case END_OF_FILE: unlockMappedFile = mappedFile; mappedFile = this .mappedFileQueue.getLastMappedFile(0 ); if (null == mappedFile) { log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0 ; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); } result = mappedFile.appendMessage(msg, this .appendMessageCallback); break ; }
MappedFile文件创建 RocketMQ在进行创建MappedFile的时候,除了会创建当前需要的MappedFile文件外,会预先创建下一个MappedFile,同时为了不影响性能,在创建完第一个MappedFile后,会直接返回,不会等待第二个文件是否创建成功,通过采用这种预分配的方式来保证后续需要创建的时候,可以直接拿到这个预创建的文件,不再走后续的创建操作。
创建逻辑入口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 String nextFilePath = this .storePath + File.separator + UtilAll.offset2FileName(createOffset); String nextNextFilePath = this .storePath + File.separator + UtilAll.offset2FileName(createOffset + this .mappedFileSize); MappedFile mappedFile = null ;if (this .allocateMappedFileService != null ) { mappedFile = this .allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath, nextNextFilePath, this .mappedFileSize); } else { try { mappedFile = new MappedFile(nextFilePath, this .mappedFileSize); } catch (IOException e) { log.error("create mappedFile exception" , e); } }
allocateMappedFileService初始化 allocateMappedFileService是构造MappedFileQueue的时候以参数的形式传入的,在CommitLog初始化DefaultCommitLog的时候,是通过new 的方式初始化该Service的。
该Service为分配MappedFile的服务,在创建MappedFile的时候,优先会通过allocateMappedFileService来创建,如果为空则通过new MappedFile的方式创建
1 2 3 4 public DefaultMessageStore (final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException { this .allocateMappedFileService = new AllocateMappedFileService(this ); this .allocateMappedFileService.start(); }
putRequestAndReturnMappedFile(提交创建映射文件请求) 如下为整个AllocateRequestService创建MappedFile的请求流程
相关源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 public MappedFile putRequestAndReturnMappedFile (String nextFilePath, String nextNextFilePath, int fileSize) { int canSubmitRequests = 2 ; if (this .messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { if (this .messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool() && BrokerRole.SLAVE != this .messageStore.getMessageStoreConfig().getBrokerRole()) { canSubmitRequests = this .messageStore.getTransientStorePool().availableBufferNums() - this .requestQueue.size(); } } AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize); boolean nextPutOK = this .requestTable.putIfAbsent(nextFilePath, nextReq) == null ; if (nextPutOK) { if (canSubmitRequests <= 0 ) { log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " + "RequestQueueSize : {}, StorePoolSize: {}" , this .requestQueue.size(), this .messageStore.getTransientStorePool().availableBufferNums()); this .requestTable.remove(nextFilePath); return null ; } boolean offerOK = this .requestQueue.offer(nextReq); if (!offerOK) { log.warn("never expected here, add a request to preallocate queue failed" ); } canSubmitRequests--; } AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize); boolean nextNextPutOK = this .requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null ; if (nextNextPutOK) { if (canSubmitRequests <= 0 ) { log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " + "RequestQueueSize : {}, StorePoolSize: {}" , this .requestQueue.size(), this .messageStore.getTransientStorePool().availableBufferNums()); this .requestTable.remove(nextNextFilePath); } else { boolean offerOK = this .requestQueue.offer(nextNextReq); if (!offerOK) { log.warn("never expected here, add a request to preallocate queue failed" ); } } } if (hasException) { log.warn(this .getServiceName() + " service has exception. so return null" ); return null ; } AllocateRequest result = this .requestTable.get(nextFilePath); try { if (result != null ) { boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS); if (!waitOK) { log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize()); return null ; } else { this .requestTable.remove(nextFilePath); return result.getMappedFile(); } } else { log.error("find preallocate mmap failed, this never happen" ); } } catch (InterruptedException e) { log.warn(this .getServiceName() + " service has exception. " , e); } return null ; }
上面逻辑为AllocateRequest请求创建逻辑,将创建请求放入到集合之后,先阻塞第一个MappedFile的请求,等待MappedFile创建完成后,通过countDown放心,然后将创建的MappedFile返回
mmapOperation 创建MappedFile的逻辑实际是在线程run方法中,通过循环调用的方式,从队列中获取创建的,相关逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 private boolean mmapOperation () { boolean isSuccess = false ; AllocateRequest req = null ; try { req = this .requestQueue.take(); AllocateRequest expectedRequest = this .requestTable.get(req.getFilePath()); if (null == expectedRequest) { log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " " + req.getFileSize()); return true ; } if (expectedRequest != req) { log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " " + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest); return true ; } if (req.getMappedFile() == null ) { long beginTime = System.currentTimeMillis(); MappedFile mappedFile; if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { try { mappedFile = ServiceLoader.load(MappedFile.class).iterator().next(); mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool()); } catch (RuntimeException e) { log.warn("Use default implementation." ); mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool()); } } else { mappedFile = new MappedFile(req.getFilePath(), req.getFileSize()); } long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime); if (elapsedTime > 10 ) { int queueSize = this .requestQueue.size(); log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize + " " + req.getFilePath() + " " + req.getFileSize()); } if (mappedFile.getFileSize() >= this .messageStore.getMessageStoreConfig() .getMappedFileSizeCommitLog() && this .messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { mappedFile.warmMappedFile(this .messageStore.getMessageStoreConfig().getFlushDiskType(), this .messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile()); } req.setMappedFile(mappedFile); this .hasException = false ; isSuccess = true ; } } catch (InterruptedException e) { log.warn(this .getServiceName() + " interrupted, possibly by shutdown." ); this .hasException = true ; return false ; } catch (IOException e) { log.warn(this .getServiceName() + " service has exception. " , e); this .hasException = true ; if (null != req) { requestQueue.offer(req); try { Thread.sleep(1 ); } catch (InterruptedException ignored) { } } } finally { if (req != null && isSuccess) req.getCountDownLatch().countDown(); } return true ; }
创建MappedFile有两种方式 方式一:
首先通过new RandomAccessFile 的 getChannel 获得一个channel
然后通过map方法(mmap)创建mappedByteBuffer
通过该方式写入时,用户将内容写入到 mappedByteBuffer 后,是可以直接通过 flush 进行刷盘
方式二:
从堆外内存池中获取一个Buffer,即 writeBuffer = transientStorePool.borrowBuffer();
通过该方式写入的时候,用户先将内容写入writeBuffer
再通过commit方法,将writeBuffer中的内容提交到FileChannel,然后再通过FileChannel的flush再写入到磁盘
如下是两种方式的一个简单交互图:
相关源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { try { mappedFile = ServiceLoader.load(MappedFile.class).iterator().next(); mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool()); } catch (RuntimeException e) { log.warn("Use default implementation." ); mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool()); } } else { mappedFile = new MappedFile(req.getFilePath(), req.getFileSize()); }
MappedFile文件预热 通过mmap映射后的Buffer其实是一个虚拟的内存地址,在真正的物理内存中没有对应的物理地址,直接向buffer写入的时候,先会通过虚拟地址先去查找物理内存,差的时候发现没有对应的内存页(缺页)。缺页的时候,就会去磁盘中读数据,放到物理内存中,将物理地址和虚拟地址做一个映射
为啥要写入一些假值?
防止正在在写入消息的时候发生缺页,做虚拟内存和物理内存的映射,所以通过写入假值的方式将这个事情(映射)先做了,这样真正写入消息的时候,写入性能就会提升
预热相关逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public void warmMappedFile (FlushDiskType type, int pages) { long beginTime = System.currentTimeMillis(); ByteBuffer byteBuffer = this .mappedByteBuffer.slice(); int flush = 0 ; long time = System.currentTimeMillis(); for (int i = 0 , j = 0 ; i < this .fileSize; i += MappedFile.OS_PAGE_SIZE, j++) { byteBuffer.put(i, (byte ) 0 ); if (type == FlushDiskType.SYNC_FLUSH) { if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) { flush = i; mappedByteBuffer.force(); } } if (j % 1000 == 0 ) { log.info("j={}, costTime={}" , j, System.currentTimeMillis() - time); time = System.currentTimeMillis(); try { Thread.sleep(0 ); } catch (InterruptedException e) { log.error("Interrupted" , e); } } } if (type == FlushDiskType.SYNC_FLUSH) { log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}" , this .getFileName(), System.currentTimeMillis() - beginTime); mappedByteBuffer.force(); } log.info("mapped file warm-up done. mappedFile={}, costTime={}" , this .getFileName(), System.currentTimeMillis() - beginTime); this .mlock(); }