天天看點

Apache Pulsar 系列 —— 深入了解 Bookie GC 回收機制《騰訊雲微服務引擎 TSE 産品動态》《千億級、大規模:騰訊超大 Apache Pulsar 叢集性能調優實踐》《雲原生時代的Java應用優化實踐》《全面相容Eureka:PolarisMesh(北極星)釋出1.5.0版本》《全面擁抱Go社群:PolarisMesh全功能對接gRPC-Go | PolarisMesh12月月報》《SpringBoot應用優雅接入北極星PolarisMesh》《Serverless可觀測性的價值》

Apache Pulsar 系列 —— 深入了解 Bookie GC 回收機制《騰訊雲微服務引擎 TSE 産品動态》《千億級、大規模:騰訊超大 Apache Pulsar 叢集性能調優實踐》《雲原生時代的Java應用優化實踐》《全面相容Eureka:PolarisMesh(北極星)釋出1.5.0版本》《全面擁抱Go社群:PolarisMesh全功能對接gRPC-Go | PolarisMesh12月月報》《SpringBoot應用優雅接入北極星PolarisMesh》《Serverless可觀測性的價值》
Apache Pulsar 系列 —— 深入了解 Bookie GC 回收機制《騰訊雲微服務引擎 TSE 産品動态》《千億級、大規模:騰訊超大 Apache Pulsar 叢集性能調優實踐》《雲原生時代的Java應用優化實踐》《全面相容Eureka:PolarisMesh(北極星)釋出1.5.0版本》《全面擁抱Go社群:PolarisMesh全功能對接gRPC-Go | PolarisMesh12月月報》《SpringBoot應用優雅接入北極星PolarisMesh》《Serverless可觀測性的價值》

作者簡介

冉小龍

騰訊雲進階研發工程師

Apache Pulsar committer

RoP maintainer

Apache Pulsar Go client, Go Functions 作者及主要維護者

Apache Pulsar 系列 —— 深入了解 Bookie GC 回收機制《騰訊雲微服務引擎 TSE 産品動态》《千億級、大規模:騰訊超大 Apache Pulsar 叢集性能調優實踐》《雲原生時代的Java應用優化實踐》《全面相容Eureka:PolarisMesh(北極星)釋出1.5.0版本》《全面擁抱Go社群:PolarisMesh全功能對接gRPC-Go | PolarisMesh12月月報》《SpringBoot應用優雅接入北極星PolarisMesh》《Serverless可觀測性的價值》

背景

Apache Bookkeeper 是基于日志的一個持久化系統,所有的資料會以日志的形式存儲到 Ledger 磁盤的 Entry Log 檔案中,之後通過背景異步回收的形式來将 EntryLog 檔案回收掉。但是在我們實際的使用場景中,發現很久之前的 EntryLog 檔案無法被删除掉,對 Entry Log 檔案存在的時間進行監控,具體如下:

Apache Pulsar 系列 —— 深入了解 Bookie GC 回收機制《騰訊雲微服務引擎 TSE 産品動态》《千億級、大規模:騰訊超大 Apache Pulsar 叢集性能調優實踐》《雲原生時代的Java應用優化實踐》《全面相容Eureka:PolarisMesh(北極星)釋出1.5.0版本》《全面擁抱Go社群:PolarisMesh全功能對接gRPC-Go | PolarisMesh12月月報》《SpringBoot應用優雅接入北極星PolarisMesh》《Serverless可觀測性的價值》

我們可以看到,假設從 Broker 側設定的 Retention 政策最大為 5 天,即很久之前的 EntryLog 檔案依然存在于對應的 Ledger 資料盤中,導緻磁盤的占用率較高。雖然Bookie 的 GC 回收機制是背景異步回收的,當 Broker 側認為某條消息可以删除時,Bookie 并不會立即從磁盤中将該資料删除掉,而是利用 Bookie 的 GC 線程周期性的觸發回收的邏輯。但是資料的删除操作竟然滞後了半年多,于是萌生了搞懂 Bookie GC 回收機制的想法,究竟是什麼原因導緻了該現象的發生。

Apache Pulsar 系列 —— 深入了解 Bookie GC 回收機制《騰訊雲微服務引擎 TSE 産品動态》《千億級、大規模:騰訊超大 Apache Pulsar 叢集性能調優實踐》《雲原生時代的Java應用優化實踐》《全面相容Eureka:PolarisMesh(北極星)釋出1.5.0版本》《全面擁抱Go社群:PolarisMesh全功能對接gRPC-Go | PolarisMesh12月月報》《SpringBoot應用優雅接入北極星PolarisMesh》《Serverless可觀測性的價值》

Bookie GC 介紹

在 Apache Bookkeeper 中,資料的寫入,讀取以及回收(壓縮)操作是互相隔離的。為了避免過多碎片檔案的産生,在 Bookies 中不同 Ledgers 中的 Entrys 會聚合存儲到一個 EntryLog 檔案中。Bookie 可以通過運作 GC 線程(GarbageCollectorThread)來删除未關聯的 Entry 條目來達到回收磁盤空間的目的。在目前的 EntryLog 檔案中,如果某一個 Ledger 中包含無法删除的 Entry,那麼這個 EntryLog 檔案将一直保留在資料盤(Ledger 盤)中無法被删除。由于業務場景的限定,我們沒辦法要求一個 EntryLog 檔案中所有 Ledgers 的 Entries 都能在近乎相同的時間内滿足可删除的條件。為了避免該現象,Bookie 引入了資料壓縮的概念,即通過掃描 EntryLog 檔案判定哪些 Entry 是可以删除的,可以删除的 Entry 繼續保留在原始的 EntryLog 檔案中,不可删除的 Entry 寫入新的 EntryLog 檔案中,掃描完成之後将原始的 EntryLog 檔案删除掉。

Bookie 壓縮類型

Bookie 的 GC 回收線程并不是一直執行的,而是基于特定的門檻值,Bookie 按照一個 EntryLog 檔案中有用資料的占比以及資料壓縮被觸發的時間将資料壓縮的操作分為如下兩種類型:

Minor GC

預設觸發的時間為每 1 小時觸發一次,可以通過 minorCompactionInterval 來自定義每一次 minor GC 觸發的時間間隔。當到達 Minor GC 觸發的時間門檻值之後,會繼續檢查目前 EntryLog 中有用資料的占比是否超過預設配置的 20%。如果沒有超過,則 Minor GC 生效,開始回收并壓縮 EntryLog 中的資料。如果超過門檻值,那麼 Minor GC 不會被觸發。可以通過 minorCompactionThreshold 來自定義 Minor GC 中有用資料的占比達到多少之後不會繼續觸發 Minor GC。為了避免 Minor GC 執行占用太多的時間,也可以通過 minorCompactionMaxTimeMillis 的參數來控制目前 Minor GC 最大允許執行的時間是多少。當 minorCompactionMaxTimeMillis <= 0 時,垃圾回收線程會一直執行直到掃描完成目前 Ledger 目錄下所有的 Entry Log 檔案。

Major GC

預設觸發的時間為每 24 小時觸發一次,可以通過 majorCompactionInterval 來自定義每一次 major GC 觸發的時間間隔。當到達 Major GC 觸發的時間門檻值之後,會繼續檢查目前 EntryLog 中有用資料的占比是否超過預設配置的 80%。如果沒有超過,則 Major GC 生效,開始回收并壓縮 EntryLog 中的資料。如果超過門檻值,那麼 Major GC 不會被觸發。可以通過 majorCompactionThreshold 來自定義 Major GC 中有用資料的占比達到多少之後不會繼續觸發 Major GC。為了避免 Minor GC 執行占用太多的時間,也可以通過 majorCompactionMaxTimeMillis 的參數來控制目前 Major GC 最大允許執行的時間是多少。當 majorCompactionMaxTimeMillis <= 0 時,垃圾回收線程會一直執行直到掃描完成目前 Ledger 目錄下所有的 Entry Log 檔案。

注意: minorCompactionThreshold 和 majorCompactionThreshold 的最大值不可以超過 100%,當 minorGC 和 majorGC 同時配置時,MinorGC 的 minorCompactionInterval 和 minorCompactionThreshold 要求必須小于 MajorGC 中指定的門檻值。

為什麼需要引入壓縮有用占比門檻值?

當做資料壓縮回收時,我們預設分别為 Minor GC 和 Major GC 引入了資料有用占比的門檻值,這樣做的目的是為了避免每次垃圾回收線程運作時,都會去頻繁的掃描所有的 EntryLog 檔案。當一個 EntryLog 檔案中有用資料的占比超過 Major GC 指定的門檻值,那麼可以認為目前 EntryLog 中絕大部分資料仍然為有效的資料。這種情況下我們無需繼續為了回收剩下的那一點無效資料,然後将該 EntryLog 中的資料從原始的 EntryLog 檔案中再寫入新的 EntryLog 檔案中,這樣可以大幅度的節省磁盤 I/O。

Bookie 壓縮方式

目前,Bookie 提供了如下兩種資料壓縮的方式:

按照 Entries 的數量

預設情況下,Bookie 是通過 Entries 的數量進行壓縮,預設值為 1000,即每次最大壓縮 1000 條 Entry。可以通過 compactionRateByEntries 自定義每次壓縮 Entries 的數量。

按照 Entries 大小

Bookie 按照 Entries 的大小進行壓縮,可以通過 compactionRateByBytes 自定義每次回收最大允許被回收 Entries 的大小。當想要使用該壓縮方式時,需要在 Bookie 的配置檔案中同時打開如下配置:isThrottleByBytes=true。

注意:生産環境中建議使用按照 Entries 大小壓縮的方式,這個取決于 Entry 被打包的方式。對于 Pulsar 來說,普通消息和 Batch 消息都會被當作一條 Entry 來看待,這就可能會導緻每一條 Entry 的大小都不一樣。如果按照 Entries 的數量來回收,即每次回收的資料大小是不一緻的,如果單個 Entry 過大,有可能導緻回收期間占用較大的磁盤 IO,影響正常資料的讀寫IO,造成抖動的現象發生。

Bookie GC 觸發的方式

目前 Bookie 的 GC 操作支援如下兩種觸發方式:

自動觸發

Bookie 的 GC 回收線程按照 Bookie 壓縮類型小節中介紹的方式,按照特定的時間間隔及門檻值周期性的執行資料壓縮回收的操作。

手動觸發

Bookie 支援了 REST API 的 HTTP 服務,允許使用者通過手動的方式觸發 GC,使用方式如下:

curl -X PUT http://127.0.0.1:8000/api/v1/bookie/gc

  • IP: 即為目前 Bookie 的 IP 位址
  • Port:示例中的 8000 端口為 Bookie 配置檔案中 httpServerPort 指定的端口,預設為 8000。

執行完成之後,也可以通過如下請求檢查 GC 的狀态等資訊:

curl http://127.0.0.1:8000/api/v1/bookie/gc_details

Output:

[ {  "forceCompacting" : false,    "majorCompacting" : false,    "minorCompacting" : false,    "lastMajorCompactionTime" : 1662436000016,    "lastMinorCompactionTime" : 1662456700007,    "majorCompactionCounter" : 11,    "minorCompactionCounter" : 99}]           

複制

Apache Pulsar 系列 —— 深入了解 Bookie GC 回收機制《騰訊雲微服務引擎 TSE 産品動态》《千億級、大規模:騰訊超大 Apache Pulsar 叢集性能調優實踐》《雲原生時代的Java應用優化實踐》《全面相容Eureka:PolarisMesh(北極星)釋出1.5.0版本》《全面擁抱Go社群:PolarisMesh全功能對接gRPC-Go | PolarisMesh12月月報》《SpringBoot應用優雅接入北極星PolarisMesh》《Serverless可觀測性的價值》

Bookie GC 回收代碼分析

Bookie 回收的代碼邏輯主要在 org.apache.bookkeeper.bookie.GarbageCollectorThread 類中的 runWithFlags() 方法, 主要的回收邏輯包含如下三個函數:

  • doGcLedgers()
  • doGcEntryLogs()
  • doCompactEntryLogs()

在了解 Bookie GC 的回收邏輯中,我們首先需要介紹幾個關鍵的集合:

  • LedgerRangeIterator: 該接口為一個 LedgerRange 的疊代器,用來存儲從 Meta Store (zookeeper)中存儲的所有 Ledgers 的資訊。
  • ledgerIndex:是 LedgerStorage (RocksDB)中所有 Ledger 掃描出來的一個集合。
  • ledgersMap:每一個 EntryLog 對應一個 ledgersMap,表示目前 EntryLog 中存儲的所有 Ledgers 的集合。
  • entryLogMetaMap:每一個 Ledger 盤擁有一個 entryLogMetaMap 對象,是目前 Ledger 資料盤下所有 EntryLogID -> EntryLogMeta 的一個緩存。

目前,Bookie 的索引存儲支援了多種方式,預設使用的是 SortedLedgerStorage,可以在 Bookie 的配置檔案中通過 ledgerStorageClass 來指定具體需要使用的索引存儲方式,一般推薦使用的配置如下:

ledgerStorageClass=org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage           

複制

是以在下面的代碼詳解中,我們以 DbLedgerStorage 為例。

doGcLedgers()

在 doGcLedgers() 中,代碼邏輯主要如下:

1. 首先從 RocksDB 中擷取目前資料盤目錄下所有的 Ledgers 資料,并使用 NavigableSet 集合暫存目前活躍的 Ledgers 清單。

NavigableSetbkActiveLedgers = Sets.newTreeSet(ledgerStorage.getActiveLedgersInRange(0, Long.MAX_VALUE));           

複制

2. 通過 ledgerManager 對象,擷取 meta store (預設:zookeeper)中所有 Ledgers 的 Range,暫存在 LedgerRangeIterator 疊代器中。

LedgerRangeIterator ledgerRangeIterator = ledgerManager                    .getLedgerRanges(zkOpTimeoutMs);           

複制

3. 定義一個 Set 集合,來緩存從 zookeeper 中擷取的 LedgerRangeIterator。

if (ledgerRangeIterator.hasNext()) {    LedgerRange lRange = ledgerRangeIterator.next();    ledgersInMetadata = lRange.getLedgers();    // 當第一次進來以後,就可以擷取到目前批次中最大的那個 Ledgers 的索引是多少    end = lRange.end();} else {    // 如果從 zk 中擷取到的 Ledgers 疊代器是空的或者已經疊代完所有的 Ledgers,則重置 done 标記,退出循環。    ledgersInMetadata = new TreeSet<>();    end = Long.MAX_VALUE;    done = true;}           

複制

這裡将 LedgerRangeIterator 的疊代器轉化為 ledgersInMetadata 的 Set 集合主要是為了第四步可以做 subSet 的操作。

4. 以 RocksDB 中擷取到的 Ledgers 集合為标準,對從 zookeeper 中擷取的 Ledgers 清單做 subSet 的操作。

IterablesubBkActiveLedgers = bkActiveLedgers.subSet(start, true, end, true);           

複制

其中 start 位置為 0,end 的位置為 LedgerRangeIterator 疊代器最後的一個位置。因為上述兩個 Set 在預先都是做過排序操作的,是以在這裡可以直接進行 subSet 的操作。

5. 拿第四步擷取到的 subBkActiveLedgers 與 zookeeper 中的 ledgersInMetadata 集合比較,判斷 zookeeper 中是否還包含目前 LedgerID,如果不包含代表可以從 Bookie 的 RocksDB 索引中删除目前 LedgerID 的資訊。

// 疊代 subBkActiveLedgers 的集合            for (Long bkLid : subBkActiveLedgers) {                 // 以 zk 為标準                 if (!ledgersInMetadata.contains(bkLid)) {                 ....                 // 清理指定的 Ledger ID                 // 這個Ledger在 Bookie 中有,在 zk上沒有,則删除。                 garbageCleaner.clean(bkLid);              }            }           

複制

這裡以 zookeeper 中擷取到的 ledgersInMetadata 為基準是因為,在 Pulsar 中當資料寫入的時候是先去 zookeeper 節點注冊一個臨時的 zk-node 來存儲目前 LedgerID 的相關中繼資料資訊,然後再去 RocksDB 中寫入 LedgerID 的存儲索引資訊,然後将 LedgerID 的 Entry 資料寫入到 EntryLog 中。删除操作也是同樣的道理,當使用者在 Pulsar 中使用的 Topic 中,有 Ledger 符合删除條件時,會去調用 ManagedLedger 的接口去 zookeeper 中删除 LedgerID 的 zk-node。可以看到,無論是讀寫,對于 Bookie 的 Client 來說,都是優先操作 Bookie-Zk 中的 Ledgers 資訊。是以對于删除操作而言我們也是以 zookeeper 中的 Ledgers Set 集合為基準,來檢查 RocksDB 的索引存儲中有哪些 LedgerID 是可以删除的。

6. 調用 GarbageCleaner 的接口去 RocksDB 的 ledgerIndex 中删除指定的 LedgerID

初始化 garbageCleaner 接口并實作 clean 方法,在 clean 方法中調用 DbLedgerStorage 的 deleteLedger 接口。

this.garbageCleaner = ledgerId -> {        try {            if (LOG.isDebugEnabled()) {                LOG.debug("delete ledger : " + ledgerId);            }            gcStats.getDeletedLedgerCounter().inc();            // 調用 DbLedgerStorage 去删除接口            ledgerStorage.deleteLedger(ledgerId);         } catch (IOException e) {             LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);         }    };           

複制

去 ledgerIndex 的緩存中删除目前的 LedgerID。

@Override    public void deleteLedger(long ledgerId) throws IOException {        ...        entryLocationIndex.delete(ledgerId);        ledgerIndex.delete(ledgerId);        ....    }           

複制

可以看到 doGcLedgers() 函數主要是以 zookeeper 的 Ledgers 集合為基準,去對比 RocksDB 的 ledgerIndex 索引存儲中删除待删除的 Ledgers。

doGcEntryLogs()

在 doGcEntryLogs() 中,代碼邏輯主要如下:

1. 疊代 entryLogMetaMap 擷取目前資料盤目錄下所有的 EntryLog 資訊。

entryLogMetaMap.forEach((entryLogId, meta) -> {           ...           });           

複制

2. 以 RocksDB 中的 ledgerIndex 緩存為基準,判斷目前 EntryLog 中是否有可以删除的 Ledger。

removeIfLedgerNotExists(meta);           

複制

EntryLogMetadata 中的 removeLedgerIf() 方法的參數為 LongPredicate ,本質是通過 ledgerIndex 中是否存在目前 LedgerID,如果不存在則 LongPredicate 的 test() 方法為true,該 Ledger 可以删除。

private void removeIfLedgerNotExists(EntryLogMetadata meta) {        // 這個 ledger 是否可以删除,取決于目前這個 Ledger 是否在 ledgerIndex 的集合中存在        meta.removeLedgerIf((entryLogLedger) -> {            // Remove the entry log ledger from the set if it isn't active.            try {                // ledgerStorage為專門為壓縮定制的 CompactableLedgerStorage,繼承了 LedgerStorage 接口                return !ledgerStorage.ledgerExists(entryLogLedger);            } catch (IOException e) {                LOG.error("Error reading from ledger storage", e);                return false;            }        });    }           

複制

而 removeLedgerIf() 方法本身操作的是 EntryLogMeta 中  ledgersMap 的這個集合,删除操作也是基于 ledgerIndex 判斷是否可以從 ledgersMap 中删除 LedgerID。

public void removeLedgerIf(LongPredicate predicate) {        ledgersMap.removeIf((ledgerId, size) -> {            boolean shouldRemove = predicate.test(ledgerId);            if (shouldRemove) {                remainingSize -= size;            }            return shouldRemove;        });    }           

複制

3. 通過第二步的删除操作,在這裡去判斷目前 EntryLog 中是否所有的 LedgerID 都已經被删除,如果都删除了,則我們可以直接将這個 EntryLog 從資料盤中删除。

// 判斷 EntryLog Meta 中的 ledgersMap 對象是否還有元素。            if (meta.isEmpty()) {                // This means the entry log is not associated with any active ledgers anymore.                // We can remove this entry log file now.                LOG.info("Deleting entryLogId " + entryLogId + " as it has no active ledgers!");                // 當目前的 EntryLog 中沒有任何 Ledgers 對象時,直接調用删除 EntryLog 的接口進行删除操作。                removeEntryLog(entryLogId);                gcStats.getReclaimedSpaceViaDeletes().add(meta.getTotalSize());            }           

複制

可以看到,在doGcEntryLogs 函數中,主要是以 ledgerIndex 為基準,操作每一個 EntryLog 中的 ledgesMap 對象,判斷 Ledger 是否可以删除。如果目前 EntryLog 的所有 Ledger 都可以删除,則直接删除 EntryLog 檔案。如果有一部分 Ledger 可以删除,一部分 Ledger 無法删除,則進入 doCompactEntryLogs() 函數的處理邏輯中。

doCompactEntryLogs

在 doCompactEntryLogs() 中,代碼主要邏輯如下:

1. 構造 entryLogMetaMap 的臨時對象 logsToCompact,并按照使用率對其排序:

ListlogsToCompact = new ArrayList();        // 開始之前首先把本地緩存的 entryLogMetaMap 都添加進來        logsToCompact.addAll(entryLogMetaMap.values());        // 按照使用率做一個排序        logsToCompact.sort(Comparator.comparing(EntryLogMetadata::getUsage));           

複制

2. 疊代entryLogMetaMap的臨時對象:logsToCompact。

for (EntryLogMetadata meta : logsToCompact) {            ...            // 真正觸發回收的核心邏輯            compactEntryLog(meta);        }           

複制

3. 調用 scanEntryLog() 方法,開始掃描 EntryLog 檔案。

            // 掃描指定的 EntryLog 檔案            entryLogger.scanEntryLog(entryLogMeta.getEntryLogId(), scannerFactory.newScanner(entryLogMeta));           

複制

掃描 EntryLog 檔案中可以簡單梳理為如下三個邏輯:

3.1 如何掃描 EntryLog 檔案

要了解 EntryLog 檔案是如何被掃描出來的,我們首先要去看 Entry 是如何被寫入 EntryLog 檔案中的。首先每一個 EntryLog 都有 1024 個位元組 EntryLog Header 資訊,主要包含如下内容:

  • Fingerprint(指紋資訊): 4 bytes "BKLO"

    在預配置設定 EntryLog 的時候,就固定的将4位元組的簽名資訊寫入。

  • Log file HeaderVersion: 4 bytes

    有兩個版本:HEADER_V0 和 HEADER_V1,目前 EntryLog 的版本為 HEADER_V1。

  • Ledger map offset:8 bytes
  • Ledgers Count:4 bytes

是以在掃描 EntryLog 檔案時,我們首先跳過目前 EntryLog 的 Header 資訊:

        // Start the read position in the current entry log file to be after        // the header where all of the ledger entries are.        long pos = LOGFILE_HEADER_SIZE;           

複制

之後會繼續寫入 4 位元組的 entrySize 以及 8 位元組的 LedgerID,是以掃描的時候也需要按照這種格式将 entrySize 和 LedgerID 分别讀取出來,然後依據 entrySize 的大小,繼續向後讀取出 Entry 真正的内容。

// Buffer where to read the entrySize (4 bytes) and the ledgerId (8 bytes)        ByteBuf headerBuffer = Unpooled.buffer(4 + 8);                    while (true) {...                long offset = pos;                pos += 4;                int entrySize = headerBuffer.readInt();                long ledgerId = headerBuffer.readLong();                headerBuffer.clear();                // 調用 scanner 的 accept() 方法                if (ledgerId == INVALID_LID || !scanner.accept(ledgerId)) {                // skip this entry                pos += entrySize;                continue;                }                // read the entry                data.clear();                if (entrySize <= 0) {                LOG.warn("bad read for ledger entry from entryLog {}@{} (entry size {})",                entryLogId, pos, entrySize);                return;                }                data.capacity(entrySize);                // process the entry                // 調用 scanner 的 process() 方法,将 entry 寫入新的 EntryLog 中                scanner.process(ledgerId, offset, data);                // Advance position to the next entry                pos += entrySize;            }           

複制

如此往複,不斷的将 EntryLog 中的每一條 Entry 依次讀取出來。

3.2 accept 接口

accept 接口主要用來判斷目前的 LedgerID 是否還在 EntryLog 檔案中,即是否還在 ledgersMap 中存在。

@Override                public boolean accept(long ledgerId) {                return meta.containsLedger(ledgerId);                }           

複制

3.3 process 接口

process 接口主要用來将無法删除的 Entry 寫入到新的 EntryLog 檔案中,并記錄 Entry 對應的 offset 資訊。

@Override                public void process(final long ledgerId, long offset, ByteBuf entry) throws IOException {    ...                    long newoffset = entryLogger.addEntry(ledgerId, entry);                    offsets.add(new EntryLocation(ledgerId, entryId, newoffset));                }           

複制

4. 調用 flush 方法,更新新 EntryLog 檔案的索引資訊

// 強制把寫入的資料重新整理到磁盤上去,重新整理的時候會同時更新索引資訊,以便 broker 下次讀取消息的時候,可以去新的 EntryLog 中去讀取。            scannerFactory.flush();           

複制

flush() 方法主要是将上述無法删除的 Entry 寫入新 EntryLog 中的位點資訊調用 DbLedgerStorage  的接口更新到 RocksDB 中去。

void flush() throws IOException {            if (offsets.isEmpty()) {                if (LOG.isDebugEnabled()) {                    LOG.debug("Skipping entry log flushing, as there are no offset!");                }                return;            }                        // Before updating the index, we want to wait until all the compacted entries are flushed into the            // entryLog            try {                entryLogger.flush();                // 更新新 EntryLog 檔案中 offsets 的資訊。                ledgerStorage.updateEntriesLocations(offsets);                ledgerStorage.flushEntriesLocationsIndex();            } finally {                offsets.clear();            }        }           

複制

5. 删除原始的 EntryLog 檔案

// 移除掉原先舊的EntryLog檔案            logRemovalListener.removeEntryLog(entryLogMeta.getEntryLogId());           

複制

上述的代碼邏輯描述了對于單個資料盤目錄下 EntryLog 完整的回收邏輯。對于多個資料盤目錄的場景,每一個資料盤目錄都會建立一個單獨的 GarbageCollectorThread 的線程來運作上述的邏輯。

EntryLog 檔案的大小如何控制

在 Ledger 的資料盤目錄中可以看到,每一個 EntryLog 檔案的大小都固定為 1GB 左右,當達到這個大小時,EntryLog 檔案就會滾動建立新的 EntryLog 檔案來寫入。這是因為預設設定的 EntryLog 大小為 1GB,具體如下:

/**     * Set the max log size limit to 1GB. It makes extra room for entry log file before        * hitting hard limit '2GB'. So we don't need to force roll entry log file when flushing          * memtable (for performance consideration)          */         public static final long MAX_LOG_SIZE_LIMIT = 1 * 1024 * 1024 * 1024;           

複制

reachEntryLogLimit() 方法用來檢查是否 EntryLog 檔案達到指定的大小:

boolean reachEntryLogLimit(BufferedLogChannel logChannel, long size) {            if (logChannel == null) {            return false;                }                return logChannel.position() + size > logSizeLimit;        }           

複制

使用者也可以通過如下參數自定義 EntryLog 檔案的大小:

logSizeLimit           

複制

如何計算 EntryLog 檔案的使用率

在 doCompactEntryLogs 章節中可以看到,在疊代 entryLogMetaMap 時,依據 EntryLog 的使用率對 EntryLog 進行了排序。EntryLog 的使用率主要通過 EntryLog Metadata 中的如下兩個字段進行計算的:

  • private long totalSize; // 總大小
  • private long remainingSize; // 剩餘大小

在資料寫入 EntryLog (ledgersMap) 的過程中會同時增加 totalSize 和 remainingSize 這兩個字段:

// 往 ledgersMap 中新增元素        public void addLedgerSize(long ledgerId, long size) {            totalSize += size;                remainingSize += size;                ledgersMap.addAndGet(ledgerId, size);        }           

複制

當在做資料壓縮時,如果判斷某一個 LedgerID 可以從 ledgersMap 中删除時,會從 remainingSize 中減去目前 Ledger 的 size:

public void removeLedgerIf(LongPredicate predicate) {            ledgersMap.removeIf((ledgerId, size) -> {                        boolean shouldRemove = predicate.test(ledgerId);                        if (shouldRemove) {                            remainingSize -= size; // 減去目前 ledger 的大小                    }                         return shouldRemove;                 });         }           

複制

是以在計算 EntryLog 的使用率時,拿目前 remainingSize/totalSize 即可計算出 EntryLog 檔案中目前剩餘的有效資料的比率是多少:

public double getUsage() {            if (totalSize == 0L) {                    return 0.0f;                 }                 return (double) remainingSize / totalSize;         }           

複制

minor GC 與 major GC 執行資料回收的邏輯是完全一緻的,EntryLog 中有效資料的使用率也是用來區分是否為 minor GC 或者 major GC 的關鍵點。

Apache Pulsar 系列 —— 深入了解 Bookie GC 回收機制《騰訊雲微服務引擎 TSE 産品動态》《千億級、大規模:騰訊超大 Apache Pulsar 叢集性能調優實踐》《雲原生時代的Java應用優化實踐》《全面相容Eureka:PolarisMesh(北極星)釋出1.5.0版本》《全面擁抱Go社群:PolarisMesh全功能對接gRPC-Go | PolarisMesh12月月報》《SpringBoot應用優雅接入北極星PolarisMesh》《Serverless可觀測性的價值》

one more thing

TDMQ是騰訊基于 Apache Pulsar 自研的一個雲原生消息中間件系列,其中包含相容Pulsar、RabbitMQ、RocketMQ 等協定的消息隊列子産品,得益于其底層計算與存儲分離的架構,TDMQ 具備良好的彈性伸縮以及故障恢複能力。歡迎大家檢視并使用:

https://cloud.tencent.com/product/tdmq

福利時間

您對Apache Pulsar還有什麼想要了解的?

評論區留言并分享文章至朋友圈

我們将在精選留言中随機抽送

騰訊公仔

Apache Pulsar 系列 —— 深入了解 Bookie GC 回收機制《騰訊雲微服務引擎 TSE 産品動态》《千億級、大規模:騰訊超大 Apache Pulsar 叢集性能調優實踐》《雲原生時代的Java應用優化實踐》《全面相容Eureka:PolarisMesh(北極星)釋出1.5.0版本》《全面擁抱Go社群:PolarisMesh全功能對接gRPC-Go | PolarisMesh12月月報》《SpringBoot應用優雅接入北極星PolarisMesh》《Serverless可觀測性的價值》

往期

推薦

《騰訊雲微服務引擎 TSE 産品動态》

《千億級、大規模:騰訊超大 Apache Pulsar 叢集性能調優實踐》

《雲原生時代的Java應用優化實踐》

《全面相容Eureka:PolarisMesh(北極星)釋出1.5.0版本》

《全面擁抱Go社群:PolarisMesh全功能對接gRPC-Go | PolarisMesh12月月報》

《SpringBoot應用優雅接入北極星PolarisMesh》

《Serverless可觀測性的價值》

Apache Pulsar 系列 —— 深入了解 Bookie GC 回收機制《騰訊雲微服務引擎 TSE 産品動态》《千億級、大規模:騰訊超大 Apache Pulsar 叢集性能調優實踐》《雲原生時代的Java應用優化實踐》《全面相容Eureka:PolarisMesh(北極星)釋出1.5.0版本》《全面擁抱Go社群:PolarisMesh全功能對接gRPC-Go | PolarisMesh12月月報》《SpringBoot應用優雅接入北極星PolarisMesh》《Serverless可觀測性的價值》

掃描下方二維碼關注本公衆号,

了解更多微服務、消息隊列的相關資訊!

解鎖超多鵝廠周邊!

Apache Pulsar 系列 —— 深入了解 Bookie GC 回收機制《騰訊雲微服務引擎 TSE 産品動态》《千億級、大規模:騰訊超大 Apache Pulsar 叢集性能調優實踐》《雲原生時代的Java應用優化實踐》《全面相容Eureka:PolarisMesh(北極星)釋出1.5.0版本》《全面擁抱Go社群:PolarisMesh全功能對接gRPC-Go | PolarisMesh12月月報》《SpringBoot應用優雅接入北極星PolarisMesh》《Serverless可觀測性的價值》
Apache Pulsar 系列 —— 深入了解 Bookie GC 回收機制《騰訊雲微服務引擎 TSE 産品動态》《千億級、大規模:騰訊超大 Apache Pulsar 叢集性能調優實踐》《雲原生時代的Java應用優化實踐》《全面相容Eureka:PolarisMesh(北極星)釋出1.5.0版本》《全面擁抱Go社群:PolarisMesh全功能對接gRPC-Go | PolarisMesh12月月報》《SpringBoot應用優雅接入北極星PolarisMesh》《Serverless可觀測性的價值》

戳原文,檢視更多消息隊列TDMQ Pulsar 版的資訊!

Apache Pulsar 系列 —— 深入了解 Bookie GC 回收機制《騰訊雲微服務引擎 TSE 産品動态》《千億級、大規模:騰訊超大 Apache Pulsar 叢集性能調優實踐》《雲原生時代的Java應用優化實踐》《全面相容Eureka:PolarisMesh(北極星)釋出1.5.0版本》《全面擁抱Go社群:PolarisMesh全功能對接gRPC-Go | PolarisMesh12月月報》《SpringBoot應用優雅接入北極星PolarisMesh》《Serverless可觀測性的價值》

點個在看你最好看