天天看點

FlinkSQL的字段血緣解決方案

作者:DataFunTalk

導讀 随着大資料的進一步發展,對資料血緣解析有着很大需求,資料血緣(data lineage)是資料治理(data governance)的重要組成部分,也是中繼資料管理、資料品質管理的有力工具。通俗地講,資料血緣就是資料在産生、加工、流轉到最終消費過程中形成的有層次的、可溯源的聯系。成熟的資料血緣系統可以幫助開發者快速定位問題,以及追蹤資料的更改,确定上下遊的影響等等。

是以,資料血緣是組織内使資料發揮價值的重要基礎能力,也是資料資産的重要組成部分。目前随着以 Flink 為代表的實時數倉的興起,迫切需要一種解決 FlinkSQL 字段級别血緣的方法。本文就來簡要介紹一種在實時數倉中基于 Apache Calcite 解析 FlinkSQL 字段級血緣的方法。

文章将圍繞下面七個部分展開:

1. Apache Calcite 簡介

2. FlinkSQL 執行流程

3. FlinkSQL 字段血緣解析思路

4. 核心源碼闡述

5. Insert、Join 解析案例

6. 擴充 Calcite 支援 Lookup Join、UDTF 解析案例

7. 解析血緣字段的轉換關系

分享嘉賓|白松(筆名:HamaWhite) 數瀾科技 研發中心副總經理,資料中台産品研發負責人

編輯整理|曹文武 中科雲谷

出品社群|DataFun

01

Apache Calcite 簡介

首先來簡單介紹下 Apache Calcite。

1. Apache Calcite 簡介

Apache Calcite 是一款開源的動态資料管理架構,它提供了标準的 SQL 語言、多種查詢優化和連接配接各種資料源的能力,但不包括資料存儲、處理資料的算法和存儲中繼資料的存儲庫。

Calcite 采用的是業界大資料查詢架構的一種通用思路,它的目标是“one size fits all”,希望能為不同計算平台和資料源提供統一的查詢引擎。Calcite 作為一個強大的 SQL 計算引擎,Flink 内部的 SQL 引擎子產品也是基于 Calcite。

Calcite 工作流程如下圖所示,一般分為 Parser、Validator、Converter 和 Optimizer 四個階段。

FlinkSQL的字段血緣解決方案

第一階段是 Parser,Parser 之後會生成 SqlNode,經過 Validator 和 Converter 生成 RelNode,再經過優化器生成計劃。

2. Calcite RelNode 介紹

FlinkSQL的字段血緣解決方案

在 CalciteSQL 解析中,Parser 解析後生成的 SqlNode 文法樹,經過 Validator 校驗後在 Converter 階段會把 SqlNode 抽象成文法樹(AST)轉為關系運算符樹(RelNode Tree)。

上圖示例的 SQL 來自官網,兩張表進行 join 會生成 SqlNode 文法樹,經過Converter把 SqlNode 轉為關系運算符樹。本文的血緣解析會基于 RelNode 來進行。

3. 元件版本

FlinkSQL的字段血緣解決方案

--

02

FlinkSQL 執行流程

根據源碼整理出 FlinkSQL 的執行流程主要分為五個階段:

1. Parse 階段(文法分析),使用 JavaCC 把 SQL 轉換成抽象文法樹(AST),在 Calcite 中用 SqlNode 來表示。

2. Validate 階段(文法校驗),根據中繼資料資訊進行文法驗證,例如查詢的表、字段、函數是否存在,會分别對 from、where、group by、having、select、order by 等子句進行 validate,驗證後還是SqlNode構成的文法樹 AST。

3. Convert 階段(語義分析),根據 SqlNode 和中繼資料資訊構成關系表達式RelNode樹,也就是最初版本的邏輯計劃。

4. Optimize 階段(邏輯計劃優化),優化器會基于規則進行等價變換,例如謂詞下推、列裁剪等,最終得到最優的查詢計劃。

5. Execute 階段,把邏輯查詢計劃翻譯成實體執行計劃,依次生成 StreamGraph、JobGraph,最終送出運作。

FlinkSQL的字段血緣解決方案

上圖簡述了從 FlinkSQL 到 Flink Job 是如何執行的。比如輸入一條 Flink SQL,先由 CalciteParser 生成 SqlNode,這就是前文中提到的文法樹。後面通過 validator 校驗完成之後還是 SqlNode 類型,在這一步校驗的時會去連 CatalogManager 做一些校驗。本次實驗用的是 Hive 的 MetaStore,當然也可以用 Memory 或者 JDBC 類型的 Catalog。生成 SqlNode 之後,會經過 Convert 階段,這一部分就是生成RelNode 關系型表達式。

優化是在 FlinkChainedProgram 裡面進行,包括 11 個步驟,比如子查詢的重寫,謂詞下推或者重寫邏輯計劃等。這一步生成的是 Optimized Physical RelNode,即作優化後的實體計劃。實體計劃經過 Planner 的 translateToExecNodeGraph 翻譯成 ExecNodeGraph,再經過後面 Transformation 等步驟,依次生成 StreamGraph 、JobGraph。最終通過 submit job,送出到 Flink 單機運作,或者是送出到叢集去運作。

以上就是 FlinkSQL 的一個完整的執行流程。

--

03

FlinkSQL 字段血緣解析思路

FlinkSQL的字段血緣解決方案

FlinkSQL 字段血緣解析分為三個階段:

1. 對輸入 SQL 進行 Parse、 Validate、Convert 生成關系表達式 RelNode 樹。

2. 在優化階段,隻生成到 Optimized Logical Plan,而非原本的 Optimized Physical Plan ,要修正 FlinkSQL 的執行流程。

3. 針對上步驟優化生成的邏輯 RelNode,調用 RelMetaDataQuery 的 getColumnOrigins(RelNode rel,int column)查詢原始字段資訊。最後構造血緣關系,并傳回結果。

FlinkSQL的字段血緣解決方案

這裡再說明下第二階段,前面 3 步跟 FlinkSQL 的執行流程是一樣的,最後生成原始的 RelNode,本文在第四步進行裁剪,隻要解析血緣到邏輯計劃就停止,而沒必要去生成實體計劃,是以這裡把後面三個步驟裁剪掉(上圖中紅色字型标注的)。這就是本文的核心設計思路。

FlinkSQL的字段血緣解決方案

上圖中對比原始的邏輯計劃和優化後的邏輯計劃。例如有 ods_mysql_users 這和維表 dim_mysql_company 進行關聯,原始的邏輯計劃是先進行全表掃描,然後 join 之後再進行投影。經過優化之後,會把把查詢下推,在 select 的階段隻要 查詢相關的字段就好。

--

04

核心源碼闡述

FlinkSQL的字段血緣解決方案

核心源碼是根據前文講的三個步驟,此處定義一個 parseFieldLineage 方法,輸入是一段 sql,會輸出字段血緣的解析結果。如果要把血緣內建到其他系統裡,隻要去調用這個方法就可以擷取到字段的血緣關系。此方法主要包含三個階段:

1.生成原始的 RelNode Tree,調用 parsestatement sql。

2.對原始的 RelNode 去生成優化後的邏輯計劃。

3.調用 Calcite 提供的能力,擷取它的血緣關系。

下面詳細介紹這三個步驟。

(1)根據 SQL 生成 RelNode 樹

FlinkSQL的字段血緣解決方案

第一步是 Parser 階段,調用 tableEnv.getParser().parse(sql) 方法生成 operations,這裡能擷取到 RelNode。後面代碼限制隻能支援 insert 的血緣關系,後續會支援 CTAS 等文法。

(2)生成 Optimized Logical Plan

FlinkSQL的字段血緣解決方案

第二步是生成優化後的邏輯計劃,根據Flink的源碼可知共有 12 個階段。根據上面講的,不要最後三個步驟,隻要優化後生成邏輯計劃就好。

(3)查詢原始字段并構造血緣

FlinkSQL的字段血緣解決方案

第三步,查詢目的表(arget table)的所有字段的資訊,對目的表的每個字段調用RelMetadataQuery.getColumnOrigins()方法,把優化後的 optRelNode 傳給它,再把目的表目前字段的索引傳給它,這樣 Calcite 就能傳回目的表這個字段對應的原始表字段資訊。

通過上述介紹,大家會發現這種方法擷取血緣關系是非常簡單的,我們不用去解析使用者輸入的各種 SQL,核心的血緣關系會由 Calcite 自動擷取。

--

05

Insert、Join解析案例

1. 建立測試表

FlinkSQL的字段血緣解決方案

這裡建立了三張測試表,一個是 ods_mysql_users 這張表,connector 是映射為 mysql-cdc,包含 id、name、birthday、ts 和 proc_time 共五個字段;維表dim_mysql_company,包含user_id 和company_name兩個字段;目的表dwd_hudi_users,是插入到 Hudi 裡面,包含 id、name、company_name、 birthday、ts 和 partition 共六個字段。

2. 測試 Insert-Select

FlinkSQL的字段血緣解決方案

把 MySQL 的這張表直接插入到 Hudi 這張表,生成的血緣關系如上圖右表所示。

3. 測試 Insert-Join

FlinkSQL的字段血緣解決方案

MySQL 的這張表和 MySQL 維表去做一個 join,分别插入 Hudi 表。可以看到source 表有兩個,一個是 users 表,一個是 company 表。比如concat,是把 a 表的 name 字段和 b 表的 company_name 字段插入到目的表 name 字段。在血緣關系中可以看到準确的字段對應關系。

4.進一步改造

FlinkSQL的字段血緣解決方案

近期,github 項目上多位使用者回報,上述操作完成後不支援 lookup join、watermark、UDTF,以及 CEP 的血緣關系。這裡舉一個例子,如上圖所示把 join 換成 lookup join, 即 join 的時候換成了 FOR SYSTEM_TIME AS OF a.proc_time AS b。會發現生成的血緣關系中沒有維表 dim_mysql_company 的字段血緣關。

是以需要進一步改造。這裡的改造不是對上述整個思路或者思想方法去做改造,而隻是在上述的思路或者方法基礎上,做一些補充,就能擷取到這些血緣資訊。

--

06

擴充 Calcite 支援 Lookup Join、UDTF 解析案例

1. Lookup join

先來看一下 lookup join。

FlinkSQL的字段血緣解決方案

針對于 lookup join,Parser 會把 SQL 語句’FOR SYSTEM_TIME AS OF’,解析成SqlSnapshot(SqlNode),在 validate 階段将其轉換為 LocalSnapshot。

從上圖中可以看到,先是對這張表做一個scan再轉換為snapshot。那為什麼沒有解析血緣關系呢?是因為 calcite-core中RelMdColumnOrigins 這個 Handler 類裡并沒有處理 snapshot 類型的 RelNode 導緻傳回空,繼而丢失 lookup join 字段的血緣關系。

我們要做的就是在 RelMdColumnOrigins 增加一個處理 snapshot 的 getColumnOrigins(Snapshot rel, RelMetadataQuery mq, int iOutputColumn) 方法。

FlinkSQL的字段血緣解決方案

這樣就可以擷取到 lookup join 的血緣關系。

2. UDTF

FlinkSQL的字段血緣解決方案

首先建立建 UDTF,繼承 Flink 的 TableFunction 類,複寫 eval 這個方法。我們這裡加了一個注解,輸出兩個字段: word 和 length。比如某個表,經過 UDTF 處理,會得到一個類似于臨時表,其中包含 word 和 length 兩個字段。然後建立 my_split_udtf 函數。

定義好之後,測試 UDTF SQL。

FlinkSQL的字段血緣解決方案

這裡用 UDTF 作用于 ods_mysql_users 表的 name 字段,生成臨時表,和 ods_mysql_users 表去做 join,注意 length 和 word 這兩個字段就來源于 UDTF 對應的臨時表。

按照前面講的血緣分析方法,拿到Relnode後先去擷取它的血緣,與 lookup join 類似的思路,我們把優化後的邏輯細化列印出來,發現在 org.apache.calcite.rel.metadata.RelMdColumnOrigin 的 getColumnOrigins() 方法中,沒有 Correlate 作為參數的方法,是以無法解析出 UDTF 字段的血緣關系。

FlinkSQL的字段血緣解決方案

由于比較複雜,這裡就不列出具體代碼。其核心思路是因為 LATERAL TABLE(my_split_udtf(name)) 生成的兩個臨時表的兩個字段 word 和 length,本質上是來自于 ods_mysql_users 這張表的 name 字段,是以針對右邊的 LATERAL TABLE 擷取 UDTF 中的字段,再根據字段名擷取左表資訊和索引,最終擷取的是左表的字段血緣關系。

3. 擴充支援其它文法

FlinkSQL的字段血緣解決方案

後續如果 Flink 新版本增加新的文法,隻要通過自定義 RelMdColumnOrigins 類中的方法便能準确解析出字段血緣關系。

上圖的表格中列出了一些 FlinkSQL,經過分析其優化後的邏輯計劃,可以找到其 RelNode 子類型,隻要針對這一子類型定義一個方法,傳回此類型對應的源表和源字段即可。

--

07

解析血緣字段的轉換關系

前文中已經解析出字段血緣關系,但隻是生成了字段間的映射關系,但是不知道字段間的轉換關系,比如兩個字段經過怎樣的加工生成了目标表的字段。解決這一問題的思路也是修改 Calcite 的源代碼,繼續修改 RelMdColumnOrigins 在擷取血緣關系的時候,記錄字段間的轉換關系。下面通過三個例子來展示。

FlinkSQL的字段血緣解決方案

從上圖的例子中可以看到,birthday 字段經過了 date_format 函數的處理,在血緣關系裡面可以看到,在源表、字段、目的表、字段後面多了一個轉換關系。

FlinkSQL的字段血緣解決方案

再展示一個例子,是自定義的一個函數 my_suffix_udf,可以看到血緣關系中就包括這一加工映射。

FlinkSQL的字段血緣解決方案

最後來看一個稍微複雜的例子。這裡涉及到子查詢,字段經過了多個步驟的加工,比如 ods_mysql_users.id 字段,第一步在子查詢先經過 sum(id) 處理生成 sum_id,再經過 ABS(sum_id) 處理插入到 dwd_hudi_users.id 字段。此時會記錄血緣上整個完整的轉換關系 ABS(SUM(id))。

以上就是本次分享的内容,謝謝大家。

FlinkSQL的字段血緣解決方案

繼續閱讀