RocketMQ Message Storage Internals: The Disk Flush Mechanism

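Background:

The previous articles briefly analyzed RocketMQ's storage internals and its memory-mapping logic. This article gives a brief analysis of how RocketMQ flushes data to disk.

RocketMQ supports two flush modes:

  1. Synchronous flush
  2. Asynchronous flush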
public enum FlushDiskType {
    SYNC_FLUSH,  // synchronous
    ASYNC_FLUSH  // asynchronous
}

Entry point of the flush code: org.apache.rocketmq.store.CommitLog#handleDiskFlush

Synchronous flush

The code for synchronous flush is as follows:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        // service
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        // The client wants to wait until the message has been flushed
        if (messageExt.isWaitStoreMsgOK()) {
            // Create a flush request.
            // result.getWroteOffset() + result.getWroteBytes(): start offset of this write + bytes written = the next offset
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());

            service.putRequest(request);
            // Block until GroupCommitService has flushed (or the timeout expires)
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            // The Broker is configured for synchronous flush, but the client does not care whether the flush succeeded:
            // the flush thread is only woken up, so for this message sync flush degrades to async flush
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            // FlushRealTimeService
            flushCommitLogService.wakeup();
        } else {
            // CommitRealTimeService
            commitLogService.wakeup();
        }
    }
}

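In the code above, the synchronous flush path does the following:

  1. Obtains the flush service thread, GroupCommitService
  2. Creates a GroupCommitRequest and hands it to the service with putRequest
  3. Blocks synchronously: request.waitForFlush(flushTimeout)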
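The handshake behind steps 2 and 3 can be sketched as a one-shot latch: the producer thread blocks in waitForFlush with a timeout, and the flush thread releases it through wakeupCustomer. The class name FlushRequestSketch is hypothetical, and the use of CountDownLatch is an assumption for illustration only; RocketMQ versions differ in which concurrency primitive GroupCommitRequest actually uses.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for GroupCommitRequest (not RocketMQ code).
class FlushRequestSketch {
    private final long nextOffset;                        // offset that must become durable
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    FlushRequestSketch(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    long getNextOffset() {
        return nextOffset;
    }

    // Called by the flush thread once it knows whether nextOffset is on disk.
    void wakeupCustomer(boolean ok) {
        this.flushOK = ok;
        latch.countDown();
    }

    // Called by the producer thread; false means timeout, failed flush, or interrupt.
    boolean waitForFlush(long timeoutMillis) {
        try {
            boolean signalled = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return signalled && flushOK;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        FlushRequestSketch req = new FlushRequestSketch(1024);
        new Thread(() -> req.wakeupCustomer(true)).start();   // plays the flush thread
        System.out.println("flushOK = " + req.waitForFlush(5000));

        // Nobody wakes this one up, so it times out (the FLUSH_DISK_TIMEOUT case).
        FlushRequestSketch orphan = new FlushRequestSketch(2048);
        System.out.println("flushOK = " + orphan.waitForFlush(50));
    }
}
```

A request that is never woken returns false after the timeout, which corresponds to the FLUSH_DISK_TIMEOUT branch in handleDiskFlush.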

GroupCommitService

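GroupCommitService is the service thread for synchronous flush. When the Broker starts, the CommitLog services initialize it and start the thread (.start()). The actual flushing happens in the thread's run method: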

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // Wait for a notification; if new data arrives, stop waiting early.
            // Before doCommit runs, waitForRunning calls swapRequests
            this.waitForRunning(10);
            // Perform the flush
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }
    // Swap the request lists before the final doCommit
    synchronized (this) {
        this.swapRequests();
    }

    this.doCommit();

    CommitLog.log.info(this.getServiceName() + " service end");
}

swapRequests

private void swapRequests() {
    // Swap the write list and the read list
    List<GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}
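The swap above is a double-buffering technique. The following minimal sketch assumes a single flush thread. Producers keep appending to requestsWrite while the flush thread drains the list it swapped out, so the two sides never operate on the same list at the same time. DoubleBufferSketch and its methods are hypothetical names, and each request is reduced to its nextOffset value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of GroupCommitService's double-buffered request lists.
class DoubleBufferSketch {
    private List<Long> requestsWrite = new ArrayList<>();
    private List<Long> requestsRead = new ArrayList<>();

    // Producer side: enqueue a request (represented only by its nextOffset).
    synchronized void putRequest(long nextOffset) {
        requestsWrite.add(nextOffset);
    }

    // Flush side: exchange the lists under the lock; producers now fill the empty one.
    synchronized void swapRequests() {
        List<Long> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    // Flush side (same single thread that swaps): process and clear the swapped-out list.
    int drain() {
        int handled = requestsRead.size();
        requestsRead.clear();
        return handled;
    }

    synchronized int pendingWrites() {
        return requestsWrite.size();
    }

    public static void main(String[] args) {
        DoubleBufferSketch s = new DoubleBufferSketch();
        s.putRequest(100);
        s.putRequest(200);
        s.swapRequests();
        s.putRequest(300);                  // arrives while the batch is being flushed
        System.out.println("handled = " + s.drain() + ", pending = " + s.pendingWrites());
    }
}
```

The request that arrives after the swap waits in the write list for the next round instead of blocking on the flush in progress.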

Synchronous flush logic: doCommit

private void doCommit() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                /**
                 * Why 2?
                 * If a new MappedFile was created during this cycle, the new MappedFile needs one more flush
                 */
                for (int i = 0; i < 2 && !flushOK; i++) {
                    /**
                     * CommitLog.this.mappedFileQueue.getFlushedWhere(): the position up to which the CommitLog has been flushed to disk
                     * req.getNextOffset(): the offset just past this request's message
                     */
                    // Flushing happens asynchronously, so an earlier flush may already cover this request; if so, no flush is needed here
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                    if (!flushOK) {
                        // Flush
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }
                // Wake up the thread waiting for this flush
                req.wakeupCustomer(flushOK);
            }

            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                // Record the CommitLog's last flush time in the checkpoint file
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            // Clear the list so it can be reused at the next swap
            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}
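The "why 2" loop can be illustrated with a toy queue of fixed-size files in which one flush call only advances flushedWhere to the end of the file that currently contains it. A request whose message landed in a newly created file therefore needs two flush calls. Everything here (TwoFileFlushSketch, the 100-byte file size) is a hypothetical model, not RocketMQ code. Unlike the loop above, the sketch re-checks the condition after each flush, so its return value always reflects the final flushedWhere.

```java
// Hypothetical model: a queue of fixed-size files where one flush() call can only
// advance flushedWhere to the end of the file that currently contains it.
class TwoFileFlushSketch {
    static final long FILE_SIZE = 100;   // tiny file size chosen for the demo

    long wrotePosition;                  // bytes appended across all files
    long flushedWhere;                   // bytes known to be on disk
    int flushCalls;

    // Same boolean contract as MappedFileQueue.flush: true means nothing was flushed.
    boolean flush() {
        flushCalls++;
        long fileEnd = (flushedWhere / FILE_SIZE + 1) * FILE_SIZE;
        long where = Math.min(wrotePosition, fileEnd);
        boolean nothingFlushed = where == flushedWhere;
        flushedWhere = where;
        return nothingFlushed;
    }

    // At most two flushes: one to finish the old file, one for the new file.
    boolean flushUpTo(long nextOffset) {
        boolean flushOK = flushedWhere >= nextOffset;
        for (int i = 0; i < 2 && !flushOK; i++) {
            flush();
            flushOK = flushedWhere >= nextOffset;
        }
        return flushOK;
    }

    public static void main(String[] args) {
        TwoFileFlushSketch q = new TwoFileFlushSketch();
        q.flushedWhere = 90;      // the old file is flushed up to offset 90
        q.wrotePosition = 150;    // the message ends at offset 150, inside the next file
        System.out.println("flushOK = " + q.flushUpTo(150) + " after " + q.flushCalls + " flushes");
    }
}
```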

flush

public boolean flush(final int flushLeastPages) {
    boolean result = true;
    // Find the MappedFile that contains the last flushed position
    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (mappedFile != null) {
        long tmpTimeStamp = mappedFile.getStoreTimestamp();
        // flush
        int offset = mappedFile.flush(flushLeastPages);
        long where = mappedFile.getFileFromOffset() + offset;
        // If data was flushed, where > flushedWhere and false is returned; otherwise true is returned
        result = where == this.flushedWhere;
        this.flushedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }

    return result;
}

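Summary:

During synchronous flush, RocketMQ does not run a flush for every single putMessage. Flushing is organized not per putMessage call but per MappedFile: each flush forces everything written to the current MappedFile so far, not just one message.

After each putMessage, handleDiskFlush creates a GroupCommitRequest and adds it to a list (List<GroupCommitRequest> requestsWrite). The synchronous flush thread then processes the requests in that list as a batch. The benefit is that each flush makes as many pending messages durable as possible. Once the flush triggered by the first request completes, the data of the other requests in the list has usually been flushed along with it, so they need no flush of their own. This improves overall flush throughput.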
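The batching effect can be shown with a hypothetical single-file model. GroupCommitBatchSketch is not RocketMQ code. In the model, three queued requests are served in order. The first one triggers a single flush up to the current write position, and the next two find flushedWhere already past their nextOffset.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical single-file model of group commit: one physical flush can
// satisfy every queued request whose nextOffset it covers.
class GroupCommitBatchSketch {
    long wrotePosition;     // end of the data appended so far
    long flushedWhere;      // end of the data already on disk
    int physicalFlushes;    // how many times data was actually forced

    // Stand-in for mappedFileQueue.flush(0): force everything written so far.
    void flushAll() {
        if (wrotePosition > flushedWhere) {
            flushedWhere = wrotePosition;
            physicalFlushes++;
        }
    }

    // Serve a batch in order; returns how many requests ended up durable.
    int serve(List<Long> nextOffsets) {
        int satisfied = 0;
        for (long next : nextOffsets) {
            if (flushedWhere < next) {
                flushAll();
            }
            if (flushedWhere >= next) {
                satisfied++;
            }
        }
        return satisfied;
    }

    public static void main(String[] args) {
        GroupCommitBatchSketch log = new GroupCommitBatchSketch();
        log.wrotePosition = 300;  // three messages ending at 100, 200 and 300
        int ok = log.serve(Arrays.asList(100L, 200L, 300L));
        System.out.println(ok + " requests, " + log.physicalFlushes + " physical flush");
    }
}
```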

Simplified flow of synchronous flush:

[Figure: synchronous flush flowchart (01同步刷盘.png)]

Asynchronous flush

The asynchronous flush path shares its entry point with the synchronous one; it is simply the else branch:


if (condition) {
    // synchronous flush logic
} else {
    // Asynchronous flush
    if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        // FlushRealTimeService
        flushCommitLogService.wakeup();
    } else {
        // CommitRealTimeService
        commitLogService.wakeup();
    }
}

The two services are initialized as follows:

// The CommitLog constructor runs the following logic
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    this.flushCommitLogService = new GroupCommitService();
} else {
    // asynchronous
    this.flushCommitLogService = new FlushRealTimeService();
}
// commit service
this.commitLogService = new CommitRealTimeService();

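The code shows that the asynchronous path handles flushing differently depending on whether the off-heap memory pool (transientStorePoolEnable) is enabled. When the pool is disabled, FlushRealTimeService flushes directly. When it is enabled, CommitRealTimeService first commits the off-heap data and then wakes FlushRealTimeService to flush it.

Both services are started when the Broker starts and calls messageStore.start(), which eventually runs the following start logic: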

public void start() {
    this.flushCommitLogService.start();

    if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        this.commitLogService.start();
    }
}

CommitRealTimeService

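CommitRealTimeService does the following:

  1. Uses the commit interval, the minimum page count, and related settings to decide whether a commit is due
  2. Finds the MappedFile at committedWhere and checks whether it meets the commit condition
  3. Commits the data in off-heap memory to the file's FileChannel
  4. After committing, wakes up the flush thread to run the asynchronous flush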
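With the transient store pool enabled, a MappedFile effectively tracks three positions. The sketch below is a hypothetical model (PositionPipelineSketch is not RocketMQ code) of the ordering that the steps above enforce: commit can only move data that was written, and flush can only force data that was committed. Its methods return true when they moved data. This is the opposite of the boolean convention used by MappedFileQueue.commit and MappedFileQueue.flush.

```java
// Hypothetical model of the three positions a MappedFile tracks when the
// transient store pool is enabled (not RocketMQ code).
class PositionPipelineSketch {
    int wrotePosition;      // appended into the off-heap writeBuffer
    int committedPosition;  // copied to the FileChannel by the commit thread
    int flushedPosition;    // forced to disk by the flush thread

    void append(int bytes) {
        wrotePosition += bytes;
    }

    // Commit step; returns true if it moved data (inverse of MappedFileQueue.commit).
    boolean commit() {
        boolean moved = wrotePosition > committedPosition;
        committedPosition = wrotePosition;
        return moved;
    }

    // Flush step; only committed data is visible to it.
    boolean flush() {
        boolean moved = committedPosition > flushedPosition;
        flushedPosition = committedPosition;
        return moved;
    }

    boolean invariantHolds() {
        return flushedPosition <= committedPosition && committedPosition <= wrotePosition;
    }

    public static void main(String[] args) {
        PositionPipelineSketch f = new PositionPipelineSketch();
        f.append(100);
        System.out.println("flush before commit moved data: " + f.flush());
        System.out.println("commit moved data: " + f.commit());
        System.out.println("flush after commit moved data: " + f.flush());
        System.out.println("invariant holds: " + f.invariantHolds());
    }
}
```

This ordering explains why CommitRealTimeService wakes the flush thread only after a commit has moved data.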

The relevant code:

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        // Commit interval
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
        // Minimum number of pages per commit
        int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
        // Even if the page threshold is not reached, commit once this much time has passed, to avoid losing messages
        int commitDataThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

        long begin = System.currentTimeMillis();
        if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
            this.lastCommitTimestamp = begin;
            commitDataLeastPages = 0;
        }

        try {
            // Commit data from the off-heap buffer to the FileChannel
            boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
            long end = System.currentTimeMillis();
            if (!result) {
                // some data committed.
                this.lastCommitTimestamp = end; // result = false means some data committed.
                //now wake up flush thread.
                // Wake up FlushRealTimeService to flush
                flushCommitLogService.wakeup();
            }

            if (end - begin > 500) {
                log.info("Commit data to file costs {} ms", end - begin);
            }
            this.waitForRunning(interval);
        } catch (Throwable e) {
            CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
        }
    }

    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }
    CommitLog.log.info(this.getServiceName() + " service end");
}
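The page-threshold logic at the top of the loop can be isolated into a small function. CommitThresholdSketch is a hypothetical helper. The 4-page and 200 ms values in its usage are example inputs, not necessarily RocketMQ's defaults.

```java
// Hypothetical helper isolating the threshold logic at the top of CommitRealTimeService.run().
class CommitThresholdSketch {
    long lastCommitTimestamp;

    // Normally require leastPages dirty pages; once thoroughIntervalMs has passed
    // since lastCommitTimestamp, return 0 so whatever is dirty gets committed.
    int effectiveLeastPages(long nowMillis, int leastPages, int thoroughIntervalMs) {
        if (nowMillis >= lastCommitTimestamp + thoroughIntervalMs) {
            lastCommitTimestamp = nowMillis;
            return 0;
        }
        return leastPages;
    }

    public static void main(String[] args) {
        CommitThresholdSketch s = new CommitThresholdSketch();
        s.lastCommitTimestamp = 1_000;
        System.out.println(s.effectiveLeastPages(1_100, 4, 200)); // still inside the interval
        System.out.println(s.effectiveLeastPages(1_200, 4, 200)); // interval elapsed
        System.out.println(s.effectiveLeastPages(1_300, 4, 200)); // new window started at 1_200
    }
}
```

In the loop above, lastCommitTimestamp is also refreshed whenever a commit moves data. This helper leaves that part out.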

The commit operation:

public int commit(final int commitLeastPages) {
    if (writeBuffer == null) {
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }
    // Whether the commit condition is met; the check works like the flush check used by FlushRealTimeService
    if (this.isAbleToCommit(commitLeastPages)) {
        if (this.hold()) {
            // Commit data from the off-heap buffer to the FileChannel
            commit0(commitLeastPages);
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        }
    }

    // All dirty data has been committed to FileChannel.
    if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }

    return this.committedPosition.get();
}

// commit0
protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos - this.committedPosition.get() > 0) {
        try {
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            // Write through the FileChannel (into the OS page cache; not yet forced to disk)
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}
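The slice/position/limit pattern in commit0 can be exercised against a real temporary file using only java.nio. Commit0Sketch, append and demo are hypothetical names. In this sketch, appends go through slices so that writeBuffer's own position never moves; this is assumed to match how data is appended. commit0 then copies the range [committed, written) to the same offsets in the file. The loop around write is added because FileChannel.write may write fewer bytes than remain.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class Commit0Sketch {
    // Copy bytes [lastCommitted, writePos) of the off-heap buffer to the same file offsets.
    static int commit0(ByteBuffer writeBuffer, int lastCommitted, int writePos,
                       FileChannel fileChannel) throws IOException {
        if (writePos - lastCommitted <= 0) {
            return lastCommitted;                       // nothing new to commit
        }
        ByteBuffer byteBuffer = writeBuffer.slice();    // own position/limit, shared content
        byteBuffer.position(lastCommitted);
        byteBuffer.limit(writePos);
        fileChannel.position(lastCommitted);
        while (byteBuffer.hasRemaining()) {             // write() may be partial
            fileChannel.write(byteBuffer);
        }
        return writePos;                                // the new committedPosition
    }

    // Append at an absolute offset through a slice, leaving writeBuffer's position at 0.
    static int append(ByteBuffer writeBuffer, int wrote, String text) {
        byte[] bytes = text.getBytes(StandardCharsets.US_ASCII);
        ByteBuffer view = writeBuffer.slice();
        view.position(wrote);
        view.put(bytes);
        return wrote + bytes.length;
    }

    // Two append/commit rounds against a temp file; returns the file contents.
    static String demo() {
        try {
            Path file = Files.createTempFile("commitlog-sketch", ".bin");
            try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                ByteBuffer writeBuffer = ByteBuffer.allocateDirect(64);   // stands in for the pooled buffer
                int wrote = append(writeBuffer, 0, "hello");
                int committed = commit0(writeBuffer, 0, wrote, ch);
                wrote = append(writeBuffer, wrote, "world");
                committed = commit0(writeBuffer, committed, wrote, ch);
                if (committed != wrote) {
                    throw new IllegalStateException("commit did not catch up with writes");
                }
            }
            String content = new String(Files.readAllBytes(file), StandardCharsets.US_ASCII);
            Files.deleteIfExists(file);
            return content;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each commit0 call writes only the bytes added since the previous commit, so the file ends up with both appends in order.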

FlushRealTimeService

The flush step itself works the same way as in synchronous flush: it calls flush. See the synchronous flush section above for details.

public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        // Timed-flush switch (fixed sleep instead of waiting for a notification)
        boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
        // Flush interval
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        // Minimum number of pages per flush
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
        // Even if the page threshold is not reached, flush once this much time has passed
        int flushPhysicQueueThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

        boolean printFlushProgress = false;

        // Print flush progress
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushPhysicQueueLeastPages = 0;
            printFlushProgress = (printTimes++ % 10) == 0;
        }

        try {
            if (flushCommitLogTimed) {
                Thread.sleep(interval);
            } else {
                // Stop waiting early when notified that data needs flushing
                this.waitForRunning(interval);
            }

            if (printFlushProgress) {
                this.printFlushProgress();
            }

            long begin = System.currentTimeMillis();
            // flush
            CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                // Record the CommitLog's last flush time in the checkpoint
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            long past = System.currentTimeMillis() - begin;
            if (past > 500) {
                log.info("Flush data to disk costs {} ms", past);
            }
        } catch (Throwable e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            this.printFlushProgress();
        }
    }

    // Normal shutdown, to ensure that all the flush before exit
    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.flush(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }

    this.printFlushProgress();

    CommitLog.log.info(this.getServiceName() + " service end");
}
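flush(flushPhysicQueueLeastPages) forces data only when enough dirty pages have accumulated, and the thorough interval resets the threshold to 0. The code above does not show that page check, so the sketch below reconstructs it under stated assumptions. The assumptions are a 4 KB OS page, a full file that is always flushed, and "any dirty byte" when the threshold is 0. FlushThresholdSketch is a hypothetical name.

```java
// Hypothetical reconstruction of the "enough dirty pages?" check behind
// flush(flushLeastPages); the 4 KB page size is an assumption.
class FlushThresholdSketch {
    static final int OS_PAGE_SIZE = 4 * 1024;

    // readablePos: how far data may be flushed (the committed or written position).
    static boolean isAbleToFlush(int flushedPos, int readablePos, int fileSize, int leastPages) {
        if (readablePos == fileSize) {
            return true;                                   // a full file is always flushed
        }
        if (leastPages > 0) {
            // Count whole pages crossed since the last flush.
            return (readablePos / OS_PAGE_SIZE) - (flushedPos / OS_PAGE_SIZE) >= leastPages;
        }
        return readablePos > flushedPos;                   // threshold 0: any dirty byte
    }

    public static void main(String[] args) {
        int fileSize = 1024 * 1024;
        System.out.println(isAbleToFlush(0, 3 * OS_PAGE_SIZE, fileSize, 4)); // 3 pages: wait
        System.out.println(isAbleToFlush(0, 4 * OS_PAGE_SIZE, fileSize, 4)); // 4 pages: flush
        System.out.println(isAbleToFlush(0, 100, fileSize, 0));              // thorough interval hit
    }
}
```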

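Summary

When an asynchronous flush is triggered, RocketMQ first checks whether off-heap memory (the transient store pool) is enabled:

1. Disabled: the asynchronous flush thread (FlushRealTimeService) is woken up. It finds the MappedFile at the last flushed position and calls flush on it. At the lowest level, the MappedFile chooses the force call based on whether off-heap memory is in use: fileChannel.force if it is, mappedByteBuffer.force if it is not.

2. Enabled: the commit thread (CommitRealTimeService) is woken up. It commits the data in off-heap memory to the FileChannel, then wakes up the asynchronous flush thread, which proceeds as in case 1.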
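The two force calls mentioned in case 1 can be compared with plain java.nio. This sketch (ForceChoiceSketch is a hypothetical name) writes data in one of two ways. It either writes through the FileChannel and calls fileChannel.force(false), or writes through a MappedByteBuffer and calls mappedByteBuffer.force(). Each force call matches the write path it follows. The exact condition RocketMQ checks when choosing between them is not shown in the code above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ForceChoiceSketch {
    // Write data via the chosen path, force it with the matching call, return the file bytes.
    static byte[] writeAndForce(boolean viaChannel, byte[] data) {
        try {
            Path file = Files.createTempFile("force-sketch", ".bin");
            file.toFile().deleteOnExit();  // a live mapping can block immediate deletion on some OSes
            try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                if (viaChannel) {
                    ByteBuffer src = ByteBuffer.wrap(data);
                    long pos = 0;
                    while (src.hasRemaining()) {
                        pos += ch.write(src, pos);          // positional write through the channel
                    }
                    ch.force(false);                        // force file content, not metadata
                } else {
                    MappedByteBuffer mapped = ch.map(FileChannel.MapMode.READ_WRITE, 0, data.length);
                    mapped.put(data);                       // write through the memory mapping
                    mapped.force();                         // force the mapped region
                }
            }
            return Files.readAllBytes(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] msg = "message".getBytes(StandardCharsets.US_ASCII);
        System.out.println(new String(writeAndForce(true, msg), StandardCharsets.US_ASCII));
        System.out.println(new String(writeAndForce(false, msg), StandardCharsets.US_ASCII));
    }
}
```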

Simplified flow of asynchronous flush:
[Figure: asynchronous flush flowchart (02异步刷盘.png)]


http://yoursite.com/post/aa89d898.html/
Author: Chase Wang
Posted on: December 8, 2022
Licensed under