天天看點

RocketMQ 消息存儲檔案

引言

前面我們已經簡單地介紹了 RocketMQ 的整體設計思路,本文着重其中消息存儲檔案部分,更多關于 RocketMQ 的文章均收錄于

<RocketMQ系列文章>

;

檔案詳解

了解完了檔案存儲部分使用的核心技術,在讓我們回到 RocketMQ 檔案組織的讨論中來,接下來我将挨個分析各個核心檔案存儲的内容和使用方法。

Commit檔案

commitlog 目錄的組織方式在前面已經詳細介紹過了,該目錄下的檔案主要存儲消息,其特點是每一條消息長度不相同,CommitLog 檔案存儲的邏輯視圖如下圖所示,每條消息的前面4個位元組存儲該條消息的總長度。整個 CommitLog 檔案預設大小為 1G。

RocketMQ 消息存儲檔案

在查找消息時,需要先根據要查找的消息偏移找到消息所在的檔案,然後根據消息偏移與檔案大小取餘,得到消息在檔案中的位置,最後根據消息大小,取出指定長度的消息内容。

public SelectMappedBufferResult getMessage(final long offset, final int size) {
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
    if (mappedFile != null) {
        int pos = (int) (offset % mappedFileSize);
        return mappedFile.selectMappedBuffer(pos, size);
    }
    return null;
}           

ConsumeQueue檔案

RocketMQ 基于主題訂閱模式實作消息消費,消費者關心的是一個主題下的所有消息,但由于同一主題的消息不連續地存儲在 CommitLog 檔案中,試想一下如果消息消費者直接從消息存儲檔案(CommitLog)中去周遊查找訂閱主題下的消息,效率将極其低下,RocketMQ 為了适應消息消費的檢索需求,設計了消息消費隊列檔案(ConsumeQueue),該檔案可以看成是 CommitLog 關于消息消費的“索引”檔案,消息主題,第二級目錄為主題的消息隊列。

RocketMQ 消息存儲檔案

為了加快檢索速度,并且減少空間使用,ConsumeQueue 不會存儲所有消息正文,隻會存儲如下内容:

RocketMQ 消息存儲檔案

單個 ConsumeQueue 檔案中預設包含 30 萬個條目,單個檔案的長度為 30w × 20 ≈ 6M 位元組, 單個 ConsumeQueue 檔案可以看出是一個 ConsumeQueue 條目的數組,其下标為 ConsumeQueue 的邏輯偏移量,消息消費進度存儲的偏移量即邏輯偏移量。 ConsumeQueue 即為 CommitLog 檔案的索引檔案, 其建構機制是當消息到達 CommitLog 檔案後, 由專門的線程産生消息轉發任務,進而建構消息消費隊列檔案與下文提到的索引檔案。

public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
    /*startIndex 消息索引*/
    int mappedFileSize = this.mappedFileSize;
    // 根據消息索引 * 20 得到在 ConsumeQueue 中的實體偏移
    long offset = startIndex * CQ_STORE_UNIT_SIZE;
    if (offset >= this.getMinLogicOffset()) {
        // 找到實體索引所在的檔案
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
        if (mappedFile != null) {
            // 實體索引與檔案大小取餘,得到資料存儲的位置,然後通過MappedByteBuffer的到記憶體映射Buffer
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
            return result;
        }
    }
    return null;
}           

根據 startIndex 擷取準備消費的條目。首先 startIndex * 20 得到在 ConsumeQueue 中的實體偏移量,如果該 offset 小于 minLogicOffset,則傳回 null,說明該消息已被删除;如果大于 minLogicOffset,則根據偏移量定位到具體的實體檔案,然後通過 offset 與實體文大小取模擷取在該檔案的偏移量,最終的到從 startIndex 開始,到該 ConsumeQueue 有效結尾的所有資料對應的 MappedByteBuffer。

除了根據消息偏移量查找消息的功能外,RocketMQ 還提供了根據時間戳查找消息的功能,具體實作邏輯如下:

  1. 首先根據時間戳定位到 ConsumeQueue 實體檔案,就是從第一個檔案開始找到第一個檔案更新時間大于該時間戳的檔案。
  2. 然後對 ConsumeQueue 中的所有項,使用二分查找,查詢每條記錄對應的 CommitLog 的最後更新時間和要查詢的時間戳
  3. 最終找到與時間戳對應的 ConsumeQueue 偏移,或者離時間戳最近的消息的 ConsumeQueue 偏移

Index索引檔案

消息消費隊列是 RocketMQ 專門為消息訂閱建構的索引檔案,提高根據主題與消息隊列檢索消息的速度,另外 RocketMQ 引入了 Hash 索引機制為消息建立索引,HashMap 的設計包含兩個基本點: Hash槽 與 Hash 沖突的連結清單結構。

RocketMQ 消息存儲檔案

從圖中可以看出,indexFile 總共包含 IndexHeader、 Hash 槽、 Hash 條目(資料)。

IndexHeader

IndexHeader頭部,包含 40 個位元組,記錄該 IndexFile 的統計資訊,其結構如下。

  • beginTimestamp: 該索引檔案中包含消息的最小存儲時間。
  • endTimestamp: 該索引檔案中包含消息的最大存儲時間。
  • beginPhyOffset: 該索引檔案中包含消息的最小實體偏移量(CommitLog 檔案偏移量)。
  • endPhyOffset:該索引檔案中包含消息的最大實體偏移量(CommitLog 檔案偏移量)。
  • hashSlotCount: hashSlot個數,并不是 hash 槽使用的個數,在這裡意義不大。
  • indexCount: Index條目清單目前已使用的個數,Index條目在Index條目清單中按順序存儲。

Hash槽

Hash槽,一個 IndexFile 預設包含500萬個 Hash 槽,每個 Hash 槽存儲的是落在該 Hash 槽的 hashcode 最新的 Index 的索引。

Hash 條目

Index條目清單,預設一個索引檔案包含 2000 萬個條目,每一個 Index 條目結構如下。

  • hashcode: key 的 hashcode。
  • phyOffset: 消息對應的實體偏移量。
  • timeDif:該消息存儲時間與第一條消息的時間戳的內插補點,小于 0 該消息無效。
  • preIndexNo:該條目的前一條記錄的 Index 索引,當出現 hash 沖突時,建構的連結清單結構。

Index檔案的寫入步驟如下:

  1. 如果目前已使用條目大于等于允許最大條目數時,則傳回 false,表示目前索引檔案已寫滿。如果目前索引檔案未寫滿則根據 key 算出 key 的 hashcode,然後 keyHash 對 hash 槽數量取餘定位到 hashcode 對應的 hash 槽下标, hashcode對應的hash槽的實體位址 = IndexHeader 頭部(40位元組) + 下标 * 每個 hash 槽的大小(4位元組)。
  2. 讀取 hash 槽中存儲的資料,如果 hash 槽存儲的資料小于 0 或大于目前索引檔案中存儲的最大條目,則将該槽的值設定為 0。
  3. 将條目資訊存儲在 IndexFile 中。
    1. 計算新添加條目的起始實體偏移量,等于頭部位元組長度 + hash 槽數量單個 hash 槽大小(4個位元組) + 目前 Index 條目個數單個 Index 條目大小(20個位元組)。
    2. 依次将 hashcode、消息實體偏移量、時間差timeDif、原來 Hash 槽的值存入該索引條目中。
    3. 将新添加的索引條目索引存入 hash 槽中,覆寫原來的值。
  4. 更新檔案索引頭資訊。
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
    // 判斷是否寫滿了
    if (this.indexHeader.getIndexCount() < this.indexNum) {
        // 計算 hash 槽的位置
        int keyHash = indexKeyHashMethod(key);
        int slotPos = keyHash % this.hashSlotNum;
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
        try {
            // 擷取原來槽内的值
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            // <= 0 或者大于目前存儲數,則認為其無效
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                slotValue = invalidIndex;
            }
            // 計算時間內插補點
            long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
            timeDiff = timeDiff / 1000;
            if (this.indexHeader.getBeginTimestamp() <= 0) {
                timeDiff = 0;
            } else if (timeDiff > Integer.MAX_VALUE) {
                timeDiff = Integer.MAX_VALUE;
            } else if (timeDiff < 0) {
                timeDiff = 0;
            }
            // 計算索引條目的位置
            int absIndexPos =
                IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                    + this.indexHeader.getIndexCount() * indexSize;
            // 寫入索引條目
            this.mappedByteBuffer.putInt(absIndexPos, keyHash);
            this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
            this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
            // 更新槽的值
            this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
            // 更新頭部資料
            if (this.indexHeader.getIndexCount() <= 1) {
                this.indexHeader.setBeginPhyOffset(phyOffset);
                this.indexHeader.setBeginTimestamp(storeTimestamp);
            }
            this.indexHeader.incHashSlotCount();
            this.indexHeader.incIndexCount();
            this.indexHeader.setEndPhyOffset(phyOffset);
            this.indexHeader.setEndTimestamp(storeTimestamp);
            return true;
        } catch (Exception e) {
            log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
        }
    } else {
        log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
            + "; index max num = " + this.indexNum);
    }
    return false;
}           

至此,索引檔案的寫入套路就已經介紹完了,它通過 hash 槽存儲了 hash 沖突連結清單的頭指針,然後每個索引項都儲存了前一個索引項的指針,借此,在檔案存儲中實作了連結清單的資料結構。

當根據 key 查找消息時,不光可以設定要查找 key 還可以設定最大查找數量,開始時間戳,結束時間戳,操作步驟如下:

  1. 根據 key 計算 hashcode,然後 keyHash 對 hash 槽數量取餘定位到 hashcode 對應的 hash 槽下标。
  2. 如果對應的 Hash 槽中存儲的資料小于 1 或大于目前索引條目個數則表示該 HashCode 沒有對應的條目,直接傳回。
  3. 由于會存在 hash 沖突,根據 slotValue 定位該 hash 槽最新的一個 Item 條目,将存儲的實體偏移加入到 phyOffsets 中 ,然後繼續驗證Item條目中存儲的上一個 Index 下标,如果大于等于 1 并且小于最大條目數,則繼續查找,否則結束查找。
  4. 根據 Index 下标定位到條目的起始實體偏移量,然後依次讀取 hashcode、 實體偏移量、時間差、上一個條目的Index下标,循環步驟4。
    • 如果存儲的時間差小于 0,則直接結束;如果 hashcode 比對并且消息存儲時間介于待查找時間start、 end之間則将消息實體偏移量加入到phyOffsets
    • 驗證條目的前一個 Index 索引,如果索引大于等于 1 并且小于Index條目數,則繼續查找,否則結束整個查找。

具體的實作代碼如下:

public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
    final long begin, final long end) {
    if (this.mappedFile.hold()) {
        // 計算 hash 槽的位置
        int keyHash = indexKeyHashMethod(key);
        int slotPos = keyHash % this.hashSlotNum;
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
        try {
            // 擷取 hash 槽記憶體的索引位置
            int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
            // 驗證合法性
            if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                || this.indexHeader.getIndexCount() <= 1) {
            } else {
                // 周遊索引連結清單
                for (int nextIndexToRead = slotValue; ; ) {
                    // 數量夠了就退出
                    if (phyOffsets.size() >= maxNum) {
                        break;
                    }
                    // 找到本條索引的位置
                    int absIndexPos =
                        IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                            + nextIndexToRead * indexSize;
                    // 讀取索引條目的内容
                    int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                    long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
                    long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                    int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
                    // 驗證時間合法性
                    if (timeDiff < 0) {
                        break;
                    }
                    timeDiff *= 1000L;
                    long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                    boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
                    // 驗證整體合法性
                    if (keyHash == keyHashRead && timeMatched) {
                        phyOffsets.add(phyOffsetRead);
                    }
                    // 驗證連結清單中下一個節點的合法性,如何合法則繼續循環,否則退出
                    if (prevIndexRead <= invalidIndex
                        || prevIndexRead > this.indexHeader.getIndexCount()
                        || prevIndexRead == nextIndexToRead || timeRead < begin) {
                        break;
                    }
                    nextIndexToRead = prevIndexRead;
                }
            }
        } catch (Exception e) {
            log.error("selectPhyOffset exception ", e);
        }
    }
}           

CheckPoint檔案

checkpoint 的作用是記錄 CommitLog、ConsumeQueue、Index檔案的刷盤時間點,檔案固定長度為 4k,其中隻用該檔案的前面 24 個位元組,其存儲格式如下圖所示。

RocketMQ 消息存儲檔案
  • physicMsgTimestamp: CommitLog檔案刷盤時間點。
  • logicsMsgTimestamp: 消息消費隊列檔案刷盤時間點。
  • indexMsgTimestamp: 索引檔案刷盤時間點。

更新ConsumeQueue和IndexFile

消息消費隊列檔案、消息屬性索引檔案都是基于 CommitLog 檔案建構的,當消息生産者送出的消息存儲在 CommitLog 檔案中,ConsumeQueue、IndexFile需要及時更新,否則消息無法及時被消費,根據消息屬性查找消息也會出現較大延遲。RocketMQ 通過開啟一個線程 ReputMessageService 來準時轉發 CommitLog 檔案更新事件,相應的任務處理器根據轉發的消息及時更新 ConsumeQueue、IndexFile檔案。

檔案恢複

由于 RocketMQ 存儲首先将消息全量存儲在 CommitLog 檔案中,然後異步生成轉發任務更新 ConsumeQueue、Index 檔案。如果消息成功存儲到 CommitLog 檔案中,轉發任務未成功執行,此時消息伺服器 Broker 由于某個原因當機,導緻 CommitLog、ConsumeQueue、IndexFile檔案資料不一緻。如果不加以人工修複的話,會有一部分消息即便在 CommitLog 檔案中存在,但由于并沒有轉發到 ConsumeQueue,這部分消息将永遠不會被消費者消費。

接下來我們看一看 RocketMQ Broker 的啟動過程:

  1. 判斷上一次退出是否正常。
    • Broker在啟動時建立abort檔案,在退出時通過注冊 JVM 鈎子函數删除 abort 檔案。如果下一次啟動時存在 abort 檔案。 說明 Broker 是異常退出的,CommitLog 與 ConsumeQueue 資料有可能不一緻,需要進行修複。
  2. 加載延遲隊列,RocketMQ 定時消息相關。
  3. 加載所有 CommitLog 檔案,如果檔案大小和配置單檔案大小不一緻則忽略,建立好了将wrotePosition、flushedPosition, committedPosition三個指針都指向檔案結尾。後面的恢複過程會将這些指針修正。
  4. 加載消息 ConsumeQueue檔案。與加載 CommitLog 大緻相同。
  5. 加載存儲檢測點,檢測點主要記錄 commitLog 檔案、ConsumeQueue 檔案、Index 索引檔案的刷盤點。
  6. 加載索引檔案,如果上次異常退出,而且索引檔案上次刷盤時間小于該索引檔案最大的消息時間戳該檔案将立即銷毀。
  7. 根據 Broker 是否是正常停止執行不同的恢複政策,下文将分别介紹異常停止、正常停止的檔案恢複機制。
  8. 恢複 ConsumeQueue 檔案後,将在 CommitLog 執行個體中儲存每個消息消費隊列目前的存儲邏輯偏移量,這也是消息中不僅存儲主題、消息隊列 ID 還存儲了消息隊列偏移量的關鍵所在。

Broker 正常停止

  1. Broker正常停止再重新開機時,從倒數第三個檔案開始進行恢複,如果不足 3 個檔案,則從第一個檔案開始恢複。
  2. 從要恢複的 CommitLog 中,按照讀到的消息大小讀出消息正文,然後使用CRC(循環備援校驗)判斷消息是否正确。
  3. 周遊 CommitLog 檔案,每次取出一條消息,如果檢查結果為 true 并且消息的長度大于 0 表示消息正确,校驗指針移動到本條消息的末尾;如果查找結果為 true 并且消息的長度等于 0,表示已到該檔案的末尾,如果還有下一個檔案需要檢查,則循環步驟3,否則跳出循環; 如果查找結構為 false,表明該檔案未填滿所有消息,跳出循環,結束周遊檔案。
  4. 通過步驟 3,最終會得到一個校驗通過的偏移 offset,通過它來更新 commit 指針和 flush 指針。
  5. 删除 offset 之後的所有檔案。

正常停止的時,Broker 會将 IndexFile 和 ConsumeQueue 都更新好,是以如果 Broker 正常停止的話,恢複過程隻是修正commit 指針和 flush 指針。

Broker 異常停止

異常檔案恢複的步驟與正常停止檔案恢複的流程基本相同,其主要差别有兩個。首先,正常停止預設從倒數第三個檔案開始進行恢複,而異常停止則需要從最後一個檔案往前走,找到第一個消息存儲正常的檔案。其次,如果 CommitLog 目錄沒有消息檔案,如果在消息消費隊列 ConsumeQueue 目錄下存在檔案,則需要銷毀。

如何判斷一個 CommitLog 檔案是正确的呢?

  1. 首先判斷檔案的魔數
  2. 如果檔案中第一條消息的存儲時間等于 0,則認為檔案無效
  3. 對比檔案第一條消息的時間戳與檢測點,檔案第一條消息的時間戳小于檔案檢測點 checkpoint 說明該檔案部分消息是可靠的,則從該檔案開始恢複。
  4. 如果根據前 3 步算法找到了合法的 CommitLog,則周遊 CommitLog 中的消息,驗證消息的合法性,并将消息重新轉發到消息消費隊列與索引檔案,這樣會造成 ConsumeQueue 的備援,這需要消息的消費者來實作幂等性。
  5. 如果步驟3未找到有效 CommitLog,則設定 CommitLog 目錄的 flush 指針、 commit 指針都為 0,并銷毀消息消費隊列檔案。

異常停止時,不确定 ConsumeQueue 和 IndexFile 是否正确,是以從最後一個有效檔案,重新發送 CommitLog 變動事件,進而觸發 ConsumeQueue 和 IndexFile 的更新。

我認為這裡有問題,這裡隻重發最後一個有效檔案的 CommitLog 變動事件,如果倒數第二個檔案的最後幾條改動事件還沒有被處理時,建立了新的 CommitLog 檔案(最後一個 CommitLog檔案)并且成功寫入了資料,并重新整理了 checkpoint。那麼倒數第二個 CommitLog 的最後幾條消息就丢失了。為此,我在官方 Github 倉庫提了一個

issue

,但是目前未收到回複。

而且,如果最後一個 CommitLog 幾乎寫完,那麼也會産生很多的備援消息,我覺得這裡可以從 ConsumeQueue 的最大索引處開始,順序恢複所有有效的 CommitLog 内容。

刷盤機制

RocketMQ 的存儲與讀寫是基于 JDK NIO 的記憶體映射機制(MappedByteBuffer)的,消息存儲時首先将消息追加到記憶體,再根據配置的刷盤政策在不同時間進行刷寫磁盤。如果是同步刷盤,消息追加到記憶體後,将同步調用 MappedByteBuffer 的 force 方法;如果是異步刷盤,在消息追加到記憶體後立刻傳回給消息發送端。RocketMQ 使用一個單獨的線程按照某一個設定的頻率執行刷盤操作。通過在 broker 配置檔案中配置 flushDiskType 來設定刷盤方式,可選值為 ASYNC_FLUSH (異步刷盤)、SYNC_FLUSH (同步刷盤),預設為異步刷盤。 ConsumeQueue、IndexFile 刷盤的實作相對于 CommitLog 刷盤機制來說都很簡單,ConsumeQueue 是周期性刷盤,索引檔案的刷盤并不是采取定時刷盤機制,而是每次想要更新一次索引檔案就會将之前的改動刷寫到磁盤。接下來我将主要介紹 CommitLog 的刷盤過程。

同步刷盤

同步刷盤,指的是在消息追加到記憶體映射檔案的記憶體中後,立即将資料從記憶體刷寫到磁盤檔案,CommitLog 中有一個刷盤服務 GroupCommitService,所有消息發送線程接收到的同步寫入請求,最終都會以請求-回應的方式通知 GroupCommitService 代其進行刷盤操作。當 GroupCommitService 執行完刷盤任務,或者刷盤任務執行逾時時,發送線程才會回複消息的 Producer。

我覺得這裡引入 GroupCommitService 的意義主要有如下幾點:

  1. 避免鎖競争
  2. 抽象出的Request-Response模型,可以用來實作逾時機制
  3. 避免無意義的重新整理調用,每次刷盤都會重新整理最新寫入的所有資料,這樣如果有實際已經被重新整理過的請求過來,隻要判斷重新整理指針就能快速知道是否已經完成
  4. 保證了刷盤的順序

接下來我們看看 GroupCommitService 的核心内容:

// 接收到的請求,直接寫入requestsWrite
private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
// 刷盤時,從requestsRead讀取刷盤請求,有兩個隊列的意義是将讀寫過程的鎖沖突消除,後面大家就會看到實際的操作過程
private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
// 添加刷盤請求
public synchronized void putRequest(final GroupCommitRequest request) {
    // 寫鎖
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);
    }
    // 通知:有新的請求過來了
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown(); // notify
    }
}
// 交換讀寫隊列,依靠它完成讀寫鎖沖突分離
private void swapRequests() {
    List<GroupCommitRequest> tmp = this.requestsWrite;
    this.requestsWrite = this.requestsRead;
    this.requestsRead = tmp;
}
// 實際刷盤過程
private void doCommit() {
    // 讀鎖
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) {
                // There may be a message in the next file, so a maximum of
                // two times the flush
                boolean flushOK = false;
                for (int i = 0; i < 2 && !flushOK; i++) {
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                    if (!flushOK) {
                        CommitLog.this.mappedFileQueue.flush(0);
                    }
                }
                req.wakeupCustomer(flushOK);
            }
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}
// 線程循環
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            this.waitForRunning(10);
            // 等待結束後,會交換讀寫隊列
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }
    synchronized (this) {
        this.swapRequests();
    }
    this.doCommit();
    CommitLog.log.info(this.getServiceName() + " service end");
}           

異步刷盤

異步刷盤根據是否開啟 transientStorePoolEnable 機制,刷盤實作會有細微差别。如果 transientStorePoolEnable 為 true, RocketMQ 會單獨申請一個與目标實體檔案 (CommitLog) 同樣大小的堆外記憶體,該堆外記憶體将使用記憶體鎖定,確定不會被置換到虛拟記憶體中去,消息首先追加到堆外記憶體,然後送出到 FileChannel 中,再 flush 到磁盤。 如果 transientStorePoolEnable 為 false,消息直接追加到與實體檔案直接映射的記憶體中,然後刷寫到磁盤中。

當 transientStorePoolEnable 為 true時,會有一個 CommitRealTimeService 預設每隔 200ms 将直接記憶體中的資料送出到 FileChannel,一次送出預設至少要包含 4 個頁的資料,否則暫時不送出。當 transientStorePoolEnable 為 false 時,這個 CommitRealTimeService 實際上什麼都沒做。

然後是定時刷盤的邏輯,CommitLog 會有一個 FlushRealTimeService 定時将資料刷入磁盤,預設每隔 10s 進行一次刷盤,和 commit 過程一樣,刷盤階段預設也是至少攢夠 4 個頁的髒資料才進行刷盤,當 transientStorePoolEnable 為 true時,刷盤過程調用的是 FileChannel 的 force,否則調用的是 MappedByteBuffer 的 force。

過期删除機制

由于 RocketMQ 操作 CommitLog、ConsumeQueue檔案是基于記憶體映射機制并在啟動的時候會加載 CommitLog、ConsumeQueue 目錄下的所有檔案,為了避免記憶體與磁盤的浪費,不可能将消息永久存儲在消息伺服器上,是以需要引人一種機制來删除己過期的檔案。 RocketMQ 順序寫 CommitLog 檔案、ConsumeQueue 檔案,所有寫操作全部落在最後一個 CommitLog 或 ConsumeQueue 檔案上,之前的檔案在下一個檔案建立後将不會再被更新。RocketMQ 清除過期檔案的方法是: 如果非目前寫檔案在一定時間間隔内沒有再次被更新,則認為是過期檔案,可以被删除,RocketMQ 不會關注這個檔案上的消息是否全部被消費。預設每個檔案的過期時間為 72 小時 ,通過在 Broker 配置檔案中設定 fileReservedTime 來改變過期時間,機關為小時。

RocketMQ 會每隔 10s 排程一次清除過程,檢測是否需要清除過期檔案。

文章說明

更多有價值的文章均收錄于

貝貝貓的文章目錄
RocketMQ 消息存儲檔案

版權聲明: 本部落格所有文章除特别聲明外,均采用 BY-NC-SA 許可協定。轉載請注明出處!

創作聲明: 本文基于下列所有參考内容進行創作,其中可能涉及複制、修改或者轉換,圖檔均來自網絡,如有侵權請聯系我,我會第一時間進行删除。

參考内容

[1]《RocketMQ技術内幕》

[2]《RocketMQ實戰與原了解析》

[3]

老生常談——利用消息隊列處理分布式事務

[4]

RocketMQ架構解析

[5]

MappedByteBuffer VS FileChannel 孰強孰弱?

[6]

檔案 IO 操作的一些最佳實踐

[7]

海量資料處理之Bloom Filter詳解

[8]

rocketmq GitHub Wiki