天天看點

RocketMQ源碼解析:同步刷盤和異步刷盤的實作

RocketMQ源碼解析:同步刷盤和異步刷盤的實作

同步刷盤

在RocketMQ中有同步刷盤和異步刷盤兩種方式

RocketMQ源碼解析:同步刷盤和異步刷盤的實作

2種刷盤方式适用的場景如下

刷盤方式 适用場景
同步刷盤 資料可靠性高,适用于金融等對資料可靠性要求高的場景,性能比異步刷盤要低
異步刷盤 性能和吞吐量高 , Broker端異常關閉時,有少量消息丢失

根據前面的章節我們知道RocketMQ會通過SendMessageProcessor來處理刷盤的消息,當消息存儲到記憶體中後,就開始刷盤

RocketMQ源碼解析:同步刷盤和異步刷盤的實作

異步刷盤的方式有兩種,第一種Mmap+PageCache(預設的異步刷盤方式),上面說到的同步刷盤也是這種機制,代碼實作如下

@Test
public void writeCaseOne() throws Exception {
    File file = new File("/Users/peng/software/rocketmq/test/case1.txt");
    FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
    MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 2048);
    byteBuffer.put("hello mmap\n".getBytes());
    // 将 pagecache 中的内容強制刷到磁盤
    byteBuffer.force();
}      

第二種是DirectByteBuffer+PageCache,也就是直接寫堆外記憶體

@Test
public void writeCaseTwo() throws Exception {
    File file = new File("/Users/peng/software/rocketmq/test/case2.txt");
    FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
    ByteBuffer byteBuffer = ByteBuffer.allocate(20);
    byteBuffer.put("hello mmap\n".getBytes());
    byteBuffer.flip();
    while (byteBuffer.hasRemaining()) {
        fileChannel.write(byteBuffer);
    }
    // 将 pagecache 中的内容強制刷到磁盤
    fileChannel.force(false);
}      
RocketMQ源碼解析:同步刷盤和異步刷盤的實作

從CommitLog#submitFlushRequest方法可以看到刷盤的邏輯

RocketMQ源碼解析:同步刷盤和異步刷盤的實作

當broker端配置的是同步刷盤,但是發送過來的消息不需要等待消息刷盤完成,就會退化成異步刷盤,我們先看同步刷盤,在RocketMQ中,并不是往記憶體中放一條消息,就刷盤一次,這樣效率太低。RocketMQ會每隔10ms統一執行刷盤請求來提高效率

  1. 首先把刷盤的請求封裝成GroupCommitRequest,然後放到GroupCommitService的阻塞隊列中
  2. GroupCommitService每隔10ms将目前阻塞隊列中的刷盤請求統一執行,然後喚醒阻塞等待的線程
  3. RocketMQ源碼解析:同步刷盤和異步刷盤的實作
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // 有資料過來會結束等待的
            this.waitForRunning(10);
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    // 省略部分邏輯
}      
RocketMQ源碼解析:同步刷盤和異步刷盤的實作

不斷執行doCommit方法進行刷盤,當刷盤完成時,會喚醒等待刷盤的線程

這裡有個需要注意的細節點,我我們放請求的時候是放到requestsWrite中,但是讀的時候卻是在requestsRead中,那麼requestsRead中能讀取到值嗎?

// GroupCommitService
// 讀請求清單
private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
// 讀請求清單
private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();      

我們來看ServiceThread類的waitForRunning方法

RocketMQ源碼解析:同步刷盤和異步刷盤的實作

其實當每次等待結束後都會調用onWaitEnd方法,而GroupCommitService重寫了這個方法,在這個方法内部調用swapRequests方法

private void swapRequests() {
    lock.lock();
    try {
        LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    } finally {
        lock.unlock();
    }
}      

swapRequests方法會将requestsWrite和requestsRead中的内容進行交換。

RocketMQ源碼解析:同步刷盤和異步刷盤的實作

首先通過上次刷盤位置定位到MappedFile,然後開始刷盤

RocketMQ源碼解析:同步刷盤和異步刷盤的實作

可以看到有兩種刷盤的方式,調用FileChannel#force(異步刷盤并且開啟transientStorePool)或者MappedByteBuffer#force(同步刷盤或者異步刷盤但是不開啟transientStorePool)

RocketMQ源碼解析:同步刷盤和異步刷盤的實作

當刷盤的時候,需要累積到一定頁數才開始刷,同步刷盤是0頁,異步輸盤是4頁。至此同步輸盤的邏輯就梳理完了。

其實異步輸盤不開啟transientStorePool時,執行的邏輯和這個差不多,隻是累計的頁數不相同而已

異步刷盤

RocketMQ源碼解析:同步刷盤和異步刷盤的實作

不開啟TransientStorePool

當不開啟TransientStorePoo時,會先喚醒FlushRealTimeService線程,然後開始開始刷盤

RocketMQ源碼解析:同步刷盤和異步刷盤的實作

先算出輸盤的頁數,預設4頁,如果10s沒有刷盤了,則将頁數設為0,然後執行MappedFileQueue#flush方法,這個方法在同步刷盤已經分析過了,不再分析。

開啟TransientStorePool

當開啟TransientStorePool是會先喚醒CommitRealTimeService,将ByteBuffer中的内容刷入FileChannel,接着喚醒FlushRealTimeService線程,将FileChannel中的資料刷入磁盤

RocketMQ源碼解析:同步刷盤和異步刷盤的實作

先算出commit的頁數,預設4頁,如果200ms沒有commit了,則将頁數設為0(在後續執行流程可以看到commit也對頁數有要求),然後執行MappedFileQueue#commit方法,将将ByteBuffer中的内容刷入FileChannelMappedFile#commit0

RocketMQ源碼解析:同步刷盤和異步刷盤的實作

至于這兩種刷盤方式的好處,我個人了解也不是很深刻,是以轉一下社群胡宗棠老師對這個問題的解讀

  1. 第一種,Mmap+PageCache的方式,讀寫消息都走的是pageCache,這樣子讀寫都在pagecache裡面不可避免會有鎖的問題,在并發的讀寫操作情況下,會出現缺頁中斷降低,記憶體加鎖,污染頁的回寫。
  2. 第二種,DirectByteBuffer(堆外記憶體)+PageCache的兩層架構方式,這樣子可以實作讀寫消息分離,寫入消息時候寫到的是DirectByteBuffer——堆外記憶體中,讀消息走的是PageCache(對于,DirectByteBuffer是兩步刷盤,一步是刷到PageCache,還有一步是刷到磁盤檔案中),帶來的好處就是,避免了記憶體操作的很多容易堵的地方,降低了時延,比如說缺頁中斷降低,記憶體加鎖,污染頁的回寫。

參考部落格