天天看點

EQueue檔案持久化消息關鍵點設計思路

消息;

隊列,隊列中存放的是消息索引資訊,即消息在檔案中的實體位置(messageoffset)和在隊列中的邏輯位置(queueoffset)的映射資訊;

隊列消費進度,表示目前隊列中的消息消費到第幾個了;

producer将消息的二進制資料發送到broker;

broker做的事情:

單線程持久化消息到記憶體映射檔案;

将目前消息的索引資訊放入緩沖區,可以使用disruptor的ringbuffer實作,單線程寫,無鎖。

單線程從緩沖區讀取消息索引資訊,并将索引資訊寫入記憶體映射檔案;

消息的記憶體映射檔案、消息索引的記憶體映射檔案都定時重新整理到磁盤,比如每隔1s重新整理一次,可配置;

broker将目前消息的索引資訊放入緩沖區後,就立即傳回了,然後producer就收到了消息發送的結果;

其他說明:

因為不可能用一個檔案來儲存所有的消息,是以肯定是用多個檔案的方式。也就是說,無論是儲存消息還是儲存消息索引,都用多個檔案。另外,由于隊列有多個,是以每個隊列都對應多個記憶體映射檔案。隊列檔案的目錄命名規則:rootpath / topic / queueid / queue mapped files

broker在将消息的索引資訊放入緩沖區時,要檢查緩沖區是否到達一定的水位,比如ringbuffer總大小100w個槽,假如水位是80%,那就是當現在ringbuffer中可用的槽不到20%時,應該要做流控,比如sleep 100s;理論上應該不會到達水位,因為寫消息索引肯定比寫消息本身要快;

consumer告訴broker目前需要拉取哪個topic下的哪個隊列裡的第幾個位置(queueoffset)開始的消息,并告訴要最多拉取多少個消息;

broker根據topic和queueid找到對應的隊列;

根據queueoffset從隊列拿到消息在檔案中的實體位置,即messageoffset;

根據messageoffset從消息的記憶體映射檔案擷取消息二進制資料;

将消息二進制資料寫入臨時的記憶體流裡,該記憶體流裡包含了所有要傳回的消息;

消息拉取數量達到要求或沒有新的消息可以拉取後,将記憶體流對應的二進制資料傳回給consumer;

consumer解析二進制資料,得到所有的消息對象;

每隔10s掃描是否有過期的消息檔案,過期時間可配置,比如三天;掃描時,發現檔案的最後修改時間是3天前,則删除;

每隔10s掃描是否有過期的消息索引檔案,判斷是否過期的依據是掃描每個消息索引檔案,判斷該檔案中的最後一個消息索引的messageoffset是否比最小的messageoffset還要小;如果小,就說明這個消息索引檔案已經無意義了,可以删除;

掃描磁盤上所有的消息的存儲檔案,為每個檔案建立記憶體映射;

掃描磁盤上所有的隊列(消息索引)的存儲檔案,為每個檔案建立記憶體映射;

對每個隊列,預恢複幾個檔案(比如最後的3個檔案)的資料到記憶體,剩餘的用到時再恢複;

同理,對于存儲消息的檔案,也預恢複幾個(比如最後的3個檔案)到記憶體;一般大部分消息者隻要消費進度不是太慢,總是應該已經趕上了最後那三個檔案了;

關于異常關閉broker時的邏輯,暫時還沒想清楚,還需要再細思;