天天看點

RocketMQ 消息存儲

引言

前面我們已經簡單地介紹了 RocketMQ 的整體設計思路,本文着重其中消息存儲部分的整體實作思路,更多關于 RocketMQ 的文章均收錄于

<RocketMQ系列文章>

;

消息存儲

通過前面的知識,我們已經知道了topic是如何配置設定到Broker的,以及消息發送方是如何決定把消息發送給哪個Broker的,接下來我們看一看Broker介紹到消息後,是怎麼存儲消息的。

RocketMQ主要存儲的檔案包括CommitLog檔案、ConsumeQueue檔案、IndexFile檔案。RocketMQ将所有主題的消息存儲在同一個檔案中,確定消息發送時順序寫檔案,盡最大的能力確定消息發送的高性能與高吞吐量。但由于消息中間件一般是基于消息主題的訂閱機制,這樣便給按照消息主題檢索消息帶來了極大的不便。為了提高消息消費的效率,RocketMQ引入了ConsumeQueue消息隊列檔案,每個消息主題包含多個消息消費隊列,每一個消息隊列有一個消息檔案。IndexFile索引檔案,其主要設計理念就是為了加速消息的檢索性能,根據消息的屬性快速從CommitLog檔案中檢索消息。

磁盤有時候會比你想象的快很多,有時候也會比你想象的慢很多,關鍵在如何使用,使用得當,磁盤的速度完全可以比對上網絡的資料傳輸速度。目前的高性能磁盤,順序寫速度可以達到600MB/s,超過了一般網卡的傳輸速度,這是磁盤比想象的快的地方。但是磁盤随機寫的速度隻有大概1OOKB/s, 和順序寫的性能相差6000倍!因為有如此巨大的速度差别,好的消息隊列系統會比普通的消息隊列系統速度快多個數量級。

存儲方案

RocketMQ 消息存儲

RocketMQ消息的存儲是由ConsumeQueue和CommitLog配合完成的,消息真正的實體存儲檔案是CommitLog, ConsumeQueue是消息的邏輯隊列,類似資料庫的索引檔案,存儲的是指向實體存儲的位址。每個Topic下的每個Message Queue都有一個對應的ConsumeQueue檔案。

CommitLog以實體檔案的方式存放,每台Broker上的CommitLog被本機器所有ConsumeQueue共享。在CommitLog中,一個消息的存儲長度是不固定的,RocketMQ采取一些機制,盡量向CommitLog中順序寫,但是随機讀。ConsumeQueue的内容也會被寫到磁盤裡作持久存儲,隻不過是通過異步刷盤的方式進行。

這樣設計的優點:

  1. CommitLog順序寫,可以大大提高寫入效率。接收到消息時,隻有CommitLog是需要同步刷盤的(根據配置,可能也不需要同步刷盤),其他檔案都是異步儲存,如果發生了當機,RocketMQ可以根據CommitLog恢複ConsumeQueue檔案和IndexFile。
  2. 雖然CommitLog是随機讀,但是利用作業系統的page cache機制,可以批量地從磁盤讀取,cache存到記憶體中之後,加速後續的讀取速度。
  3. 為了保證完全的順序寫,需要ConsumeQueue這個中間結構,因為ConsumeQueue裡隻存偏移量資訊,是以尺寸是有限的,在實際情況中,大部分的ConsumeQueue能夠被全部讀入記憶體,是以這個中間結構的操作速度很快,可以認為是記憶體讀取的速度。此外為了保證CommitLog和ConsumeQueue的一緻性,CommitLog裡存儲了Consume Queues、Message keys、Tags等所有資訊,即使ConsumeQueue丢失,也可以通過commitLog完全恢複出來。

下圖是一個Broker在檔案系統中存儲的各個檔案。我們可以看到CommitLog檔案夾、ConsumeQueue檔案夾,還有在config檔案夾中Topic、Consumer的相關資訊。最下面那個檔案夾index存的是索引檔案,這個檔案用來加快消息查詢的速度。

RocketMQ 消息存儲
  • commitlog:消息存儲目錄。
  • config:運作期間一些配置資訊,主要包括下列資訊。
    • consumerFilter.json:主題消息過濾資訊。
    • consumerOffset.json:叢集消費模式消息消費進度。
    • delayOffset.json:延時消息隊列拉取進度。
    • subscriptionGroup:消息消費組配置資訊。
    • topic.json:topic配置屬性。
  • consumequeue:消息消費隊列存儲目錄。
  • index:消息索引檔案存儲目錄。
  • abort:如果存在 abort檔案說明Broker非正常關閉,該檔案預設啟動時建立,正常退出之前删除。
  • checkpoint:檔案檢測點存儲commitlog檔案最後一次刷盤時間戳、consumequeue最後一次刷盤時間、index索引檔案最後一次刷盤時間戳。

存儲流程

  1. 如果目前Broker停止工作或Broker為SLAVE角色或目前Broker不支援寫入則拒絕消息寫入;如果消息主題長度超過256個字元、消息屬性長度超過65536個字元将拒絕該消息寫入。
  2. 如果消息的延遲級别大于0,将消息的原主題名稱與原消息隊列ID存入消息屬性中,用延遲消息主題SCHEDULE_TOPIC、消息隊列ID更新原先消息的主題與隊列。
  3. 擷取目前可以寫入的CommitLog檔案
  4. 在寫入CommitLog之前,先申請putMessageLock,也就是将消息存儲到CommitLog檔案中是串行的
  5. 設定消息的存儲時間,如果CommitLog檔案不存在就需要建立新的檔案
  6. 建立全局唯一消息ID
  7. 擷取該消息在消息隊列的偏移量
  8. 計算消息總長度,并寫入CommitLog
  9. 如果計算發現CommitLog無法存儲所有内容,則建立新的CommitLog,檔案名為即将插入消息的偏移
  10. 将消息内容寫入CommitLog檔案後根據配置進行同步或者異步刷盤
  11. 更新邏輯偏移量,并釋放putMessageLock
  12. 根據CommitLog偏移量,消息存儲大小,tag的hash值插入一條Message到ConsumeQueue
  13. 根據Key的hash值,CommitLog偏移量,插入一條資料到IndexFile
  14. ConsumerQueue每隔一段時間自動刷盤、IndexFile在每次建立新indexFile時刷盤之前的索引檔案、checkpoint檔案在刷盤CommotLog,ConsumeQueue和IndexFile時進行更新
RocketMQ 消息存儲

文章說明

更多有價值的文章均收錄于

貝貝貓的文章目錄
RocketMQ 消息存儲

版權聲明: 本部落格所有文章除特别聲明外,均采用 BY-NC-SA 許可協定。轉載請注明出處!

創作聲明: 本文基于下列所有參考内容進行創作,其中可能涉及複制、修改或者轉換,圖檔均來自網絡,如有侵權請聯系我,我會第一時間進行删除。

參考内容

[1]《RocketMQ技術内幕》

[2]《RocketMQ實戰與原了解析》

[3]

老生常談——利用消息隊列處理分布式事務

[4]

RocketMQ架構解析

[5]

MappedByteBuffer VS FileChannel 孰強孰弱?

[6]

檔案 IO 操作的一些最佳實踐

[7]

海量資料處理之Bloom Filter詳解

[8]

rocketmq GitHub Wiki