天天看點

Hudi on Flink 快速上手指南

一、背景

Apache Hudi 是目前最流行的資料湖解決方案之一,Data Lake Analytics[1] 內建了 Hudi 服務高效的資料 MERGE(UPDATE/DELETE)場景;AWS 在 EMR 服務中 預安裝[2] 了 Apache Hudi,為使用者提供高效的 record-level updates/deletes 和高效的資料查詢管理;Uber [3]已經穩定運作 Apache Hudi 服務 4 年多,提供了低延遲的資料庫同步和高效率的查詢[4]。自 2016 年 8 月上線以來,資料湖存儲規模已經超過 100PB[5]。

Apache Flink 作為目前最流行的流計算架構,在流式計算場景有天然的優勢,目前,Flink 社群也在積極擁抱 Hudi 社群,發揮自身 streaming 寫/讀的優勢,同時也對 batch 的讀寫做了支援。

Hudi 和 Fink 在 0.8.0 版本做了大量的內建工作[6]。核心的功能包括:

實作了新的 Flink streaming writer

支援 batch 和 streaming 模式 reader

支援 Flink SQL API

Flink streaming writer 通過 state 實作了高效的 index 方案,同時 Hudi 在 UPDATE/DELETE 上的優秀設計使得 Flink Hudi 成為目前最有潛力的 CDC 資料入湖方案,因為篇幅關系,将在後續的文章中介紹。

本文用 Flink SQL Client 來簡單的示範通過 Flink SQL API 的方式實作 Hudi 表的操作,包括 batch 模式的讀寫和 streaming 模式的讀。

二、環境準備

本文使用 Flink Sql Client[7] 作為示範工具,SQL CLI 可以比較友善地執行 SQL 的互動操作。

第一步:下載下傳 Flink jar

Hudi 內建了 Flink 的 1.11 版本。您可以參考這裡[8]來設定 Flink 環境。hudi-flink-bundle jar 是一個內建了 Flink 相關的 jar 的 uber jar, 目前推薦使用 scala 2.11 來編譯。

第二步:設定 Flink 叢集

啟動一個 standalone 的 Flink 叢集。啟動之前,建議将 Flink 的叢集配置設定如下:

在 $FLINK_HOME/conf/flink-conf.yaml 中添加配置項 taskmanager.numberOfTaskSlots: 4

在 $FLINK_HOME/conf/workers 中将條目 localhost 設定成 4 行,這裡的行數代表了本地啟動的 worker 數

啟動叢集:

第三步:啟動 Flink SQL Client

Hudi 的 bundle jar 應該在 Sql Client 啟動的時候加載到 CLASSPATH 中。您可以在路徑 hudi-source-dir/packaging/hudi-flink-bundle 下手動編譯 jar 包或者從 Apache Official Repository [9]下載下傳。

啟動 SQL CLI:

備注:

推薦使用 hadoop 2.9.x+ 版本,因為一些對象存儲(aliyun-oss)從這個版本開始支援

flink-parquet 和 flink-avro 已經被打進 hudi-flink-bundle jar

您也可以直接将 hudi-flink-bundle jar 拷貝到 $FLINK_HOME/lib 目錄下

本文的存儲選取了對象存儲 aliyun-oss,為了友善,您也可以使用本地路徑

示範的工作目錄結構如下:

三、Batch 模式的讀寫

插入資料

使用如下 DDL 語句建立 Hudi 表:

DDL 裡申明了表的 path,record key 為預設值 uuid,pre-combine key 為預設值 ts 。

然後通過 VALUES 語句往表中插入資料:

這裡看到 Flink 的作業已經成功送出到叢集,可以本地打開 web UI 觀察作業的執行情況:

Hudi on Flink 快速上手指南

查詢資料

作業執行完成後,通過 SELECT 語句查詢表結果:

這裡執行語句 set execution.result-mode=tableau; 可以讓查詢結果直接輸出到終端。

通過在 WHERE 子句中添加 partition 路徑來裁剪 partition:

更新資料

相同的 record key 的資料會自動覆寫,通過 INSERT 相同 key 的資料可以實作資料更新:

可以看到 uuid 為 id1 和 id2 的資料 age 字段值發生了更新。

再次 insert 新資料觀察結果:

四、Streaming 讀

通過如下語句建立一張新的表并注入資料:

這裡将 table option read.streaming.enabled 設定為 true,表明通過 streaming 的方式讀取表資料;opiton read.streaming.check-interval 指定了 source 監控新的 commits 的間隔為 4s;option table.type 設定表類型為 MERGE_ON_READ,目前隻有 MERGE_ON_READ 表支援 streaming 讀。

以上操作發生在一個 terminal 中,我們稱之為 terminal_1。

從新的 terminal(我們稱之為 terminal_2)再次啟動 Sql Client,重新建立 t1 表并查詢:

回到 terminal_1,繼續執行 batch mode 的 INSERT 操作:

幾秒之後,觀察 terminal_2 的輸出多了一行:

再次在 terminal_1 中執行 INSERT 操作:

觀察 terminal_2 的輸出變化:

五、總結

通過一些簡單的示範,我們發現 HUDI Flink 的內建已經相對完善,讀寫路徑均已覆寫,關于詳細的配置,可以參考 Flink SQL Config Options[10]。

Hudi 社群正在積極的推動和 Flink 的深度內建,包括但不限于:

Flink streaming reader 支援 watermark,實作資料湖/倉的中間計算層 pipeline

Flink 基于 Hudi 的物化視圖,實作分鐘級的增量視圖,服務于線上的近實時查詢