Compaction是MOR表的一項核心機制,Hudi利用Compaction将MOR表産生的Log File合并到新的Base File中。本文我們會通過Notebook介紹并示範Compaction的運作機制,幫助您了解其工作原理和相關配置。
1. 運作 Notebook
本文使用的Notebook是:《Apache Hudi Core Conceptions (4) - MOR: Compaction》[1],對應檔案是:4-mor-compaction.ipynb,請先修改Notebook中的環境變量
S3_BUCKET
,将其設為您自己的S3桶,并確定用于資料準備的Notebook:《Apache Hudi Core Conceptions (1) - Data Preparation》[2]已經至少執行過一次。Notebook使用的Hudi版本是
0.12.1
,Spark叢集建議配置32 vCore / 128 GB及以上。
2. 核心概念
Compaction負責定期将一個File Slice裡的Base File和從屬于它的所有Log File一起合并寫入到一個新的Base File中(産生新的File Slice),唯有如此,MOR表的日志檔案才不至于無限膨脹下去。以下是與Compaction有關的幾項重要配置,在後面的介紹中我們會逐一介紹它們的作用:
配置項 | 預設值 |
hoodie.compact.inline[3] | false |
hoodie.compact.schedule.inline[4] | false |
hoodie.compact.inline.max.delta.commits[5] | 5 |
2.1. 排期與執行
Compaction的運作機制包括:排期(Schedule)和執行(Execute)兩個階段。排期階段的主要工作是劃定哪些File Slices将參與Compaction,然後生成一個計劃(Compaction Plan)儲存到Timeline裡,此時在Timeline裡會出現一個名為
compaction
的Instant,狀态是
REQUESTED
;執行階段的主要工作是讀取這個計劃(Compaction Plan)并執行它,執行完畢後,Timeline中的
compaction
就會變成
COMPLETED
狀态。
2.2. 同步與異步
從運作模式上看,Compaction又分同步、異步以及半異步三種模式(“半異步”模式是本文使用的一種叫法,為的是和同步、異步兩種模式的稱謂對齊,Hudi官方文檔對這一模式有介紹,但沒有給出命名),它們之間的差異主要展現在從(達到規定門檻值的某次)送出(Commit)到排期(Schedule)再到執行(Execute)三個階段的推進方式上。在Hudi的官方文檔中,交替使用了Sync/Async和Inline/Offline兩組詞彙來描述推進方式,這兩組詞彙是有微妙差異的,為了表述嚴謹,我們使用同步/異步和立即/另行這兩組中文術語與之對應。以下是Compaction三種運作模式的詳細介紹:
- • 同步模式(Inline Schedule,Inline Execute)
同步模式可概括為:立即排期,立即執行(Inline Schedule,Inline Execute)。在該模式下,當累積的增量送出(
deltacommit
)次數到達一個門檻值時,會立即觸發Compaction的排期與執行(排期和執行是連在一起的),這個門檻值是由配置項
hoodie.compact.inline.max.delta.commits
[6] 控制的,預設值是
5
,即:預設情況下,每5次增量送出就會觸發并執行一次Compaction。鎖定同步模式的配置是:
配置項 | 設定值 |
hoodie.compact.inline[7] | true |
hoodie.compact.schedule.inline[8] | false |
- • 異步模式(Offline Schedule,Offline Execute)
異步模式可概括為:另行排期,另行執行(Offline Schedule,Offline Execute)。在該模式下,任何送出都不會直接觸發和執行Compaction,除非使用了支援異步Compaction的Writer,否則使用者需要自己保證有一個獨立的程序或線程負責定期執行Compaction操作。Hudi提供了四種運作異步Compaction的方式:
- 1. 通過hudi-cli或送出Spark作業驅動異步Compaction
- 2. 送出Flink作業驅動異步Compaction
- 3. 在HoodieDeltaStreamer中配置并運作異步Compaction
- 4. 在Spark Structured Streaming中配置并運作異步Compaction
在後面的測試用例中,我們将使用第一種方式示範如何進行異步的Compaction排期與執行。和同步模式一樣,在異步模式下,同樣是當增量送出(
deltacommit
)次數達到一定的門檻值時才會觸發排期,這個門檻值依然是
hoodie.compact.inline.max.delta.commits
[9]。
異步模式面臨的場景要比同步模式複雜一些,同步模式下,每次送出時都會檢查累積的送出次數是否已達規定門檻值,是以在同步模式下,每次排期涵蓋的增量送出數量基本是固定的,就是門檻值設定的次數,但是在異步模式下,由于發起排期和增量送出之間沒有必然的協同關系,是以在發起排期時,Timeline中可能尚未積累到足夠數量的增量送出,或者增量送出數量已經超過了規定門檻值,如果是前者,不會産生排期計劃,如果是後者,排期計劃會将所有累積的增量送出涵蓋進來。鎖定異步模式的配置是:
配置項 | 設定值 |
hoodie.compact.inline[10] | false |
hoodie.compact.schedule.inline[11] | false |
- • 半異步模式(Inline Schedule,Offline Execute)
半異步模式可概括為:立即排期,另行執行(Inline Schedule,Offline Execute),即:排期會伴随增量送出(
deltacommit
)自動觸發,但執行還是通過前面介紹的四種異步方式之一去完成。鎖定半異步模式的配置是:
配置項 | 設定值 |
hoodie.compact.inline[12] | false |
hoodie.compact.schedule.inline[13] | true |
3. 同步Compaction
3.1. 關鍵配置
《Apache Hudi Core Conceptions (4) - MOR: Compaction》[14]的第1個測試用例示範了同步Compaction的運作機制。測試用的資料表有如下幾項關鍵配置:
配置項 | 預設值 | 設定值 |
hoodie.compact.inline[15] | false | true |
hoodie.compact.schedule.inline[16] | false | false |
hoodie.compact.inline.max.delta.commits[17] | 5 | 3 |
hoodie.copyonwrite.record.size.estimate[18] | 1024 | 175 |
這些配置項在介紹概念時都已提及,通過這個測試用例将會看到它們組合起來的整體效果。
3.2. 測試計劃
該測試用例會先後插入或更新三批資料,然後進行同步的Compaction排期和執行,過程中将重點觀察時間線和檔案布局的變化,整體測試計劃如下表所示:
步驟 | 操作 | 資料量(單分區) | 檔案系統 |
1 | Insert | 96MB | +1 Base File |
2 | Update | 788KB | +1 Log File |
3 | Update | 1.2MB | +1 Log File +1 Compacted Base File |
提示:我們将使用色塊辨別目前批次的Instant和對應存儲檔案,每一種顔色代表一個獨立的File Slice。
3.3. 第1批次
第1批次單分區寫入96MB資料,Hudi将其寫入到一個Parquet檔案中,第一個File Group随之産生,它也是後續 Log File的Base File。需要注意的一個細節是:對于MOR表來說,隻有進行Compaction的那次送出才會被稱為“commit”,在Compaction之前的曆次送出都被稱作“deltacommit”,即使對于建立Base File寫入資料的那次送出也是如此,就如同這裡一樣。
3.4. 第2批次
第2批次更新了一小部分資料,Hudi将更新資料寫入到了Log檔案中,大小788KB,fileVersion是1,它從屬于上一步生成的Parquet檔案,即Parquet檔案是它的Base File ,這個Log檔案的fileId和尾部的時間戳(baseCommitTime)與Parquet檔案是一樣的。目前的Parquet檔案和Log檔案組成了一個File Slice。
3.5. 第3批次
第3批次再次更新了一小部分資料,Hudi将更新資料又寫入到一個Log檔案中,大小1.2MB,fileVersion是2。與上一個Log檔案一樣,fileId和尾部的時間戳(baseCommitTime)與Parquet檔案一緻,是以它也是Parquet檔案的Delta Log,且按Timeline排在上一個Log檔案之後。目前的File Slice多了一個新的Log檔案。但是,不同于第2批次,第3批次的故事到這裡還沒有結束,在該測試用例中,目前測試表的設定是:每三次deltacommit會觸發一次Compaction,是以,第3次操作後就觸發了第1次的Compaction操作:
于是,在Timeline上出現了一個commit(No.3),同時,在檔案系統上,生成了一個新的96MB的Parquet檔案,它是第一個Parquet檔案連同它的兩個Log檔案重新壓縮後得到的,這個新的Parquet檔案fileId沒變,但是instantTime變成了Compaction對應的commit時間,于是,在目前File Group裡,第二個File Slice産生了,目前它還隻有一個Base File,沒有Log File。
3.6. 複盤
最後,讓我們将此前的全部操作彙總在一起,重新看一下整體的時間線和最後的檔案布局:
4. 異步Compaction
4.1. 關鍵配置
《Apache Hudi Core Conceptions (4) - MOR: Compaction》[19]的第2個測試用例示範了異步Compaction的運作機制。測試用的資料表有如下幾項關鍵配置:
配置項 | 預設值 | 設定值 |
hoodie.compact.inline[20] | false | false |
hoodie.compact.schedule.inline[21] | false | false |
hoodie.compact.inline.max.delta.commits[22] | 5 | 3 |
hoodie.copyonwrite.record.size.estimate[23] | 1024 | 175 |
這些配置項在介紹概念時都已提及,通過這個測試用例将會看到它們組合起來的整體效果。
4.2. 測試計劃
該測試用例會先後插入或更新三批資料,然後進行異步的Compaction排期和執行,過程中将重點觀察時間線和檔案布局的變化,整體測試計劃如下表所示:
步驟 | 操作 | 資料量(單分區) | 檔案系統 |
1 | Insert | 96MB | +1 Base File |
2 | Update | 788KB | +1 Log File |
3 | Update | 1.2MB | +1 Log File |
4 | Offline Schedule | N/A | N/A |
5 | Offline Execute | 96.15MB | +1 Compacted Base File |
由于該測試用例的前三步操作與第3節(第1個測試用例)完全一緻,是以不再贅述,我們會從第4步操作(Notebook的3.8節)開始解讀。
4.3. 異步排期
在完成了和第3節完全一樣的前三批操作後,時間線和檔案系統的情形如下:
這和3.5節執行後的狀況非常不同,沒有發生Compaction,連排期也沒有看到,因為我們關閉了
hoodie.compact.inline
[24]。于是,在接下來的第4步操作中(Notebook的3.8節),我們通過
spark-submit
手動發起了一個排期作業(
--mode 'schedule'
):
sudo -u hadoop spark-submit \
--jars '/usr/lib/hudi/hudi-spark-bundle.jar' \
--class 'org.apache.hudi.utilities.HoodieCompactor' \
/usr/lib/hudi/hudi-utilities-bundle.jar \
--spark-memory '4g' \
--mode 'schedule' \
--base-path "s3://${S3_BUCKET}/${TABLE_NAME}" \
--table-name "$TABLE_NAME" \
--hoodie-conf "hoodie.compact.inline.max.delta.commits=3"
執行後,檔案布局沒有變化,但是在時間線中出現了一個狀态為
REQUESTED
的
compaction
:
4.4. 異步執行
第5步操作(Notebook的3.9節)通過
spark-submit
手動發起了一個執行作業(
--mode 'execute'
):
sudo -u hadoop spark-submit \
--jars '/usr/lib/hudi/hudi-spark-bundle.jar' \
--class "org.apache.hudi.utilities.HoodieCompactor" \
/usr/lib/hudi/hudi-utilities-bundle.jar \
--spark-memory '4g' \
--mode 'execute' \
--base-path "s3://${S3_BUCKET}/${TABLE_NAME}" \
--table-name "$TABLE_NAME"
執行後,原
compaction
狀态由
REQUESTED
變為
COMPLETED
,原Base File和兩個Log File被合并打包成一個新的Base File檔案,大小96MB:
4.5. 異步排期 + 異步執行
異步的排期和執行可以通過一個指令一步完成,《Apache Hudi Core Conceptions (4) - MOR: Compaction》[25]的第3個測試用例示範了這一操作。它的前三步操作與第2個測試用例一樣,在第四步時,使用了“Schedule + Execute”一起執行的方式(
--mode 'scheduleAndExecute'
)一步完成了Compaction操作,指令如下:
sudo -u hadoop spark-submit \
--jars '/usr/lib/hudi/hudi-spark-bundle.jar' \
--class 'org.apache.hudi.utilities.HoodieCompactor' \
/usr/lib/hudi/hudi-utilities-bundle.jar \
--spark-memory '4g' \
--mode 'scheduleAndExecute' \
--base-path "s3://${S3_BUCKET}/${TABLE_NAME}" \
--table-name "$TABLE_NAME" \
--hoodie-conf "hoodie.compact.inline.max.delta.commits=3"
5. 半異步Compaction
5.1. 關鍵配置
《Apache Hudi Core Conceptions (4) - MOR: Compaction》[26]的第4個測試用例示範了半異步Compaction的運作機制。測試用的資料表有如下幾項關鍵配置:
配置項 | 預設值 | 設定值 |
hoodie.compact.inline[27] | false | false |
hoodie.compact.schedule.inline[28] | false | true |
hoodie.compact.inline.max.delta.commits[29] | 5 | 3 |
hoodie.copyonwrite.record.size.estimate[30] | 1024 | 175 |
這些配置項在介紹概念時都已提及,通過這個測試用例将會看到它們組合起來的整體效果。
5.2. 測試計劃
該測試用例會先後插入或更新三批資料,然後進行異步的Compaction Execute,過程中将重點觀察時間線和檔案布局的變化,整體測試計劃如下表所示:
步驟 | 操作 | 資料量(單分區) | 檔案系統 |
1 | Insert | 96MB | +1 Base File |
2 | Update | 788KB | +1 Log File |
3 | Update | 1.2MB | +1 Log File |
4 | Offline Execute | 96.15MB | +1 Compacted Base File |
由于該測試用例的前三步操作與第3節(第1個測試用例)完全一緻,是以不再贅述,我們會從第3步操作(Notebook的5.7節)開始解讀。
5.3. 同步排期
在完成了和第3節完全一樣的前三批操作後,時間線和檔案系統的情形如下:
在該模式下,第3次送出自動觸發了Compaction排期,狀态為
REQUESTED
。
5.4. 異步執行
在接下來的第4步操作中,通過
spark-submit
手動發起了一個執行作業,排期計劃被consume,原
REQUESTED
狀态的Compaction變成了
COMPLETED
:
關于作者:耿立超,架構師,著有 《大資料平台架構與原型實作:資料中台建設實戰》[31]一書,多年IT系統開發和架構經驗,對大資料、企業級應用架構、SaaS、分布式存儲和領域驅動設計有豐富的實踐經驗,個人技術部落格:https://laurence.blog.csdn.net
引用連結
[1]
《Apache Hudi Core Conceptions (4) - MOR: Compaction》: https://github.com/bluishglc/apache-hudi-core-conceptions/blob/master/4-mor-compaction.ipynb
[2]
《Apache Hudi Core Conceptions (1) - Data Preparation》: https://github.com/bluishglc/apache-hudi-core-conceptions/blob/master/1-data-preparation.ipynb
[3]
hoodie.compact.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactinline
[4]
hoodie.compact.schedule.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactscheduleinline
[5]
hoodie.compact.inline.max.delta.commits: https://hudi.apache.org/docs/configurations/#hoodiecompactinlinemaxdeltacommits
[6]
hoodie.compact.inline.max.delta.commits
: https://hudi.apache.org/docs/configurations/#hoodiecompactinlinemaxdeltacommits
[7]
hoodie.compact.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactinline
[8]
hoodie.compact.schedule.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactscheduleinline
[9]
hoodie.compact.inline.max.delta.commits
: https://hudi.apache.org/docs/configurations/#hoodiecompactinlinemaxdeltacommits
[10]
hoodie.compact.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactinline
[11]
hoodie.compact.schedule.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactscheduleinline
[12]
hoodie.compact.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactinline
[13]
hoodie.compact.schedule.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactscheduleinline
[14]
《Apache Hudi Core Conceptions (4) - MOR: Compaction》: https://github.com/bluishglc/apache-hudi-core-conceptions/blob/master/4-mor-compaction.ipynb
[15]
hoodie.compact.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactinline
[16]
hoodie.compact.schedule.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactscheduleinline
[17]
hoodie.compact.inline.max.delta.commits: https://hudi.apache.org/docs/configurations/#hoodiecompactinlinemaxdeltacommits
[18]
hoodie.copyonwrite.record.size.estimate: https://hudi.apache.org/docs/configurations/#hoodiecopyonwriterecordsizeestimate
[19]
《Apache Hudi Core Conceptions (4) - MOR: Compaction》: https://github.com/bluishglc/apache-hudi-core-conceptions/blob/master/4-mor-compaction.ipynb
[20]
hoodie.compact.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactinline
[21]
hoodie.compact.schedule.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactscheduleinline
[22]
hoodie.compact.inline.max.delta.commits: https://hudi.apache.org/docs/configurations/#hoodiecompactinlinemaxdeltacommits
[23]
hoodie.copyonwrite.record.size.estimate: https://hudi.apache.org/docs/configurations/#hoodiecopyonwriterecordsizeestimate
[24]
hoodie.compact.inline
: https://hudi.apache.org/docs/configurations/#hoodiecompactinline
[25]
《Apache Hudi Core Conceptions (4) - MOR: Compaction》: https://github.com/bluishglc/apache-hudi-core-conceptions/blob/master/4-mor-compaction.ipynb
[26]
《Apache Hudi Core Conceptions (4) - MOR: Compaction》: https://github.com/bluishglc/apache-hudi-core-conceptions/blob/master/4-mor-compaction.ipynb
[27]
hoodie.compact.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactinline
[28]
hoodie.compact.schedule.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactscheduleinline
[29]
hoodie.compact.inline.max.delta.commits: https://hudi.apache.org/docs/configurations/#hoodiecompactinlinemaxdeltacommits
[30]
hoodie.copyonwrite.record.size.estimate: https://hudi.apache.org/docs/configurations/#hoodiecopyonwriterecordsizeestimate
[31]
《大資料平台架構與原型實作:資料中台建設實戰》: https://item.jd.com/12677623.html