RocketMQ——消息存储原理分析之内存映射

内存映射文件概念

——摘自百度百科

内存映射文件,是由一个文件到一块内存的映射。内存映射文件与虚拟内存有些类似,通过内存映射文件可以保留一个地址空间的区域,同时将物理存储器提交给此区域,内存文件映射的物理存储器来自一个已经存在于磁盘上的文件,而且在对该文件进行操作之前必须首先对文件进行映射。使用内存映射文件处理存储于磁盘上的文件时,将不必再对文件执行I/O操作,使得内存映射文件在处理大数据量的文件时能起到相当重要的作用。

mmap (memory map) 是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存,而系统会自动回写脏页面到对应的文件磁盘上,即完成了对文件的操作而不必再调用 read、write 等系统调用函数。相反,内核空间对这段区域的修改也直接反映用户空间,从而可以实现不同进程间的文件共享。

MappedFile

RocketMq中,CommitLog 对应的映射文件为MappedFile,MappedFile继承了ReferenceResource,ReferenceResource的作用是为了更好的管理MappedFile对象的回收。对于MappedFile对象,主要有如下属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class MappedFile extends ReferenceResource {
public static final int OS_PAGE_SIZE = 1024 * 4; // 内存页大小 4k
// 当前进程下 所有的 mappedFile 占用的总虚拟内存大小
private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
// 当前进程下 所有 mappedFile对象的个数
private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
// 数据写入位点
protected final AtomicInteger wrotePosition = new AtomicInteger(0);
// 数据提交位置点
protected final AtomicInteger committedPosition = new AtomicInteger(0);
// 数据刷盘位点
private final AtomicInteger flushedPosition = new AtomicInteger(0);
protected int fileSize; // 文件大小
protected FileChannel fileChannel; // 文件通道
/**
* Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
*/
protected ByteBuffer writeBuffer = null;
// 堆外内存池
protected TransientStorePool transientStorePool = null;
// 文件名
private String fileName;
// 这个文件文件对应的起始偏移量,即第一条消息的偏移量
private long fileFromOffset;
private File file;
// 内存缓存区
private MappedByteBuffer mappedByteBuffer;
// 最后一条消息保持的时间戳
private volatile long storeTimestamp = 0;
// 是否时MappedFileQueue的首个文件
private boolean firstCreateInQueue = false;
}

偏移量:

再RocketMQ中每个CommitLog的默认文件大小为1G,默认第一个文件的起始偏移量为0,即文件名为00000000000000000000,第二个文件的起始偏移量则时1G的字节数大小(1073742827),所以第二个文件的文件名为00000000001073742827。

在RocketMQ中为了方便计算和寻址,有物理偏移量和相对偏移量之分,物理偏移量也称为全局偏移量,参考点是当前文件和前面的文件。而相对偏移量的参考点则指的时当前文件,即相对偏移量 = 当前文件的物理偏移量 - 当前文件的起始偏移量。

MappedFile文件获取(CommitLog)

上篇文章中主要分析了文件文件刷盘前的一些准备工作,遗留了MappedFile相关细节的分析,该篇中主要针对MappedFile相关逻辑进行分析,首先了解一下MappedFile 文件获取的细节时怎样的。

入口:org.apache.rocketmq.store.CommitLog#putMessage

在进行写Message到CommitLog时,会先获取一个对应的映射文件MappedFile

1
2
// 获取MappedFileQueue中最后一个MappedFile
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

MappedFile文件获取

  1. getLastMappedFile逻辑如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // 从已经初始化的mappedFiles列表中获取最后一个MappedFile文件
    public MappedFile getLastMappedFile() {
    MappedFile mappedFileLast = null;
    // private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
    // mappedFiles 是在broker启动的时候,通过 load 方法初始化的
    while (!this.mappedFiles.isEmpty()) {
    try {
    mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
    break;
    } catch (IndexOutOfBoundsException e) {
    //continue;
    } catch (Exception e) {
    log.error("getLastMappedFile has exception.", e);
    break;
    }
    }
    return mappedFileLast;
    }
  2. 如果获取到的MappedFile为空或者此MappedFile已经写满,则创建一个新的写入

    调用逻辑:mappedFile = this.mappedFileQueue.getLastMappedFile(0);

    相关源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    /**
    * startOffset :起始offset 为0
    * needCreate :是否需要创建标识 为true
    */
    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
    // 创建的MappedFile的起始偏移地址
    long createOffset = -1;
    // 获取最后一个映射文件
    MappedFile mappedFileLast = getLastMappedFile();
    // mappedFileLast 最后一个 为空,说明 mappedFiles 为空,则创建第一个映射文件
    if (mappedFileLast == null) {
    // 计算创建的映射文件对应的物理偏移量
    // 如果指定的 startOffset 不足 mappedFileSize, 则从offset 0 开始,否则从为mappedFileSize 整数倍的offset开始
    createOffset = startOffset - (startOffset % this.mappedFileSize);
    }
    // 最后一个已经写满,计算新的offset
    if (mappedFileLast != null && mappedFileLast.isFull()) {
    // createOffset 将要创建的映射文件的物理偏移量
    // 上一个commitLog文件的起始偏移量 + 文件大小mappedFileSize(1G)
    createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
    }
    // 创建
    if (createOffset != -1 && needCreate) {
    // 需要创建的CommitLog文件的全路径
    String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
    // 预创建的CommitLog文件的全路径(当前创建文件的下一个文件)
    String nextNextFilePath = this.storePath + File.separator
    + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
    MappedFile mappedFile = null;
    // 优先通过 allocateMappedFileService 创建映射文件,因为是预分配方式,性能高
    if (this.allocateMappedFileService != null) {
    mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
    nextNextFilePath, this.mappedFileSize);
    } else {
    // 如果 allocateMappedFileService = null , 则通过new 的方式创建
    try {
    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
    } catch (IOException e) {
    log.error("create mappedFile exception", e);
    }
    }
    if (mappedFile != null) {
    if (this.mappedFiles.isEmpty()) {
    mappedFile.setFirstCreateInQueue(true);
    }
    this.mappedFiles.add(mappedFile);
    }

    return mappedFile;
    }

    return mappedFileLast;
    }

    在该逻辑中主要做了如下几件事:

    1. 调用getLastMappedFile逻辑,获取最后一个MappedFile

      a). 获取结果为空:说明mappedFiles是空的,一般为初始状态,需要创建第一个映射文件

      b). 如果不为空 :判断是否写满,如果写满,需要计算下一个MappedFile的起始偏移量,即上一个起始偏移量+1G的字节大小

    2. 创建MappedFile——创建MappedFile逻辑下文单独分析

  3. 后续在调用mappedFile.appendMessage(msg, this.appendMessageCallback);向mappedFile中写数据的时候,如果返回END_OF_FILE标识,说明已经写满(因为是多线程工作),则再次执行步骤二重新换一个新的MappedFile文件重新写即可

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // result 为调用appendMessage后的返回结果
    switch (result.getStatus()) {
    case PUT_OK:
    break;
    case END_OF_FILE:
    unlockMappedFile = mappedFile;
    // Create a new file, re-write the message
    // 返回END_OF_FILE标识后,换一个新的MappedFile,重新写
    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
    if (null == mappedFile) {
    // XXX: warn and notify me
    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
    beginTimeInLock = 0;
    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
    }
    // 重新写入
    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
    break;
    // 省略多余的内容
    }

MappedFile文件创建

RocketMQ在进行创建MappedFile的时候,除了会创建当前需要的MappedFile文件外,会预先创建下一个MappedFile,同时为了不影响性能,在创建完第一个MappedFile后,会直接返回,不会等待第二个文件是否创建成功,通过采用这种预分配的方式来保证后续需要创建的时候,可以直接拿到这个预创建的文件,不再走后续的创建操作。

创建逻辑入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 需要创建的CommitLog文件的全路径
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
// 预创建的CommitLog文件的全路径(当前创建文件的下一个文件)
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
// 优先通过 allocateMappedFileService 创建映射文件,因为是预分配方式,性能高
if (this.allocateMappedFileService != null) {
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
// 如果 allocateMappedFileService = null , 则通过new 的方式创建
try {
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}

allocateMappedFileService初始化

allocateMappedFileService是构造MappedFileQueue的时候以参数的形式传入的,在CommitLog初始化DefaultCommitLog的时候,是通过new 的方式初始化该Service的。

该Service为分配MappedFile的服务,在创建MappedFile的时候,优先会通过allocateMappedFileService来创建,如果为空则通过new MappedFile的方式创建

1
2
3
4
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager, final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.allocateMappedFileService = new AllocateMappedFileService(this);
this.allocateMappedFileService.start();
}

putRequestAndReturnMappedFile(提交创建映射文件请求)

如下为整个AllocateRequestService创建MappedFile的请求流程

相关源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/**
* 提交两个创建映射文件的请求,并等待第一个创建完成并返回
* @param nextFilePath
* @param nextNextFilePath
* @param fileSize
* @return
*/
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
// 默认提交请求书 2
int canSubmitRequests = 2;
// 判断是否启用对外内存池
if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
// 当前堆外内存池中可用的buffer 数 - 待处理的分配映射文件请求数(如果处理后,会从queue中take)
// TransientStorePool 默认是5个 requestQueue 中存放的是创建映射文件的请求
// 可用的数量 - 未处理的请求数
canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
}
}
// 第一个映射文件
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;

if (nextPutOK) {
// 处理请求数小于等于0,说明没有可用的buffer,直接返回null
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
this.requestTable.remove(nextFilePath);
return null;
}
// 向 requestQueue 中提交数据
boolean offerOK = this.requestQueue.offer(nextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
canSubmitRequests--;
}
// 第二个映射文件
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
if (nextNextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
this.requestTable.remove(nextNextFilePath);
} else {
boolean offerOK = this.requestQueue.offer(nextNextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
}
}
// mmapOperotion 异样,直接返回null
if (hasException) {
log.warn(this.getServiceName() + " service has exception. so return null");
return null;
}

AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
// 阻塞 ---- 阻塞的是 run方法中的while 中的 this.mmapOperation() 中的 countDown()
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
if (!waitOK) {
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {
this.requestTable.remove(nextFilePath);
// mmapOperation 创建完成后,返回创建的MaapedFile
return result.getMappedFile();
}
} else {
log.error("find preallocate mmap failed, this never happen");
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}

return null;
}

上面逻辑为AllocateRequest请求创建逻辑,将创建请求放入到集合之后,先阻塞第一个MappedFile的请求,等待MappedFile创建完成后,通过countDown放心,然后将创建的MappedFile返回

mmapOperation

创建MappedFile的逻辑实际是在线程run方法中,通过循环调用的方式,从队列中获取创建的,相关逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
// 从requestQueue拿出(剔除)request,从requestTable 获取到
// 判断两个是否是同一个,同一个进行创建
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
if (null == expectedRequest) {
log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize());
return true;
}
if (expectedRequest != req) {
log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
return true;
}

if (req.getMappedFile() == null) {
long beginTime = System.currentTimeMillis();

MappedFile mappedFile;
// TransientStorePool
// 刷盘类型== ASYNC_FLUSH && brokerRole == master 时间才启用 TransientStorePool
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
// 有堆外内存池的创建方式
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
// 无堆外内存池的创建方式
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
// 耗时计算
long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
if (elapsedTime > 10) {
int queueSize = this.requestQueue.size();
log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize
+ " " + req.getFilePath() + " " + req.getFileSize());
}

// pre write mappedFile
// 创建完成后做预热操作
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
.getMappedFileSizeCommitLog()
&&
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}

req.setMappedFile(mappedFile);
this.hasException = false;
isSuccess = true;
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
this.hasException = true;
return false;
} catch (IOException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.hasException = true;
if (null != req) {
requestQueue.offer(req);
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
}
}
} finally {
if (req != null && isSuccess)
// 唤醒等待的线程,从requestQueue中再取下一个MappedFile做重复的工作
req.getCountDownLatch().countDown();
}
return true;
}

创建MappedFile有两种方式

方式一:

  1. 首先通过new RandomAccessFile 的 getChannel 获得一个channel
  2. 然后通过map方法(mmap)创建mappedByteBuffer
  3. 通过该方式写入时,用户将内容写入到 mappedByteBuffer 后,是可以直接通过 flush 进行刷盘

方式二:

  1. 从堆外内存池中获取一个Buffer,即 writeBuffer = transientStorePool.borrowBuffer();
  2. 通过该方式写入的时候,用户先将内容写入writeBuffer
  3. 再通过commit方法,将writeBuffer中的内容提交到FileChannel,然后再通过FileChannel的flush再写入到磁盘

如下是两种方式的一个简单交互图:

相关源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
// 有堆外内存池的创建方式
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
// 无堆外内存池的创建方式
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}

MappedFile文件预热

通过mmap映射后的Buffer其实是一个虚拟的内存地址,在真正的物理内存中没有对应的物理地址,直接向buffer写入的时候,先会通过虚拟地址先去查找物理内存,差的时候发现没有对应的内存页(缺页)。缺页的时候,就会去磁盘中读数据,放到物理内存中,将物理地址和虚拟地址做一个映射

为啥要写入一些假值?

防止正在在写入消息的时候发生缺页,做虚拟内存和物理内存的映射,所以通过写入假值的方式将这个事情(映射)先做了,这样真正写入消息的时候,写入性能就会提升

预热相关逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public void warmMappedFile(FlushDiskType type, int pages) {
long beginTime = System.currentTimeMillis();
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
int flush = 0; //记录上一次刷盘的字节数
long time = System.currentTimeMillis();
for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
// 写入假值
byteBuffer.put(i, (byte) 0);
// force flush when flush disk type is sync
// 当刷盘策略为同步刷盘时,执行强制刷盘
// 每修改pages个分页刷一次盘
if (type == FlushDiskType.SYNC_FLUSH) {
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
flush = i;
mappedByteBuffer.force(); // 强制刷盘
}
}

// prevent gc
if (j % 1000 == 0) {
log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
time = System.currentTimeMillis();
try {
/**
* Linux 系统中CPU的调度策略是基于时间片的,每个任务CPU会分配一个时间片,时间到了之后,会剥夺CPU执行,不让再执行,换到下一个任务执行
* Windows:基于抢占式的方式执行的,如果当前线程不主动放弃执行权,他会一直去执行,其他线程就没法去执行
*/
// 防止当前线程一直抢占CPU执行权,
// 在本次循环中,因为文件大小是1G,而单次循环是4k,所以避免一直被当前线程占用,到这里先通过该行代码让出CPU执行权,
// 当前线程就会从等待状态进入就绪状态,等待下一次的执行
Thread.sleep(0);
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}
}

// force flush when prepare load finished
if (type == FlushDiskType.SYNC_FLUSH) {
log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
this.getFileName(), System.currentTimeMillis() - beginTime);
mappedByteBuffer.force();
}
log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
System.currentTimeMillis() - beginTime);

/**
* 操作系统中其他进程再做一些其它操作的时候,可能有一些情况导致内存不够,会导致以上操作过的一些内存会被回收
* 为了防止这种情况,通过mlock的方式,将以上操作过的这些地址空间锁死在磁盘中,防止其它进程将内存置换到其它进程
*/
this.mlock();
}

RocketMQ——消息存储原理分析之内存映射
http://yoursite.com/post/d3f7c9c3.html/
Author
Chase Wang
Posted on
December 4, 2022
Licensed under