// Under normal circumstances shutdown, wait for the arrival of the // request, and then flush try { Thread.sleep(10); } catch (InterruptedException e) { CommitLog.log.warn("GroupCommitService Exception, ", e); } // 执行 doCommit 操作之前,做swapRequest操作 synchronized (this) { this.swapRequests(); }
this.doCommit();
CommitLog.log.info(this.getServiceName() + " service end"); }
privatevoiddoCommit(){ synchronized (this.requestsRead) { if (!this.requestsRead.isEmpty()) { for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush boolean flushOK = false; /** * why 2 ? * 如果这个处理周期有新的MappedFile产生,新的MappedFile要重新触发一次 */ for (int i = 0; i < 2 && !flushOK; i++) { /** * CommitLog.this.mappedFileQueue.getFlushedWhere():commitLog已经落地的位置 * req.getNextOffset():MappedFile的内存缓冲区的位置 */ // 因为是异步线程处理,可能在其它线程已经处理了,本线程中不需要再处理 flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); if (!flushOK) { // 刷盘 CommitLog.this.mappedFileQueue.flush(0); } } // 唤醒等待刷盘的线程 req.wakeupCustomer(flushOK); }
long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { // 更新checkpoint文件的CommitLog最后落地时间 CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } // 置空,便于下一次swap this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mappedFileQueue.flush(0); } } }
flush
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
publicbooleanflush(finalint flushLeastPages){ boolean result = true; // 根据最近的刷盘位置找到MappedFile文件 MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0); if (mappedFile != null) { long tmpTimeStamp = mappedFile.getStoreTimestamp(); // flush int offset = mappedFile.flush(flushLeastPages); long where = mappedFile.getFileFromOffset() + offset; // 刷新了数据 where > flushedWhere 返回false ,否则 返回true result = where == this.flushedWhere; this.flushedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } }
publicvoidrun(){ CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { // 间隔时间 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); // 每次commit的最少页 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); // 没有达到页数的要求,超出一定时间也要commit,防止消息丢失 int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
long begin = System.currentTimeMillis(); if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; commitDataLeastPages = 0; }
try { // 将堆外内存中的数据commit到内存 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); long end = System.currentTimeMillis(); if (!result) { // some data committed. this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. // 唤醒FlushRealTimeService线程进行刷盘 flushCommitLogService.wakeup(); }
if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } }
boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); }
publicintcommit(finalint commitLeastPages){ if (writeBuffer == null) { //no need to commit data to file channel, so just regard wrotePosition as committedPosition. returnthis.wrotePosition.get(); } // 是否满足commit要求,实现参考FlushRealTimeService中flush相关逻辑 if (this.isAbleToCommit(commitLeastPages)) { if (this.hold()) { // 将堆外内存中的数据commit到内存 commit0(commitLeastPages); this.release(); } else { log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get()); } }
// All dirty data has been committed to FileChannel. if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) { this.transientStorePool.returnBuffer(writeBuffer); this.writeBuffer = null; }
returnthis.committedPosition.get(); }
// commit0 protectedvoidcommit0(finalint commitLeastPages){ int writePos = this.wrotePosition.get(); int lastCommittedPosition = this.committedPosition.get();
if (writePos - this.committedPosition.get() > 0) { try { ByteBuffer byteBuffer = writeBuffer.slice(); byteBuffer.position(lastCommittedPosition); byteBuffer.limit(writePos); this.fileChannel.position(lastCommittedPosition); // 通过channel写入到内存 this.fileChannel.write(byteBuffer); this.committedPosition.set(writePos); } catch (Throwable e) { log.error("Error occurred when commit data to FileChannel.", e); } } }
if (printFlushProgress) { this.printFlushProgress(); }
long begin = System.currentTimeMillis(); // flush CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { // 更新checkpoint的commitLog最后落地时间 CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } long past = System.currentTimeMillis() - begin; if (past > 500) { log.info("Flush data to disk costs {} ms", past); } } catch (Throwable e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); this.printFlushProgress(); } }
// Normal shutdown, to ensure that all the flush before exit boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.flush(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); }
this.printFlushProgress();
CommitLog.log.info(this.getServiceName() + " service end"); }