![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwFciV2dsQXYtJ3bm9CX9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cGcq5yM2cDN5IGN2M2MhJGMxEWZyYzX0UzNwQTMzAzLchDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.jpg)
同步刷盤
在RocketMQ中有同步刷盤和異步刷盤兩種方式
2種刷盤方式适用的場景如下
刷盤方式 | 适用場景 |
同步刷盤 | 資料可靠性高,适用于金融等對資料可靠性要求高的場景,性能比異步刷盤要低 |
異步刷盤 | 性能和吞吐量高 , Broker端異常關閉時,有少量消息丢失 |
根據前面的章節我們知道RocketMQ會通過SendMessageProcessor來處理刷盤的消息,當消息存儲到記憶體中後,就開始刷盤
異步刷盤的方式有兩種,第一種Mmap+PageCache(預設的異步刷盤方式),上面說到的同步刷盤也是這種機制,代碼實作如下
@Test
public void writeCaseOne() throws Exception {
File file = new File("/Users/peng/software/rocketmq/test/case1.txt");
FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 2048);
byteBuffer.put("hello mmap\n".getBytes());
// 将 pagecache 中的内容強制刷到磁盤
byteBuffer.force();
}
第二種是DirectByteBuffer+PageCache,也就是直接寫堆外記憶體
@Test
public void writeCaseTwo() throws Exception {
File file = new File("/Users/peng/software/rocketmq/test/case2.txt");
FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(20);
byteBuffer.put("hello mmap\n".getBytes());
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
fileChannel.write(byteBuffer);
}
// 将 pagecache 中的内容強制刷到磁盤
fileChannel.force(false);
}
從CommitLog#submitFlushRequest方法可以看到刷盤的邏輯
當broker端配置的是同步刷盤,但是發送過來的消息不需要等待消息刷盤完成,就會退化成異步刷盤,我們先看同步刷盤,在RocketMQ中,并不是往記憶體中放一條消息,就刷盤一次,這樣效率太低。RocketMQ會每隔10ms統一執行刷盤請求來提高效率
- 首先把刷盤的請求封裝成GroupCommitRequest,然後放到GroupCommitService的阻塞隊列中
- GroupCommitService每隔10ms将目前阻塞隊列中的刷盤請求統一執行,然後喚醒阻塞等待的線程
public void run() {
CommitLog.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
// 有資料過來會結束等待的
this.waitForRunning(10);
this.doCommit();
} catch (Exception e) {
CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
// 省略部分邏輯
}
不斷執行doCommit方法進行刷盤,當刷盤完成時,會喚醒等待刷盤的線程
這裡有個需要注意的細節點,我我們放請求的時候是放到requestsWrite中,但是讀的時候卻是在requestsRead中,那麼requestsRead中能讀取到值嗎?
// GroupCommitService
// 讀請求清單
private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
// 讀請求清單
private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
我們來看ServiceThread類的waitForRunning方法
其實當每次等待結束後都會調用onWaitEnd方法,而GroupCommitService重寫了這個方法,在這個方法内部調用swapRequests方法
private void swapRequests() {
lock.lock();
try {
LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
} finally {
lock.unlock();
}
}
swapRequests方法會将requestsWrite和requestsRead中的内容進行交換。
首先通過上次刷盤位置定位到MappedFile,然後開始刷盤
可以看到有兩種刷盤的方式,調用FileChannel#force(異步刷盤并且開啟transientStorePool)或者MappedByteBuffer#force(同步刷盤或者異步刷盤但是不開啟transientStorePool)
當刷盤的時候,需要累積到一定頁數才開始刷,同步刷盤是0頁,異步輸盤是4頁。至此同步輸盤的邏輯就梳理完了。
其實異步輸盤不開啟transientStorePool時,執行的邏輯和這個差不多,隻是累計的頁數不相同而已
異步刷盤
不開啟TransientStorePool
當不開啟TransientStorePoo時,會先喚醒FlushRealTimeService線程,然後開始開始刷盤
先算出輸盤的頁數,預設4頁,如果10s沒有刷盤了,則将頁數設為0,然後執行MappedFileQueue#flush方法,這個方法在同步刷盤已經分析過了,不再分析。
開啟TransientStorePool
當開啟TransientStorePool是會先喚醒CommitRealTimeService,将ByteBuffer中的内容刷入FileChannel,接着喚醒FlushRealTimeService線程,将FileChannel中的資料刷入磁盤
先算出commit的頁數,預設4頁,如果200ms沒有commit了,則将頁數設為0(在後續執行流程可以看到commit也對頁數有要求),然後執行MappedFileQueue#commit方法,将将ByteBuffer中的内容刷入FileChannelMappedFile#commit0
至于這兩種刷盤方式的好處,我個人了解也不是很深刻,是以轉一下社群胡宗棠老師對這個問題的解讀
- 第一種,Mmap+PageCache的方式,讀寫消息都走的是pageCache,這樣子讀寫都在pagecache裡面不可避免會有鎖的問題,在并發的讀寫操作情況下,會出現缺頁中斷降低,記憶體加鎖,污染頁的回寫。
- 第二種,DirectByteBuffer(堆外記憶體)+PageCache的兩層架構方式,這樣子可以實作讀寫消息分離,寫入消息時候寫到的是DirectByteBuffer——堆外記憶體中,讀消息走的是PageCache(對于,DirectByteBuffer是兩步刷盤,一步是刷到PageCache,還有一步是刷到磁盤檔案中),帶來的好處就是,避免了記憶體操作的很多容易堵的地方,降低了時延,比如說缺頁中斷降低,記憶體加鎖,污染頁的回寫。