天天看點

RocketMQ 整合 DLedger(多副本)即主從切換實作平滑更新的設計技巧

源碼分析 RocketMQ DLedger 多副本系列已經進行到第 8 篇了,前面的章節主要是介紹了基于 raft 協定的選主與日志複制,從本篇開始将開始關注如何将 DLedger 應用到 RocketMQ中。

摘要:詳細分析了RocketMQ DLedger 多副本(主從切換) 是如何整合到 RocketMQ中,本文的行文思路首先結合已掌握的DLedger 多副本相關的知識初步思考其實作思路,然後從 Broker啟動流程、DLedgerCommitlog 核心類的講解,再從消息發送(追加)與消息查找來進一步探讨 DLedger 是如何支援平滑更新的。

1、閱讀源碼之前的思考

RocketMQ 的消息存儲檔案主要包括 commitlog 檔案、consumequeue 檔案與 Index 檔案。commitlog 檔案存儲全量的消息,consumequeue、index 檔案都是基于 commitlog 檔案建構的。要使用 DLedger 來實作消息存儲的一緻性,應該關鍵是要實作 commitlog 檔案的一緻性,即 DLedger 要整合的對象應該是 commitlog 檔案,即隻需保證 raft 協定的複制組内各個節點的 commitlog 檔案一緻即可。

我們知道使用檔案存儲消息都會基于一定的存儲格式,rocketmq 的 commitlog 一個條目就包含魔數、消息長度,消息屬性、消息體等,而我們再來回顧一下 DLedger 日志的存儲格式:

RocketMQ 整合 DLedger(多副本)即主從切換實作平滑更新的設計技巧

DLedger 要整合 commitlog 檔案,是不是可以把 rocketmq 消息,即一個個 commitlog 條目整體當成 DLedger 的 body 字段即可。

還等什麼,跟我一起來看源碼吧!!!别急,再抛一個問題,DLedger 整合 RocketMQ commitlog,能不能做到平滑更新?

帶着這些思考和問題,一起來探究 DLedger 是如何整合 RocketMQ 的。

2、從 Broker 啟動流程看 DLedger

溫馨提示:本文不會詳細介紹 Broker 端的啟動流程,隻會點出在啟動過程中與 DLedger 相關的代碼,如想詳細了解 Broker 的啟動流程,建議關注筆者的《RocketMQ技術内幕》一書。

Broker 涉及到 DLedger 相關關鍵點如下:

RocketMQ 整合 DLedger(多副本)即主從切換實作平滑更新的設計技巧

2.1 建構 DefaultMessageStore

DefaultMessageStore 構造方法

if(messageStoreConfig.isEnableDLegerCommitLog()) {  // @1
    this.commitLog = new DLedgerCommitLog(this);
 else {
    this.commitLog = new CommitLog(this);                    // @2
}           

代碼@1:如果開啟 DLedger ,commitlog 的實作類為 DLedgerCommitLog,也是本文需要關注的關鍵所在。

代碼@2:如果未開啟 DLedger,則使用舊版的 Commitlog實作類。

2.2 增加節點狀态變更事件監聽器

BrokerController#initialize

if (messageStoreConfig.isEnableDLegerCommitLog()) {
    DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
    ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}           

主要調用 LedgerLeaderElector 的 addRoleChanneHandler 方法增加 節點角色變更事件監聽器,DLedgerRoleChangeHandler 是實作主從切換的另外一個關鍵點。

2.3 調用 DefaultMessageStore 的 load 方法

DefaultMessageStore#load

// load Commit Log
result = result && this.commitLog.load();   // @1
// load Consume Queue
result = result && this.loadConsumeQueue();  
if (result) {
    this.storeCheckpoint =  new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));
    this.indexService.load(lastExitOK);
    this.recover(lastExitOK);                         // @2
    log.info("load over, and the max phy offset = {}", this.getMaxPhyOffset());
}           

代碼@1、@2 最終都是委托 commitlog 對象來執行,這裡的關鍵又是如果開啟了 DLedger,則最終調用的是 DLedgerCommitLog。

經過上面的鋪墊,主角 DLedgerCommitLog “閃亮登場“了。

3、DLedgerCommitLog 詳解

溫馨提示:由于 Commitlog 的絕大部分方法都已經在《RocketMQ技術内幕》一書中詳細介紹了,并且 DLedgerCommitLog 的實作原理與 Commitlog 檔案的實作原理類同,本文會一筆帶過關于存儲部分的實作細節。

3.1 核心類圖

RocketMQ 整合 DLedger(多副本)即主從切換實作平滑更新的設計技巧

DLedgerCommitlog 繼承自 Commitlog。讓我們一一來看一下它的核心屬性。

  • DLedgerServer dLedgerServer

    基于 raft 協定實作的叢集内的一個節點,用 DLedgerServer 執行個體表示。

  • DLedgerConfig dLedgerConfig

    DLedger 的配置資訊。

  • DLedgerMmapFileStore dLedgerFileStore

    DLedger 基于檔案映射的存儲實作。

  • MmapFileList dLedgerFileList

    DLedger 所管理的存儲檔案集合,對比 RocketMQ 中的 MappedFileQueue。

  • int id

    節點ID,0 表示主節點,非0表示從節點

  • MessageSerializer messageSerializer

    消息序列器。

  • long beginTimeInDledgerLock = 0

    用于記錄 消息追加的時耗(日志追加所持有鎖時間)。

  • long dividedCommitlogOffset = -1

    記錄的舊 commitlog 檔案中的最大偏移量,如果通路的偏移量大于它,則通路 dledger 管理的檔案。

  • boolean isInrecoveringOldCommitlog = false

    是否正在恢複舊的 commitlog 檔案。

接下來我們将詳細介紹 DLedgerCommitlog 各個核心方法及其實作要點。

3.2 構造方法

public DLedgerCommitLog(final DefaultMessageStore defaultMessageStore) {
    super(defaultMessageStore);                   // @1
    dLedgerConfig =  new DLedgerConfig();
    dLedgerConfig.setEnableDiskForceClean(defaultMessageStore.getMessageStoreConfig().isCleanFileForciblyEnable());
    dLedgerConfig.setStoreType(DLedgerConfig.FILE);
    dLedgerConfig.setSelfId(defaultMessageStore.getMessageStoreConfig().getdLegerSelfId());
    dLedgerConfig.setGroup(defaultMessageStore.getMessageStoreConfig().getdLegerGroup());
    dLedgerConfig.setPeers(defaultMessageStore.getMessageStoreConfig().getdLegerPeers());
    dLedgerConfig.setStoreBaseDir(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
    dLedgerConfig.setMappedFileSizeForEntryData(defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog());
    dLedgerConfig.setDeleteWhen(defaultMessageStore.getMessageStoreConfig().getDeleteWhen());
    dLedgerConfig.setFileReservedHours(defaultMessageStore.getMessageStoreConfig().getFileReservedTime() + 1);  
    id = Integer.valueOf(dLedgerConfig.getSelfId().substring(1)) + 1;            // @2
    dLedgerServer = new DLedgerServer(dLedgerConfig);                           // @3
    dLedgerFileStore = (DLedgerMmapFileStore) dLedgerServer.getdLedgerStore();
    DLedgerMmapFileStore.AppendHook appendHook = (entry, buffer, bodyOffset) -> {
            assert bodyOffset == DLedgerEntry.BODY_OFFSET;
            buffer.position(buffer.position() + bodyOffset + MessageDecoder.PHY_POS_POSITION);
            buffer.putLong(entry.getPos() + bodyOffset);
    };
    dLedgerFileStore.addAppendHook(appendHook);   // @4
    dLedgerFileList = dLedgerFileStore.getDataFileList();
    this.messageSerializer = new MessageSerializer(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());   // @5
}           

代碼@1:調用父類 即 CommitLog 的構造函數,加載 ${ROCKETMQ_HOME}/store/ comitlog 下的 commitlog 檔案,以便相容更新 DLedger 的消息。我們稍微看一下 CommitLog 的構造函數:

RocketMQ 整合 DLedger(多副本)即主從切換實作平滑更新的設計技巧

代碼@2:建構 DLedgerConfig 相關配置屬性,其主要屬性如下:

  • enableDiskForceClean

    是否強制删除檔案,取自 broker 配置屬性 cleanFileForciblyEnable,預設為 true 。

  • storeType

    DLedger 存儲類型,固定為 基于檔案的存儲模式。

  • dLegerSelfId
  1. 節點的 id 名稱,示例配置:n0,其配置要求第二個字元後必須是數字。
  • dLegerGroup

    DLeger group 的名稱,建議與 broker 配置屬性 brokerName 保持一緻。

  • dLegerPeers

    DLeger Group 中所有的節點資訊,其配置示例 n0-127.0.0.1:40911;n1-127.0.0.1:40912;n2-127.0.0.1:40913。多個節點使用分号隔開。

  • storeBaseDir

    設定 DLedger 的日志檔案的根目錄,取自 borker 配件檔案中的 storePathRootDir ,即 RocketMQ 的資料存儲根路徑。

  • mappedFileSizeForEntryData

    設定 DLedger 的單個日志檔案的大小,取自 broker 配置檔案中的 - mapedFileSizeCommitLog,即與 commitlog 檔案的單個檔案大小一緻。

  • deleteWhen

    DLedger 日志檔案的删除時間,取自 broker 配置檔案中的 deleteWhen,預設為淩晨 4點。

  • fileReservedHours

    DLedger 日志檔案保留時長,取自 broker 配置檔案中的 fileReservedHours,預設為 72h。

代碼@3:根據 DLedger 配置資訊建立 DLedgerServer,即建立 DLedger 叢集節點,叢集内各個節點啟動後,就會觸發選主。

代碼@4:建構 appendHook 追加鈎子函數,這是相容 Commitlog 檔案很關鍵的一步,後面會詳細介紹其作用。

代碼@5:建構消息序列化。

根據上述的流程圖,建構好 DefaultMessageStore 實作後,就是調用其 load 方法,在啟用 DLedger 機制後,會依次調用 DLedgerCommitlog 的 load、recover 方法。

3.3 load

public boolean load() {
    boolean result = super.load();
    if (!result) {
        return false;
    }
    return true;
}           

DLedgerCommitLog 的 laod 方法實作比較簡單,就是調用 其父類 Commitlog 的 load 方法,即這裡也是為了啟用 DLedger 時能夠相容以前的消息。

3.4 recover

在 Broker 啟動時會加載 commitlog、consumequeue等檔案,需要恢複其相關是資料結構,特别是與寫入、刷盤、送出等指針,其具體調用 recover 方法。

DLedgerCommitLog#recover

public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {  // @1
    recover(maxPhyOffsetOfConsumeQueue);
}           

首先會先恢複 consumequeue,得出 consumequeue 中記錄的最大有效實體偏移量,然後根據該實體偏移量進行恢複。

接下來看一下該方法的處理流程與關鍵點。

dLedgerFileStore.load();           

Step1:加載 DLedger 相關的存儲檔案,并一一建構對應的 MmapFile,其初始化三個重要的指針 wrotePosition、flushedPosition、committedPosition 三個指針為檔案的大小。

if (dLedgerFileList.getMappedFiles().size() > 0) {   
    dLedgerFileStore.recover();   // @1
    dividedCommitlogOffset = dLedgerFileList.getFirstMappedFile().getFileFromOffset();     // @2
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    if (mappedFile != null) {                                                                                                       // @3
        disableDeleteDledger();
    }
    long maxPhyOffset = dLedgerFileList.getMaxWrotePosition();
    // Clear ConsumeQueue redundant data
    if (maxPhyOffsetOfConsumeQueue >= maxPhyOffset) {      // @4
        log.warn("[TruncateCQ]maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, maxPhyOffset);
        this.defaultMessageStore.truncateDirtyLogicFiles(maxPhyOffset);
    }
    return;
}           

Step2:如果已存在 DLedger 的資料檔案,則隻需要恢複 DLedger 相關資料文建,因為在加載舊的 commitlog 檔案時已經将其重要的資料指針設定為最大值。其關鍵實作點如下:

  • 首先調用 DLedger 檔案存儲實作類 DLedgerFileStore 的 recover 方法,恢複管轄的 MMapFile 對象(一個檔案對應一個MMapFile執行個體)的相關指針,其實作方法與 RocketMQ 的 DefaultMessageStore 的恢複過程類似。
  • 設定 dividedCommitlogOffset 的值為 DLedger 中所有實體檔案的最小偏移量。操作消息的實體偏移量小于該值,則從 commitlog 檔案中查找;實體偏移量大于等于該值的話則從 DLedger 相關的檔案中查找消息。
  • 如果存在舊的 commitlog 檔案,則禁止删除 DLedger 檔案,其具體做法就是禁止強制删除檔案,并将檔案的有效存儲時間設定為 10 年。
  • 如果 consumequeue 中存儲的最大實體偏移量大于 DLedger 中最大的實體偏移量,則删除多餘的 consumequeue 檔案。
溫馨提示:為什麼當存在 commitlog 檔案的情況下,不能删除 DLedger 相關的日志檔案呢?

因為在此種情況下,如果 DLedger 中的實體檔案有删除,則實體偏移量會斷層。

RocketMQ 整合 DLedger(多副本)即主從切換實作平滑更新的設計技巧

正常情況下, maxCommitlogPhyOffset 與 dividedCommitlogOffset 是連續的,這樣非常友善是通路 commitlog 還是 通路 DLedger ,但如果DLedger 部分檔案删除後,這兩個值就變的不連續,就會造成中間的檔案空洞,無法被連續通路。

isInrecoveringOldCommitlog = true;
super.recoverNormally(maxPhyOffsetOfConsumeQueue);
isInrecoveringOldCommitlog = false;           

Step3:如果啟用了 DLedger 并且是初次啟動(還未生成 DLedger 相關的日志檔案),則需要恢複 舊的 commitlog 檔案。

MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
if (mappedFile == null) {           // @1
    return;
}
ByteBuffer byteBuffer =  mappedFile.sliceByteBuffer();
byteBuffer.position(mappedFile.getWrotePosition());
boolean needWriteMagicCode = true;
// 1 TOTAL SIZE
byteBuffer.getInt(); //size
int magicCode = byteBuffer.getInt();
if (magicCode == CommitLog.BLANK_MAGIC_CODE) {   // @2
    needWriteMagicCode = false;
} else {
    log.info("Recover old commitlog found a illegal magic code={}", magicCode);
}
dLedgerConfig.setEnableDiskForceClean(false);
dividedCommitlogOffset = mappedFile.getFileFromOffset() + mappedFile.getFileSize();   // @3
log.info("Recover old commitlog needWriteMagicCode={} pos={} file={} dividedCommitlogOffset={}", needWriteMagicCode, mappedFile.getFileFromOffset() + mappedFile.getWrotePosition(), mappedFile.getFileName(), dividedCommitlogOffset);
if (needWriteMagicCode) {  // @4
    byteBuffer.position(mappedFile.getWrotePosition());
    byteBuffer.putInt(mappedFile.getFileSize() - mappedFile.getWrotePosition());
    byteBuffer.putInt(BLANK_MAGIC_CODE);
    mappedFile.flush(0);
}
mappedFile.setWrotePosition(mappedFile.getFileSize());   // @5
mappedFile.setCommittedPosition(mappedFile.getFileSize());
mappedFile.setFlushedPosition(mappedFile.getFileSize());
dLedgerFileList.getLastMappedFile(dividedCommitlogOffset);
log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset);
}           

Step4:如果存在舊的 commitlog 檔案,需要将最後的檔案剩餘部分全部填充,即不再接受新的資料寫入,新的資料全部寫入到 DLedger 的資料檔案中。其關鍵實作點如下:

  • 嘗試查找最後一個 commitlog 檔案,如果未找到,則結束。
  • 從最後一個檔案的最後寫入點(原 commitlog 檔案的 待寫入位點)嘗試去查找寫入的魔數,如果存在魔數并等于 CommitLog.BLANK_MAGIC_CODE,則無需再寫入魔數,在更新 DLedger 第一次啟動時,魔數為空,故需要寫入魔數。
  • 初始化 dividedCommitlogOffset ,等于最後一個檔案的起始偏移量加上檔案的大小,即該指針指向最後一個檔案的結束位置。
  • 将最後一個 commitlog 未寫滿的資料全部寫入,其方法為 設定消息體的 size 與 魔數即可。
  • 設定最後一個檔案的 wrotePosition、flushedPosition、committedPosition 為檔案的大小,同樣有意味者最後一個檔案已經寫滿,下一條消息将寫入 DLedger 中。

在啟用 DLedger 機制時 Broker 的啟動流程就介紹到這裡了,相信大家已經了解 DLedger 在整合 RocketMQ 上做的努力,接下來我們從消息追加、消息讀取兩個方面再來探讨 DLedger 是如何無縫整合 RocketMQ 的,實作平滑更新的。

4、從消息追加看 DLedger 整合 RocketMQ 如何實作無縫相容

溫馨提示:本節同樣也不會詳細介紹整個消息追加(存儲流程),隻是要點出與 DLedger(多副本、主從切換)相關的核心關鍵點。如果想詳細了解消息追加的流程,可以閱讀筆者所著的《RocketMQ技術内幕》一書。

DLedgerCommitLog#putMessage

AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBody(encodeResult.data);
dledgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
if (dledgerFuture.getPos() == -1) {
    return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
}           

關鍵點一:消息追加時,則不再寫入到原先的 commitlog 檔案中,而是調用 DLedgerServer 的 handleAppend 進行消息追加,該方法會有叢集内的 Leader 節點負責消息追加以及在消息複制,隻有超過叢集内的半數節點成功寫入消息後,才會傳回寫入成功。如果追加成功,将會傳回本次追加成功後的起始偏移量,即 pos 屬性,即類似于 rocketmq 中 commitlog 的偏移量,即實體偏移量。

long wroteOffset =  dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET;
ByteBuffer buffer = ByteBuffer.allocate(MessageDecoder.MSG_ID_LENGTH);
String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset);
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(), queueOffset, eclipseTimeInLock);           

關鍵點二:根據 DLedger 的起始偏移量計算真正的消息的實體偏移量,從開頭部分得知,DLedger 自身有其存儲協定,其 body 字段存儲真實的消息,即 commitlog 條目的存儲結構,傳回給用戶端的消息偏移量為 body 字段的開始偏移量,即通過 putMessage 傳回的實體偏移量與不使用Dledger 方式傳回的實體偏移量的含義是一樣的,即從開偏移量開始,可以正确讀取消息,這樣 DLedger 完美的相容了 RocketMQ Commitlog。關于 pos 以及 wroteOffset 的圖解如下:

RocketMQ 整合 DLedger(多副本)即主從切換實作平滑更新的設計技巧

5、從消息讀取看 DLedger 整合 RocketMQ 如何實作無縫相容

DLedgerCommitLog#getMessage

public SelectMappedBufferResult getMessage(final long offset, final int size) {
    if (offset < dividedCommitlogOffset) {   // @1
        return super.getMessage(offset, size);
    }
    int mappedFileSize = this.dLedgerServer.getdLedgerConfig().getMappedFileSizeForEntryData();
    MmapFile mappedFile = this.dLedgerFileList.findMappedFileByOffset(offset, offset == 0);   // @2
    if (mappedFile != null) {
        int pos = (int) (offset % mappedFileSize);
        return  convertSbr(mappedFile.selectMappedBuffer(pos, size));                                       // @3
    }
    return null;
}           

消息查找比較簡單,因為傳回給用戶端消息,轉發給 consumequeue 的消息實體偏移量并不是 DLedger 條目的偏移量,而是真實消息的起始偏移量。其實作關鍵點如下:

  • 如果查找的實體偏移量小于 dividedCommitlogOffset,則從原先的 commitlog 檔案中查找。
  • 然後根據實體偏移量按照二分方找到具體的實體檔案。
  • 對實體偏移量取模,得出在該實體檔案中中的絕對偏移量,進行消息查找即可,因為隻有知道其實體偏移量,從該處先将消息的長度讀取出來,然後即可讀出一條完整的消息。

5、總結

根據上面詳細的介紹,我想讀者朋友們應該不難得出如下結論:

  • DLedger 在整合時,使用 DLedger 條目包裹 RocketMQ 中的 commitlog 條目,即在 DLedger 條目的 body 字段來存儲整條 commitlog 條目。
  • 引入 dividedCommitlogOffset 變量,表示實體偏移量小于該值的消息存在于舊的 commitlog 檔案中,實作 更新 DLedger 叢集後能通路到舊的資料。
  • 新 DLedger 叢集啟動後,會将最後一個 commitlog 填充,即新的資料不會再寫入到 原先的 commitlog 檔案。
  • 消息追加到 DLedger 資料日志檔案中,傳回的偏移量不是 DLedger 條目的起始偏移量,而是DLedger 條目中 body 字段的起始偏移量,即真實消息的起始偏移量,保證消息實體偏移量的語義與 RocketMQ Commitlog一樣。

RocketMQ 整合 DLedger(多副本)實作平滑更新的設計技巧就介紹到這裡了。

推薦閱讀:

1、

RocketMQ 多副本前置篇:初探raft協定

2、

源碼分析RocketMQ多副本之Leader選主

3、

源碼分析 RocketMQ DLedger 多副本存儲實作

4、

源碼分析 RocketMQ DLedger(多副本) 之日志追加流程

5、

源碼分析 RocketMQ DLedger(多副本) 之日志複制-上篇

6、

源碼分析 RocketMQ DLedger(多副本) 之日志複制-下篇

7、

基于 raft 協定的 RocketMQ DLedger 多副本日志複制設計原理

8、

RocketMQ 整合 DLedger(多副本)即主從切換實作平滑更新的設計技巧

原文釋出時間為:2019-10-02

本文作者:丁威,《RocketMQ技術内幕》作者。

本文來自

中間件興趣圈

,了解相關資訊可以關注