天天看點

探索Apache Hudi核心概念 (3) - Compaction

作者:Lakehouse

Compaction是MOR表的一項核心機制,Hudi利用Compaction将MOR表産生的Log File合并到新的Base File中。本文我們會通過Notebook介紹并示範Compaction的運作機制,幫助您了解其工作原理和相關配置。

1. 運作 Notebook

本文使用的Notebook是:《Apache Hudi Core Conceptions (4) - MOR: Compaction》[1],對應檔案是:4-mor-compaction.ipynb,請先修改Notebook中的環境變量

S3_BUCKET

,将其設為您自己的S3桶,并確定用于資料準備的Notebook:《Apache Hudi Core Conceptions (1) - Data Preparation》[2]已經至少執行過一次。Notebook使用的Hudi版本是

0.12.1

,Spark叢集建議配置32 vCore / 128 GB及以上。

2. 核心概念

Compaction負責定期将一個File Slice裡的Base File和從屬于它的所有Log File一起合并寫入到一個新的Base File中(産生新的File Slice),唯有如此,MOR表的日志檔案才不至于無限膨脹下去。以下是與Compaction有關的幾項重要配置,在後面的介紹中我們會逐一介紹它們的作用:

配置項 預設值
hoodie.compact.inline[3] false
hoodie.compact.schedule.inline[4] false
hoodie.compact.inline.max.delta.commits[5] 5

2.1. 排期與執行

Compaction的運作機制包括:排期(Schedule)和執行(Execute)兩個階段。排期階段的主要工作是劃定哪些File Slices将參與Compaction,然後生成一個計劃(Compaction Plan)儲存到Timeline裡,此時在Timeline裡會出現一個名為

compaction

的Instant,狀态是

REQUESTED

;執行階段的主要工作是讀取這個計劃(Compaction Plan)并執行它,執行完畢後,Timeline中的

compaction

就會變成

COMPLETED

狀态。

2.2. 同步與異步

從運作模式上看,Compaction又分同步、異步以及半異步三種模式(“半異步”模式是本文使用的一種叫法,為的是和同步、異步兩種模式的稱謂對齊,Hudi官方文檔對這一模式有介紹,但沒有給出命名),它們之間的差異主要展現在從(達到規定門檻值的某次)送出(Commit)到排期(Schedule)再到執行(Execute)三個階段的推進方式上。在Hudi的官方文檔中,交替使用了Sync/Async和Inline/Offline兩組詞彙來描述推進方式,這兩組詞彙是有微妙差異的,為了表述嚴謹,我們使用同步/異步和立即/另行這兩組中文術語與之對應。以下是Compaction三種運作模式的詳細介紹:

  • • 同步模式(Inline Schedule,Inline Execute)
探索Apache Hudi核心概念 (3) - Compaction

同步模式可概括為:立即排期,立即執行(Inline Schedule,Inline Execute)。在該模式下,當累積的增量送出(

deltacommit

)次數到達一個門檻值時,會立即觸發Compaction的排期與執行(排期和執行是連在一起的),這個門檻值是由配置項

hoodie.compact.inline.max.delta.commits

[6] 控制的,預設值是

5

,即:預設情況下,每5次增量送出就會觸發并執行一次Compaction。鎖定同步模式的配置是:

配置項 設定值
hoodie.compact.inline[7] true
hoodie.compact.schedule.inline[8] false
  • • 異步模式(Offline Schedule,Offline Execute)
探索Apache Hudi核心概念 (3) - Compaction

異步模式可概括為:另行排期,另行執行(Offline Schedule,Offline Execute)。在該模式下,任何送出都不會直接觸發和執行Compaction,除非使用了支援異步Compaction的Writer,否則使用者需要自己保證有一個獨立的程序或線程負責定期執行Compaction操作。Hudi提供了四種運作異步Compaction的方式:

  1. 1. 通過hudi-cli或送出Spark作業驅動異步Compaction
  2. 2. 送出Flink作業驅動異步Compaction
  3. 3. 在HoodieDeltaStreamer中配置并運作異步Compaction
  4. 4. 在Spark Structured Streaming中配置并運作異步Compaction

在後面的測試用例中,我們将使用第一種方式示範如何進行異步的Compaction排期與執行。和同步模式一樣,在異步模式下,同樣是當增量送出(

deltacommit

)次數達到一定的門檻值時才會觸發排期,這個門檻值依然是

hoodie.compact.inline.max.delta.commits

[9]。

異步模式面臨的場景要比同步模式複雜一些,同步模式下,每次送出時都會檢查累積的送出次數是否已達規定門檻值,是以在同步模式下,每次排期涵蓋的增量送出數量基本是固定的,就是門檻值設定的次數,但是在異步模式下,由于發起排期和增量送出之間沒有必然的協同關系,是以在發起排期時,Timeline中可能尚未積累到足夠數量的增量送出,或者增量送出數量已經超過了規定門檻值,如果是前者,不會産生排期計劃,如果是後者,排期計劃會将所有累積的增量送出涵蓋進來。鎖定異步模式的配置是:

配置項 設定值
hoodie.compact.inline[10] false
hoodie.compact.schedule.inline[11] false
  • • 半異步模式(Inline Schedule,Offline Execute)
探索Apache Hudi核心概念 (3) - Compaction

半異步模式可概括為:立即排期,另行執行(Inline Schedule,Offline Execute),即:排期會伴随增量送出(

deltacommit

)自動觸發,但執行還是通過前面介紹的四種異步方式之一去完成。鎖定半異步模式的配置是:

配置項 設定值
hoodie.compact.inline[12] false
hoodie.compact.schedule.inline[13] true

3. 同步Compaction

3.1. 關鍵配置

《Apache Hudi Core Conceptions (4) - MOR: Compaction》[14]的第1個測試用例示範了同步Compaction的運作機制。測試用的資料表有如下幾項關鍵配置:

配置項 預設值 設定值
hoodie.compact.inline[15] false true
hoodie.compact.schedule.inline[16] false false
hoodie.compact.inline.max.delta.commits[17] 5 3
hoodie.copyonwrite.record.size.estimate[18] 1024 175

這些配置項在介紹概念時都已提及,通過這個測試用例将會看到它們組合起來的整體效果。

3.2. 測試計劃

該測試用例會先後插入或更新三批資料,然後進行同步的Compaction排期和執行,過程中将重點觀察時間線和檔案布局的變化,整體測試計劃如下表所示:

步驟 操作 資料量(單分區) 檔案系統
1 Insert 96MB +1 Base File
2 Update 788KB +1 Log File
3 Update 1.2MB +1 Log File +1 Compacted Base File

提示:我們将使用色塊辨別目前批次的Instant和對應存儲檔案,每一種顔色代表一個獨立的File Slice。

3.3. 第1批次

第1批次單分區寫入96MB資料,Hudi将其寫入到一個Parquet檔案中,第一個File Group随之産生,它也是後續 Log File的Base File。需要注意的一個細節是:對于MOR表來說,隻有進行Compaction的那次送出才會被稱為“commit”,在Compaction之前的曆次送出都被稱作“deltacommit”,即使對于建立Base File寫入資料的那次送出也是如此,就如同這裡一樣。

探索Apache Hudi核心概念 (3) - Compaction

3.4. 第2批次

第2批次更新了一小部分資料,Hudi将更新資料寫入到了Log檔案中,大小788KB,fileVersion是1,它從屬于上一步生成的Parquet檔案,即Parquet檔案是它的Base File ,這個Log檔案的fileId和尾部的時間戳(baseCommitTime)與Parquet檔案是一樣的。目前的Parquet檔案和Log檔案組成了一個File Slice。

探索Apache Hudi核心概念 (3) - Compaction

3.5. 第3批次

第3批次再次更新了一小部分資料,Hudi将更新資料又寫入到一個Log檔案中,大小1.2MB,fileVersion是2。與上一個Log檔案一樣,fileId和尾部的時間戳(baseCommitTime)與Parquet檔案一緻,是以它也是Parquet檔案的Delta Log,且按Timeline排在上一個Log檔案之後。目前的File Slice多了一個新的Log檔案。但是,不同于第2批次,第3批次的故事到這裡還沒有結束,在該測試用例中,目前測試表的設定是:每三次deltacommit會觸發一次Compaction,是以,第3次操作後就觸發了第1次的Compaction操作:

探索Apache Hudi核心概念 (3) - Compaction

于是,在Timeline上出現了一個commit(No.3),同時,在檔案系統上,生成了一個新的96MB的Parquet檔案,它是第一個Parquet檔案連同它的兩個Log檔案重新壓縮後得到的,這個新的Parquet檔案fileId沒變,但是instantTime變成了Compaction對應的commit時間,于是,在目前File Group裡,第二個File Slice産生了,目前它還隻有一個Base File,沒有Log File。

探索Apache Hudi核心概念 (3) - Compaction

3.6. 複盤

最後,讓我們将此前的全部操作彙總在一起,重新看一下整體的時間線和最後的檔案布局:

探索Apache Hudi核心概念 (3) - Compaction

4. 異步Compaction

4.1. 關鍵配置

《Apache Hudi Core Conceptions (4) - MOR: Compaction》[19]的第2個測試用例示範了異步Compaction的運作機制。測試用的資料表有如下幾項關鍵配置:

配置項 預設值 設定值
hoodie.compact.inline[20] false false
hoodie.compact.schedule.inline[21] false false
hoodie.compact.inline.max.delta.commits[22] 5 3
hoodie.copyonwrite.record.size.estimate[23] 1024 175

這些配置項在介紹概念時都已提及,通過這個測試用例将會看到它們組合起來的整體效果。

4.2. 測試計劃

該測試用例會先後插入或更新三批資料,然後進行異步的Compaction排期和執行,過程中将重點觀察時間線和檔案布局的變化,整體測試計劃如下表所示:

步驟 操作 資料量(單分區) 檔案系統
1 Insert 96MB +1 Base File
2 Update 788KB +1 Log File
3 Update 1.2MB +1 Log File
4 Offline Schedule N/A N/A
5 Offline Execute 96.15MB +1 Compacted Base File

由于該測試用例的前三步操作與第3節(第1個測試用例)完全一緻,是以不再贅述,我們會從第4步操作(Notebook的3.8節)開始解讀。

4.3. 異步排期

在完成了和第3節完全一樣的前三批操作後,時間線和檔案系統的情形如下:

探索Apache Hudi核心概念 (3) - Compaction

這和3.5節執行後的狀況非常不同,沒有發生Compaction,連排期也沒有看到,因為我們關閉了

hoodie.compact.inline

[24]。于是,在接下來的第4步操作中(Notebook的3.8節),我們通過

spark-submit

手動發起了一個排期作業(

--mode 'schedule'

):

sudo -u hadoop spark-submit \
 --jars '/usr/lib/hudi/hudi-spark-bundle.jar' \
 --class 'org.apache.hudi.utilities.HoodieCompactor' \
 /usr/lib/hudi/hudi-utilities-bundle.jar \
 --spark-memory '4g' \
 --mode 'schedule' \
 --base-path "s3://${S3_BUCKET}/${TABLE_NAME}" \
 --table-name "$TABLE_NAME" \
 --hoodie-conf "hoodie.compact.inline.max.delta.commits=3"           

執行後,檔案布局沒有變化,但是在時間線中出現了一個狀态為

REQUESTED

compaction

探索Apache Hudi核心概念 (3) - Compaction

4.4. 異步執行

第5步操作(Notebook的3.9節)通過

spark-submit

手動發起了一個執行作業(

--mode 'execute'

):

sudo -u hadoop spark-submit \
 --jars '/usr/lib/hudi/hudi-spark-bundle.jar' \
 --class "org.apache.hudi.utilities.HoodieCompactor" \
 /usr/lib/hudi/hudi-utilities-bundle.jar \
 --spark-memory '4g' \
 --mode 'execute' \
 --base-path "s3://${S3_BUCKET}/${TABLE_NAME}" \
 --table-name "$TABLE_NAME"           

執行後,原

compaction

狀态由

REQUESTED

變為

COMPLETED

,原Base File和兩個Log File被合并打包成一個新的Base File檔案,大小96MB:

探索Apache Hudi核心概念 (3) - Compaction

4.5. 異步排期 + 異步執行

異步的排期和執行可以通過一個指令一步完成,《Apache Hudi Core Conceptions (4) - MOR: Compaction》[25]的第3個測試用例示範了這一操作。它的前三步操作與第2個測試用例一樣,在第四步時,使用了“Schedule + Execute”一起執行的方式(

--mode 'scheduleAndExecute'

)一步完成了Compaction操作,指令如下:

sudo -u hadoop spark-submit \
 --jars '/usr/lib/hudi/hudi-spark-bundle.jar' \
 --class 'org.apache.hudi.utilities.HoodieCompactor' \
 /usr/lib/hudi/hudi-utilities-bundle.jar \
 --spark-memory '4g' \
 --mode 'scheduleAndExecute' \
 --base-path "s3://${S3_BUCKET}/${TABLE_NAME}" \
 --table-name "$TABLE_NAME" \
 --hoodie-conf "hoodie.compact.inline.max.delta.commits=3"           

5. 半異步Compaction

5.1. 關鍵配置

《Apache Hudi Core Conceptions (4) - MOR: Compaction》[26]的第4個測試用例示範了半異步Compaction的運作機制。測試用的資料表有如下幾項關鍵配置:

配置項 預設值 設定值
hoodie.compact.inline[27] false false
hoodie.compact.schedule.inline[28] false true
hoodie.compact.inline.max.delta.commits[29] 5 3
hoodie.copyonwrite.record.size.estimate[30] 1024 175

這些配置項在介紹概念時都已提及,通過這個測試用例将會看到它們組合起來的整體效果。

5.2. 測試計劃

該測試用例會先後插入或更新三批資料,然後進行異步的Compaction Execute,過程中将重點觀察時間線和檔案布局的變化,整體測試計劃如下表所示:

步驟 操作 資料量(單分區) 檔案系統
1 Insert 96MB +1 Base File
2 Update 788KB +1 Log File
3 Update 1.2MB +1 Log File
4 Offline Execute 96.15MB +1 Compacted Base File

由于該測試用例的前三步操作與第3節(第1個測試用例)完全一緻,是以不再贅述,我們會從第3步操作(Notebook的5.7節)開始解讀。

5.3. 同步排期

在完成了和第3節完全一樣的前三批操作後,時間線和檔案系統的情形如下:

探索Apache Hudi核心概念 (3) - Compaction

在該模式下,第3次送出自動觸發了Compaction排期,狀态為

REQUESTED

5.4. 異步執行

在接下來的第4步操作中,通過

spark-submit

手動發起了一個執行作業,排期計劃被consume,原

REQUESTED

狀态的Compaction變成了

COMPLETED

探索Apache Hudi核心概念 (3) - Compaction

關于作者:耿立超,架構師,著有 《大資料平台架構與原型實作:資料中台建設實戰》[31]一書,多年IT系統開發和架構經驗,對大資料、企業級應用架構、SaaS、分布式存儲和領域驅動設計有豐富的實踐經驗,個人技術部落格:https://laurence.blog.csdn.net

引用連結

[1]

《Apache Hudi Core Conceptions (4) - MOR: Compaction》: https://github.com/bluishglc/apache-hudi-core-conceptions/blob/master/4-mor-compaction.ipynb

[2]

《Apache Hudi Core Conceptions (1) - Data Preparation》: https://github.com/bluishglc/apache-hudi-core-conceptions/blob/master/1-data-preparation.ipynb

[3]

hoodie.compact.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactinline

[4]

hoodie.compact.schedule.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactscheduleinline

[5]

hoodie.compact.inline.max.delta.commits: https://hudi.apache.org/docs/configurations/#hoodiecompactinlinemaxdeltacommits

[6]

hoodie.compact.inline.max.delta.commits

: https://hudi.apache.org/docs/configurations/#hoodiecompactinlinemaxdeltacommits

[7]

hoodie.compact.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactinline

[8]

hoodie.compact.schedule.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactscheduleinline

[9]

hoodie.compact.inline.max.delta.commits

: https://hudi.apache.org/docs/configurations/#hoodiecompactinlinemaxdeltacommits

[10]

hoodie.compact.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactinline

[11]

hoodie.compact.schedule.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactscheduleinline

[12]

hoodie.compact.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactinline

[13]

hoodie.compact.schedule.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactscheduleinline

[14]

《Apache Hudi Core Conceptions (4) - MOR: Compaction》: https://github.com/bluishglc/apache-hudi-core-conceptions/blob/master/4-mor-compaction.ipynb

[15]

hoodie.compact.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactinline

[16]

hoodie.compact.schedule.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactscheduleinline

[17]

hoodie.compact.inline.max.delta.commits: https://hudi.apache.org/docs/configurations/#hoodiecompactinlinemaxdeltacommits

[18]

hoodie.copyonwrite.record.size.estimate: https://hudi.apache.org/docs/configurations/#hoodiecopyonwriterecordsizeestimate

[19]

《Apache Hudi Core Conceptions (4) - MOR: Compaction》: https://github.com/bluishglc/apache-hudi-core-conceptions/blob/master/4-mor-compaction.ipynb

[20]

hoodie.compact.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactinline

[21]

hoodie.compact.schedule.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactscheduleinline

[22]

hoodie.compact.inline.max.delta.commits: https://hudi.apache.org/docs/configurations/#hoodiecompactinlinemaxdeltacommits

[23]

hoodie.copyonwrite.record.size.estimate: https://hudi.apache.org/docs/configurations/#hoodiecopyonwriterecordsizeestimate

[24]

hoodie.compact.inline

: https://hudi.apache.org/docs/configurations/#hoodiecompactinline

[25]

《Apache Hudi Core Conceptions (4) - MOR: Compaction》: https://github.com/bluishglc/apache-hudi-core-conceptions/blob/master/4-mor-compaction.ipynb

[26]

《Apache Hudi Core Conceptions (4) - MOR: Compaction》: https://github.com/bluishglc/apache-hudi-core-conceptions/blob/master/4-mor-compaction.ipynb

[27]

hoodie.compact.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactinline

[28]

hoodie.compact.schedule.inline: https://hudi.apache.org/docs/configurations/#hoodiecompactscheduleinline

[29]

hoodie.compact.inline.max.delta.commits: https://hudi.apache.org/docs/configurations/#hoodiecompactinlinemaxdeltacommits

[30]

hoodie.copyonwrite.record.size.estimate: https://hudi.apache.org/docs/configurations/#hoodiecopyonwriterecordsizeestimate

[31]

《大資料平台架構與原型實作:資料中台建設實戰》: https://item.jd.com/12677623.html