天天看点

FlinkSQL的字段血缘解决方案

作者:DataFunTalk

导读 随着大数据的进一步发展,对数据血缘解析有着很大需求,数据血缘(data lineage)是数据治理(data governance)的重要组成部分,也是元数据管理、数据质量管理的有力工具。通俗地讲,数据血缘就是数据在产生、加工、流转到最终消费过程中形成的有层次的、可溯源的联系。成熟的数据血缘系统可以帮助开发者快速定位问题,以及追踪数据的更改,确定上下游的影响等等。

因此,数据血缘是组织内使数据发挥价值的重要基础能力,也是数据资产的重要组成部分。当前随着以 Flink 为代表的实时数仓的兴起,迫切需要一种解决 FlinkSQL 字段级别血缘的方法。本文就来简要介绍一种在实时数仓中基于 Apache Calcite 解析 FlinkSQL 字段级血缘的方法。

文章将围绕下面七个部分展开:

1. Apache Calcite 简介

2. FlinkSQL 执行流程

3. FlinkSQL 字段血缘解析思路

4. 核心源码阐述

5. Insert、Join 解析案例

6. 扩展 Calcite 支持 Lookup Join、UDTF 解析案例

7. 解析血缘字段的转换关系

分享嘉宾|白松(笔名:HamaWhite) 数澜科技 研发中心副总经理,数据中台产品研发负责人

编辑整理|曹文武 中科云谷

出品社区|DataFun

01

Apache Calcite 简介

首先来简单介绍下 Apache Calcite。

1. Apache Calcite 简介

Apache Calcite 是一款开源的动态数据管理框架,它提供了标准的 SQL 语言、多种查询优化和连接各种数据源的能力,但不包括数据存储、处理数据的算法和存储元数据的存储库。

Calcite 采用的是业界大数据查询框架的一种通用思路,它的目标是“one size fits all”,希望能为不同计算平台和数据源提供统一的查询引擎。Calcite 作为一个强大的 SQL 计算引擎,Flink 内部的 SQL 引擎模块也是基于 Calcite。

Calcite 工作流程如下图所示,一般分为 Parser、Validator、Converter 和 Optimizer 四个阶段。

FlinkSQL的字段血缘解决方案

第一阶段是 Parser,Parser 之后会生成 SqlNode,经过 Validator 和 Converter 生成 RelNode,再经过优化器生成计划。

2. Calcite RelNode 介绍

FlinkSQL的字段血缘解决方案

在 CalciteSQL 解析中,Parser 解析后生成的 SqlNode 语法树,经过 Validator 校验后在 Converter 阶段会把 SqlNode 抽象成语法树(AST)转为关系运算符树(RelNode Tree)。

上图示例的 SQL 来自官网,两张表进行 join 会生成 SqlNode 语法树,经过Converter把 SqlNode 转为关系运算符树。本文的血缘解析会基于 RelNode 来进行。

3. 组件版本

FlinkSQL的字段血缘解决方案

--

02

FlinkSQL 执行流程

根据源码整理出 FlinkSQL 的执行流程主要分为五个阶段:

1. Parse 阶段(语法分析),使用 JavaCC 把 SQL 转换成抽象语法树(AST),在 Calcite 中用 SqlNode 来表示。

2. Validate 阶段(语法校验),根据元数据信息进行语法验证,例如查询的表、字段、函数是否存在,会分别对 from、where、group by、having、select、order by 等子句进行 validate,验证后还是SqlNode构成的语法树 AST。

3. Convert 阶段(语义分析),根据 SqlNode 和元数据信息构成关系表达式RelNode树,也就是最初版本的逻辑计划。

4. Optimize 阶段(逻辑计划优化),优化器会基于规则进行等价变换,例如谓词下推、列裁剪等,最终得到最优的查询计划。

5. Execute 阶段,把逻辑查询计划翻译成物理执行计划,依次生成 StreamGraph、JobGraph,最终提交运行。

FlinkSQL的字段血缘解决方案

上图简述了从 FlinkSQL 到 Flink Job 是如何执行的。比如输入一条 Flink SQL,先由 CalciteParser 生成 SqlNode,这就是前文中提到的语法树。后面通过 validator 校验完成之后还是 SqlNode 类型,在这一步校验的时会去连 CatalogManager 做一些校验。本次实验用的是 Hive 的 MetaStore,当然也可以用 Memory 或者 JDBC 类型的 Catalog。生成 SqlNode 之后,会经过 Convert 阶段,这一部分就是生成RelNode 关系型表达式。

优化是在 FlinkChainedProgram 里面进行,包括 11 个步骤,比如子查询的重写,谓词下推或者重写逻辑计划等。这一步生成的是 Optimized Physical RelNode,即作优化后的物理计划。物理计划经过 Planner 的 translateToExecNodeGraph 翻译成 ExecNodeGraph,再经过后面 Transformation 等步骤,依次生成 StreamGraph 、JobGraph。最终通过 submit job,提交到 Flink 单机运行,或者是提交到集群去运行。

以上就是 FlinkSQL 的一个完整的执行流程。

--

03

FlinkSQL 字段血缘解析思路

FlinkSQL的字段血缘解决方案

FlinkSQL 字段血缘解析分为三个阶段:

1. 对输入 SQL 进行 Parse、 Validate、Convert 生成关系表达式 RelNode 树。

2. 在优化阶段,只生成到 Optimized Logical Plan,而非原本的 Optimized Physical Plan ,要修正 FlinkSQL 的执行流程。

3. 针对上步骤优化生成的逻辑 RelNode,调用 RelMetaDataQuery 的 getColumnOrigins(RelNode rel,int column)查询原始字段信息。最后构造血缘关系,并返回结果。

FlinkSQL的字段血缘解决方案

这里再说明下第二阶段,前面 3 步跟 FlinkSQL 的执行流程是一样的,最后生成原始的 RelNode,本文在第四步进行裁剪,只要解析血缘到逻辑计划就停止,而没必要去生成物理计划,所以这里把后面三个步骤裁剪掉(上图中红色字体标注的)。这就是本文的核心设计思路。

FlinkSQL的字段血缘解决方案

上图中对比原始的逻辑计划和优化后的逻辑计划。例如有 ods_mysql_users 这和维表 dim_mysql_company 进行关联,原始的逻辑计划是先进行全表扫描,然后 join 之后再进行投影。经过优化之后,会把把查询下推,在 select 的阶段只要 查询相关的字段就好。

--

04

核心源码阐述

FlinkSQL的字段血缘解决方案

核心源码是根据前文讲的三个步骤,此处定义一个 parseFieldLineage 方法,输入是一段 sql,会输出字段血缘的解析结果。如果要把血缘集成到其他系统里,只要去调用这个方法就可以获取到字段的血缘关系。此方法主要包含三个阶段:

1.生成原始的 RelNode Tree,调用 parsestatement sql。

2.对原始的 RelNode 去生成优化后的逻辑计划。

3.调用 Calcite 提供的能力,获取它的血缘关系。

下面详细介绍这三个步骤。

(1)根据 SQL 生成 RelNode 树

FlinkSQL的字段血缘解决方案

第一步是 Parser 阶段,调用 tableEnv.getParser().parse(sql) 方法生成 operations,这里能获取到 RelNode。后面代码限制只能支持 insert 的血缘关系,后续会支持 CTAS 等语法。

(2)生成 Optimized Logical Plan

FlinkSQL的字段血缘解决方案

第二步是生成优化后的逻辑计划,根据Flink的源码可知共有 12 个阶段。根据上面讲的,不要最后三个步骤,只要优化后生成逻辑计划就好。

(3)查询原始字段并构造血缘

FlinkSQL的字段血缘解决方案

第三步,查询目的表(arget table)的所有字段的信息,对目的表的每个字段调用RelMetadataQuery.getColumnOrigins()方法,把优化后的 optRelNode 传给它,再把目的表当前字段的索引传给它,这样 Calcite 就能返回目的表这个字段对应的原始表字段信息。

通过上述介绍,大家会发现这种方法获取血缘关系是非常简单的,我们不用去解析用户输入的各种 SQL,核心的血缘关系会由 Calcite 自动获取。

--

05

Insert、Join解析案例

1. 新建测试表

FlinkSQL的字段血缘解决方案

这里新建了三张测试表,一个是 ods_mysql_users 这张表,connector 是映射为 mysql-cdc,包含 id、name、birthday、ts 和 proc_time 共五个字段;维表dim_mysql_company,包含user_id 和company_name两个字段;目的表dwd_hudi_users,是插入到 Hudi 里面,包含 id、name、company_name、 birthday、ts 和 partition 共六个字段。

2. 测试 Insert-Select

FlinkSQL的字段血缘解决方案

把 MySQL 的这张表直接插入到 Hudi 这张表,生成的血缘关系如上图右表所示。

3. 测试 Insert-Join

FlinkSQL的字段血缘解决方案

MySQL 的这张表和 MySQL 维表去做一个 join,分别插入 Hudi 表。可以看到source 表有两个,一个是 users 表,一个是 company 表。比如concat,是把 a 表的 name 字段和 b 表的 company_name 字段插入到目的表 name 字段。在血缘关系中可以看到准确的字段对应关系。

4.进一步改造

FlinkSQL的字段血缘解决方案

近期,github 项目上多位用户反馈,上述操作完成后不支持 lookup join、watermark、UDTF,以及 CEP 的血缘关系。这里举一个例子,如上图所示把 join 换成 lookup join, 即 join 的时候换成了 FOR SYSTEM_TIME AS OF a.proc_time AS b。会发现生成的血缘关系中没有维表 dim_mysql_company 的字段血缘关。

因此需要进一步改造。这里的改造不是对上述整个思路或者思想方法去做改造,而只是在上述的思路或者方法基础上,做一些补充,就能获取到这些血缘信息。

--

06

扩展 Calcite 支持 Lookup Join、UDTF 解析案例

1. Lookup join

先来看一下 lookup join。

FlinkSQL的字段血缘解决方案

针对于 lookup join,Parser 会把 SQL 语句’FOR SYSTEM_TIME AS OF’,解析成SqlSnapshot(SqlNode),在 validate 阶段将其转换为 LocalSnapshot。

从上图中可以看到,先是对这张表做一个scan再转换为snapshot。那为什么没有解析血缘关系呢?是因为 calcite-core中RelMdColumnOrigins 这个 Handler 类里并没有处理 snapshot 类型的 RelNode 导致返回空,继而丢失 lookup join 字段的血缘关系。

我们要做的就是在 RelMdColumnOrigins 增加一个处理 snapshot 的 getColumnOrigins(Snapshot rel, RelMetadataQuery mq, int iOutputColumn) 方法。

FlinkSQL的字段血缘解决方案

这样就可以获取到 lookup join 的血缘关系。

2. UDTF

FlinkSQL的字段血缘解决方案

首先新建建 UDTF,继承 Flink 的 TableFunction 类,复写 eval 这个方法。我们这里加了一个注解,输出两个字段: word 和 length。比如某个表,经过 UDTF 处理,会得到一个类似于临时表,其中包含 word 和 length 两个字段。然后新建 my_split_udtf 函数。

定义好之后,测试 UDTF SQL。

FlinkSQL的字段血缘解决方案

这里用 UDTF 作用于 ods_mysql_users 表的 name 字段,生成临时表,和 ods_mysql_users 表去做 join,注意 length 和 word 这两个字段就来源于 UDTF 对应的临时表。

按照前面讲的血缘分析方法,拿到Relnode后先去获取它的血缘,与 lookup join 类似的思路,我们把优化后的逻辑细化打印出来,发现在 org.apache.calcite.rel.metadata.RelMdColumnOrigin 的 getColumnOrigins() 方法中,没有 Correlate 作为参数的方法,因此无法解析出 UDTF 字段的血缘关系。

FlinkSQL的字段血缘解决方案

由于比较复杂,这里就不列出具体代码。其核心思路是因为 LATERAL TABLE(my_split_udtf(name)) 生成的两个临时表的两个字段 word 和 length,本质上是来自于 ods_mysql_users 这张表的 name 字段,因此针对右边的 LATERAL TABLE 获取 UDTF 中的字段,再根据字段名获取左表信息和索引,最终获取的是左表的字段血缘关系。

3. 扩展支持其它语法

FlinkSQL的字段血缘解决方案

后续如果 Flink 新版本增加新的语法,只要通过自定义 RelMdColumnOrigins 类中的方法便能准确解析出字段血缘关系。

上图的表格中列出了一些 FlinkSQL,经过分析其优化后的逻辑计划,可以找到其 RelNode 子类型,只要针对这一子类型定义一个方法,返回此类型对应的源表和源字段即可。

--

07

解析血缘字段的转换关系

前文中已经解析出字段血缘关系,但只是生成了字段间的映射关系,但是不知道字段间的转换关系,比如两个字段经过怎样的加工生成了目标表的字段。解决这一问题的思路也是修改 Calcite 的源代码,继续修改 RelMdColumnOrigins 在获取血缘关系的时候,记录字段间的转换关系。下面通过三个例子来展示。

FlinkSQL的字段血缘解决方案

从上图的例子中可以看到,birthday 字段经过了 date_format 函数的处理,在血缘关系里面可以看到,在源表、字段、目的表、字段后面多了一个转换关系。

FlinkSQL的字段血缘解决方案

再展示一个例子,是自定义的一个函数 my_suffix_udf,可以看到血缘关系中就包括这一加工映射。

FlinkSQL的字段血缘解决方案

最后来看一个稍微复杂的例子。这里涉及到子查询,字段经过了多个步骤的加工,比如 ods_mysql_users.id 字段,第一步在子查询先经过 sum(id) 处理生成 sum_id,再经过 ABS(sum_id) 处理插入到 dwd_hudi_users.id 字段。此时会记录血缘上整个完整的转换关系 ABS(SUM(id))。

以上就是本次分享的内容,谢谢大家。

FlinkSQL的字段血缘解决方案

继续阅读