天天看點

順豐科技 Hudi on Flink 實時數倉實踐

本文作者為劉傑,介紹了順豐科技數倉的架構,趟過的一些問題、使用 Hudi 來優化整個 job 狀态的實踐細節,以及未來的一些規劃。主要内容為:
  1. 數倉架構
  2. Hudi 代碼躺過的坑
  3. 狀态優化
  4. 未來規劃

GitHub 位址

https://github.com/apache/flink

歡迎大家給 Flink 點贊送 star~

順豐科技早在 2019 年引入 Hudi ,當時是基于 Spark 批處理,2020 年對資料的實時性要求更高公司對架構進行了更新,在社群 Hudi on Flink 的半成品上持續優化實作 Binlog 資料 CDC 入湖。在 Hudi 社群飛速發展的同時公司今年對數倉也提出了新的要求,最終采用 Flink + Hudi 的方式來寬表的實時化。過程中遇到了很多問題主要有兩點:

  1. Hudi Master 代碼當時存在一些漏洞;
  2. 寬表涉及到多個 Join,Top One 等操作使得狀态很大。

慶幸的是社群的修複速度很給力加上 Hudi 強大 upsert 能力使這兩個問題得到以有效的解決。

一、數倉架構

感興趣的同學可以參考之前順豐分享的

Hudi on Flink 在順豐的實踐應用

二、Hudi 代碼趟過的坑

在去年我們是基于 Hudi 0.6 左右進行的 Hudi on Flink 的實踐,代碼較老。為了擁抱社群我們使用最新 master 代碼進行實踐,在大資料量寫入場景中,發現了一個比較隐秘的丢數問題,這個問題花了将近兩周的時間才定位到。

1. Hudi StreamWriteFunction 算子核心流程梳理

順豐科技 Hudi on Flink 實時數倉實踐

StreamWriteFunction

算子收資料的時候會先把資料按照 fileld 分組緩存好,資料的持續流會使得緩存資料越來越大,當達到一定門檻值時便會執行 flush。門檻值由 2 個核心參數控制:

write.batch.size 預設 64M

write.task.max.size 預設 1G

。當單個分組資料達到 64M 或者總緩存資料達到 800M ~ 1G 就會觸發 flush 。

flush 會調用 client 的 api 去建立一個 WriteHandle,然後把 WriteHandle 放入 Map 進行緩存,一個 handle 可以了解為對應一個檔案的 cow。

如果一個 fileld 在同一 checkpoint 期間被多次寫入,則後一次是基于前一次的 cow, 它的 handle 是一個

FlinkMergeAndReplaceHandle

,判斷一個 fileld 是否之前被寫入過就是根據上面 Map 緩存得來的。

StreamWriteFunction

執行 snapshotState 時會把記憶體的所有分組資料一次進行 flush, 之後對 client 的 handle 進行清空。

2. 場景還原

Hudi 本身是具備 upsert 能力的,是以我們開始認為 Hudi Sink 在 At Least Once 模式下是沒問題的,并且 At Least Once 模式下 Flink 算子不需要等待 Barrier 對齊,能夠處理先到的資料使得處理速度更快,于是我們在 Copy On Write 場景中對 Flink CheckpointingMode 設定了 AT_LEAST_ONCE。

writeFunction 的上遊是檔案

BucketAssignFunction

fileld 配置設定算子,假如有一批 insert 資料 A、B、C、D 屬于同一個分區并且配置設定到同一個

BucketAssignFunction

的 subtask ,但是 A、B 和 C、D 是相鄰兩個不同的 checkpoint。

當 A 進入

BucketAssignFunction

時如果發現沒有新的小檔案可以使用,就會建立一個新的 fileld f0,當 B 流入時也會給他配置設定到 f0 上。同時因為是 AT_LEAST_ONCE 模式,C、D 資料都有可能被處理到也被配置設定到了 f0 上。也就是說 在 AT_LEAST_ONCE 模式下由于 C、D 資料被提前處理,導緻 A、B、C、D 4 條屬于兩個 checkpoint 的 insert 資料被配置設定到了同一個 fileld。

writeFunction 有可能當接收到 A、B、C 後這個算子的 barrier 就對齊了,會把 A、B、C 進行 flush,而 D 将被遺留到下一個 checkpoint 才處理。A、B、C 是 insert 資料是以就會直接建立一個檔案寫入,D 屬于下一個 checkpoint ,A、B、C 寫入時建立的 handle 已被清理了,等到下一個 checkpoint 執行 flush。因為 D 也是 insert 資料是以也會直接建立一個檔案寫資料,但是 A、B、C、D 的 fileld 是一樣的,導緻最終 D 建立的檔案覆寫了 A、B、C 寫入的檔案最終導緻 A、B、C 資料丢失。

順豐科技 Hudi on Flink 實時數倉實踐

3. 問題定位

這個問題之是以難定位是因為具有一定随機性,每次丢失的資料都不太一樣,而且小資料量不易出現。最終通過開啟 Flink 的 Queryable State 進行查詢, 查找丢失資料的定位到 fileld, 發現 ABCD state 的 instant 都是 I,然後解析對應 fileld 的所有版本進行跟蹤還原。

三、狀态優化

我們對線上最大的離線寬邊進行了實時化的,寬表字段較多,涉及到多個表對主表的 left join 還包括一些 Top One 的計算,這些算子都會占用 state. 而我們的資料周期較長需要儲存 180 天資料。估算下來狀态大小将會達到上百 T,這無疑會對狀态的持久化帶來很大的壓力。但是這些操作放入 Hudi 來做就顯得輕而易舉。

1. Top One 下沉 Hudi

在 Hudi 中有一個

write.precombine.field

配置項用來指定使用某個字段對 flush 的資料去重,當出現多條資料需要去重時就會按照整個字段進行比較,保留最大的那條記錄,這其實和 Top One 很像。

我們在 SQL 上将 Top One 的排序邏輯組合成了一個字段設定為 Hudi 的

write.precombine.field

,同時把這個字段寫入 state,同一 key 的資料多次進來時都會和 state 的

write.precombine.field

進行比較更新。

Flink Top One 的 state 預設是儲存整記錄的所有字段,但是我們隻儲存了一個字段,大大節省了 state 的大小。

2. 多表 Left Join 下沉 Hudi

2.1 Flink SQL join

我們把這個場景簡化成如下一個案例,假如有寬表 t_p 由三張表組成

insert into t_p 
select 
    t0.id,t0.name,
    t1.age,           
    t2.sex 
from t0 
    left join t1 on t0.id = t1.id 
    left join t2 on t0.id = t2.id           

在 Flink SQL join 算子内部會維護一個左表和右表的 state,這都是每個 table 的全字段,且多一次 join 就會多出一個 state. 最終導緻 state 大小膨脹,如果 join 算子上遊是一個 append 流,state 大小膨脹的效果更明顯。

2.2 把 Join 改寫成 Union All

對于上面案例每次 left join 隻是補充了幾個字段,我們想到用 union all 的方式進行 SQL 改寫,union all 需要補齊所有字段,缺的字段用 null 補。我們認為 null 補充的字段不是有效字段。 改成從 union all 之後要求 Hudi 具備局部更新的能力才能達到 join 的效果。

  • 當收到的資料是來自 t0 的時候就隻更新 id 和 name 字段;
  • 同理 ,資料是來自 t1 的時候就隻更新 age 字段;
  • t2 隻更新 sex 字段。

不幸的是 Hudi 的預設實作是全字段覆寫,也就是說當收到 t0 的資料時會把 age sex 覆寫成 null, 收到 t1 資料時會把 name sex 覆寫成 null。這顯然是不可接受的。這就要求我們對 Hudi sink 進行改造。

2.3 Hudi Union All 實作

Hudi 在 cow 模式每條記錄的更新寫入都是對舊資料進行 copy 覆寫寫入,似乎隻要知道這條記錄來自哪個表,哪幾個字段是有效的字段就選擇性的對 copy 出來的字段進行覆寫即可。但是在分區變更的場景中就不是那麼好使了。在分區變更的場景中,資料從一個分區變到另一個分區的邏輯是把舊分區資料删掉,往新分區新增資料。這可能會把一些之前局部更新的字段資訊丢失掉。細聊下來 Hudi on Flink 涉及到由幾個核心算子組成 pipeline。

順豐科技 Hudi on Flink 實時數倉實踐
  • RowDataToHoodieFunction:這是對收入的資料進行轉化成一個 HudiRecord,收到資料是包含全字段的,我們在轉化 HudiRecord 的時候隻選擇了有效字段進行轉化。
  • BoostrapFunction:在任務恢複的時候會讀取檔案加載索引資料,當任務恢複後次算子不做資料轉化處理。
  • BucketAssignFunction:這個算子用來對記錄配置設定 location,loaction 包含兩部分資訊。一是分區目錄,另一個是 fileld。fileld 用來辨別記錄将寫入哪個檔案,一旦記錄被确定寫入哪個檔案,就會發記錄按照 fileld 分組發送到 StreamWriteFunction,StreamWriteFunction 再按檔案進行批量寫入。

原生的 BucketAssignFunction 的算子邏輯如下圖,當收到一條記錄時會先從 state 裡面進行查找是否之前有寫過這條記錄,如果有就會找對應的 location。如果分區沒有發生變更,就把目前這條記錄也配置設定給這個location,如果在 state 中沒有找到 location 就會新建立一個 location,把這個新的location 配置設定給目前記錄,并更新到 state。

總之這個 state 存儲的 location 就是告訴目前記錄應該從哪個檔案進行更新或者寫入。遇到分區變更的場景會複雜一點。假如一條記錄從 2020 分區變更成了 2021,就會建立一條删除的記錄,它的 loaction 是 state 中的 location。這條記錄讓下遊進行實際的删除操作,然後再建立一個新的 location (分區是 2021) 發送到下遊進行 insert。

順豐科技 Hudi on Flink 實時數倉實踐

為了在 Hudi 中實作 top one,我們對 state 資訊進行了擴充,用來做 Top One 時間字段。

對于 StreamWriteFunction 在 Insert 場景中,假如收到了如下 3 條資料 {id:1,name:zs},{id:1,age:20},{id:1,sex:man},在執行 flush 時會建立一個全字段的空記錄 {id:null,name:null,age:null,sex:null},然後依次和 3 條記錄進行合并。注意,這個合并過程隻會選擇有效字段的合并。如下圖:

順豐科技 Hudi on Flink 實時數倉實踐

在 Update 場景中的更新邏輯類似 insert 場景,假如老資料是 {id:1,name:zs,age:20,sex:man} ,新收到了{id:1,name:ls},{id:1,age:30} 這 2 條資料,就會先從檔案中把老的資料讀出來,然後依次和新收到的資料進行合并,合并步驟同 insert。如下圖:

順豐科技 Hudi on Flink 實時數倉實踐

這樣通過 union all 的方式達到了 left join 的效果,大大節省了 state 的大小。

四、未來規劃

parquet 中繼資料資訊收集,parquet 檔案可以從 footer 裡面得到每個行列的最大最小等資訊,我們計劃在寫入檔案的後把這些資訊收集起來,并且基于上一次的 commit 的中繼資料資訊進行合并,生成一個包含所有檔案的中繼資料檔案,這樣可以在讀取資料時進行謂詞下推進行檔案的過濾。

公司緻力于打造基于 Hudi 作為底層存儲,Flink 作為流批一體化的 SQL 計算引擎,Flink 的批處理 Hudi 這塊還涉足不深,未來可能會計劃用 Flink 對 Hudi 實作 clustering 等功能,在 Flink 引擎上完善 Hudi 的批處理功能。

熱點推薦

Flink Forward Asia 2021 正式啟動!議題火熱征集中! Flink 1.14 新特性預覽 Apache Flink 在汽車之家的應用與實踐 37 手遊基于 Flink CDC + Hudi 湖倉一體方案實踐

更多 Flink 相關技術問題,可掃碼加入社群釘釘交流群

第一時間擷取最新技術文章和社群動态,請關注公衆号~

順豐科技 Hudi on Flink 實時數倉實踐

活動推薦

阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:

99 元試用

實時計算Flink版

(包年包月、10CU)即有機會獲得 Flink 獨家定制T恤;另包 3 個月及以上還有 85 折優惠!

了解活動詳情:

https://www.aliyun.com/product/bigdata/sc
順豐科技 Hudi on Flink 實時數倉實踐

繼續閱讀