天天看點

Flink SQL的行級權限解決方案及源碼解讀

作者:Lakehouse

flink-sql-security

FlinkSQL的行級權限解決方案及源碼,支援面向使用者級别的行級資料通路控制,即特定使用者隻能通路授權過的行,隐藏未授權的行資料。此方案是實時領域Flink的解決方案,類似離線數倉Hive中Ranger Row-level Filter方案。

序号 作者 版本 時間 備注
1 HamaWhite 1.0.0 2022-12-15 1. 增加文檔和源碼

源碼位址: https://github.com/HamaWhiteGG/flink-sql-security

一、基礎知識

1.1 行級權限

行級權限即橫向資料安全保護,可以解決不同人員隻允許通路不同資料行的問題。例如針對訂單表,使用者A隻能檢視到北京區域的資料,使用者B隻能檢視到杭州區域的資料。

Flink SQL的行級權限解決方案及源碼解讀

1.2 業務流程

1.2.1 設定行級權限

管理者配置使用者、表、行級權限條件,例如下面的配置。

序号 使用者名 表名 行級權限條件
1 使用者A orders region = 'beijing'
2 使用者B orders region = 'hangzhou'

1.2.2 使用者查詢資料

使用者在系統上查詢orders表的資料時,系統在底層查詢時會根據該使用者的行級權限條件來自動過濾資料,即讓行級權限生效。

當使用者A和使用者B在執行下面相同的SQL時,會檢視到不同的結果資料。

SELECT * FROM orders;           

使用者A檢視到的結果資料是:

order_id order_date customer_name price product_id order_status region
10001 2020-07-30 10:08:22 Jack 50.50 102 false beijing
10002 2020-07-30 10:11:09 Sally 15.00 105 false beijing
注: 系統底層最終執行的SQL是:

SELECT * FROM orders WHERE region = 'beijing'

使用者B檢視到的結果資料是:

order_id order_date customer_name price product_id order_status region
10003 2020-07-30 12:00:30 Edward 25.25 106 false hangzhou
10004 2022-12-15 12:11:09 John 78.00 103 false hangzhou
注: 系統底層最終執行的SQL是:

SELECT * FROM orders WHERE region = 'hangzhou'

1.3 元件版本

元件名稱 版本 備注
Flink 1.16.0
Flink-connector-mysql-cdc 2.3.0

二、Hive行級權限解決方案

在離線數倉工具Hive領域,由于發展多年已有Ranger來支援表資料的行級權限控制,詳見參考文獻[2][1]。下圖是在Ranger裡配置Hive表行級過濾條件的頁面,供參考。

Flink SQL的行級權限解決方案及源碼解讀

但由于Flink實時數倉領域發展相對較短,Ranger還不支援FlinkSQL,以及要依賴Ranger會導緻系統部署和運維過重,是以開始自研實時數倉的行級權限解決工具。

三、FlinkSQL行級權限解決方案

3.1 解決方案

3.1.1 FlinkSQL執行流程

可以參考作者文章[FlinkSQL字段血緣解決方案及源碼][2],本文根據Flink1.16修正和簡化後的執行流程如下圖所示。

Flink SQL的行級權限解決方案及源碼解讀

在CalciteParser.parse()處理後會得到一個SqlNode類型的抽象文法樹(

Abstract Syntax Tree

,簡稱AST),本文會在Parse階段,通過組裝行級過濾條件生成新的AST來實作行級權限控制。

3.1.2 Calcite對象繼承關系

下面章節要用到Calcite中的SqlNode、SqlCall、SqlIdentifier、SqlJoin、SqlBasicCall和SqlSelect等類,此處進行簡單介紹以及展示它們間繼承關系,以便讀者閱讀本文源碼。

序号 介紹
1 SqlNode A SqlNode is a SQL parse tree.
2 SqlCall A SqlCall is a call to an SqlOperator operator.
3 SqlIdentifier A SqlIdentifier is an identifier, possibly compound.
4 SqlJoin Parse tree node representing a JOIN clause.
5 SqlBasicCall Implementation of SqlCall that keeps its operands in an array.
6 SqlSelect A SqlSelect is a node of a parse tree which represents a select statement.
Flink SQL的行級權限解決方案及源碼解讀

3.1.3 解決思路

在Parser階段,如果執行的SQL包含對表的查詢操作,則一定會建構Calcite SqlSelect對象。是以限制表的行級權限,隻要在建構Calcite SqlSelect對象時對Where條件進行攔截即可,而不需要解析使用者執行的各種SQL來查找配置過行級權限條件限制的表。

在SqlSelect對象構造Where條件時,要通過執行使用者和表名來查找配置的行級權限條件,系統會把此條件用CalciteParser提供的

parseExpression(String sqlExpression)

方法解析生成一個SqlBacicCall再傳回。然後結合使用者執行的SQL和配置的行級權限條件重新組裝Where條件,即生成新的帶行級過濾條件Abstract Syntax Tree,最後基于新的AST再執行後續的Validate、Convert、Optimize和Execute階段。

Flink SQL的行級權限解決方案及源碼解讀

以上整個過程對執行SQL的使用者都是透明和無感覺的,還是調用Flink自帶的TableEnvironment.executeSql(String statement)方法即可。

注: 要通過技術手段把執行使用者傳遞到Calcite SqlSelect中。

3.2 重寫SQL

主要在org.apache.calcite.sql.SqlSelect的構造方法中完成。

3.2.1 主要流程

主流程如下圖所示,根據From的類型進行不同的操作,例如針對SqlJoin類型,要分别周遊其left和right節點,而且要支援遞歸操作以便支援三張表及以上JOIN;針對SqlIdentifier類型,要額外判斷下是否來自JOIN,如果是的話且JOIN時且未定義表别名,則用表名作為别名;針對SqlBasicCall類型,如果來自于子查詢,說明已在子查詢中組裝過行級權限條件,則直接傳回目前Where即可,否則分别取出表名和别名。

然後再擷取行級權限條件解析後生成SqlBacicCall類型的Permissions,并給Permissions增加别名,最後把已有Where和Permissions進行組裝生成新的Where,來作為SqlSelect對象的Where限制。

Flink SQL的行級權限解決方案及源碼解讀

上述流程圖的各個分支,都會在下面的用例測試章節中會舉例說明。

3.2.2 核心源碼

核心源碼位于SqlSelect中新增的

addCondition()

addPermission()

buildWhereClause()

三個方法,下面隻給出控制主流程

addCondition()

的源碼。

/**
 * The main process of controlling row-level permissions
 */
private SqlNode addCondition(SqlNode from, SqlNode where, boolean fromJoin) {
if (from instanceof SqlIdentifier) {
String tableName = from.toString();
// the table name is used as an alias for join
String tableAlias = fromJoin ? tableName : ;
return addPermission(where, tableName, tableAlias);
 } else if (from instanceof SqlJoin) {
SqlJoin sqlJoin = (SqlJoin) from;
// support recursive processing, such as join for three tables, process left sqlNode
 where = addCondition(sqlJoin.getLeft(), where, true);
// process right sqlNode
return addCondition(sqlJoin.getRight(), where, true);
 } else if (from instanceof SqlBasicCall) {
// Table has an alias or comes from a subquery
 SqlNode[] tableNodes = ((SqlBasicCall) from).getOperands();
/**
 * If there is a subquery in the Join, row-level filtering has been appended to the subquery.
 * What is returned here is the SqlSelect type, just return the original where directly
 */
if (!(tableNodes[0] instanceof SqlIdentifier)) {
return where;
 }
String tableName = tableNodes[0].toString();
String tableAlias = tableNodes[1].toString();
return addPermission(where, tableName, tableAlias);
 }
return where;
}           

四、用例測試

用例測試資料來自于CDC Connectors for Apache Flink [6][3]官網,在此表示感謝。下載下傳本文源碼後,可通過Maven運作單元測試。

$ cd flink-sql-security
$ mvn test           

4.1 建立Mysql表及初始化資料

Mysql建立表語句及初始化資料SQL詳見源碼[flink-sql-security/data/database][4]裡面的mysql_ddl.sql和mysql_init.sql檔案,本文給

orders

表增加一個region字段。

4.2 建立Flink表

4.2.1 建立mysql cdc類型的orders表

DROP TABLE IF EXISTS orders;

CREATE TABLE IF NOT EXISTS orders (
 order_id INT PRIMARY KEY NOT ENFORCED,
 order_date TIMESTAMP(0),
 customer_name STRING,
 product_id INT,
 price DECIMAL(10, 5),
 order_status BOOLEAN,
 region STRING
) WITH (
'connector'='mysql-cdc',
'hostname'='xxx.xxx.xxx.xxx',
'port'='3306',
'username'='root',
'password'='xxx',
'server-time-zone'='Asia/Shanghai',
'database-name'='demo',
'table-name'='orders'
);           

4.2.2 建立mysql cdc類型的products表

DROP TABLE IF EXISTS products;

CREATE TABLE IF NOT EXISTS products (
 id INT PRIMARY KEY NOT ENFORCED,
 name STRING,
 description STRING
) WITH (
'connector'='mysql-cdc',
'hostname'='xxx.xxx.xxx.xxx',
'port'='3306',
'username'='root',
'password'='xxx',
'server-time-zone'='Asia/Shanghai',
'database-name'='demo',
'table-name'='products'
);           

4.2.3 建立mysql cdc類型shipments表

DROP TABLE IF EXISTS shipments;

CREATE TABLE IF NOT EXISTS shipments (
 shipment_id INT PRIMARY KEY NOT ENFORCED,
 order_id INT,
 origin STRING,
 destination STRING,
 is_arrived BOOLEAN
) WITH (
'connector'='mysql-cdc',
'hostname'='xxx.xxx.xxx.xxx',
'port'='3306',
'username'='root',
'password'='xxx',
'server-time-zone'='Asia/Shanghai',
'database-name'='demo',
'table-name'='shipments'
);           

4.2.4 建立print類型print_sink表

DROP TABLE IF EXISTS print_sink;

CREATE TABLE IF NOT EXISTS print_sink (
 order_id INT PRIMARY KEY NOT ENFORCED,
 order_date TIMESTAMP(0),
 customer_name STRING,
 product_id INT,
 price DECIMAL(10, 5),
 order_status BOOLEAN,
 region STRING
) WITH (
'connector'='print'
);           

4.3 測試用例

詳細測試用例可檢視源碼中的單測,下面隻描述部分測試點。

4.3.1 簡單SELECT

4.3.1.1 行級權限條件

序号 使用者名 表名 行級權限條件
1 使用者A orders region = 'beijing'

4.3.1.2 輸入SQL

SELECT * FROM orders;           

4.3.1.3 輸出SQL

SELECT * FROM orders WHERE region = 'beijing';           

4.3.1.4 測試小結

輸入SQL中沒有WHERE條件,隻需要把行級過濾條件

region = 'beijing'

追加到WHERE後即可。

4.3.2 SELECT帶複雜WHERE限制

4.3.2.1 行級權限條件

序号 使用者名 表名 行級權限條件
1 使用者A orders region = 'beijing'

4.3.2.2 輸入SQL

SELECT * FROM orders WHERE price > 45.0 OR customer_name = 'John';           

4.3.2.3 輸出SQL

SELECT * FROM orders WHERE (price > 45.0 OR customer_name = 'John') AND region = 'beijing';           

4.3.2.4 測試小結

輸入SQL中有兩個限制條件,中間用的是OR,是以在組裝

region = 'beijing'

時,要給已有的

price > 45.0 OR customer_name = 'John'

增加括号。

4.3.3 兩表JOIN且含子查詢

4.3.3.1 行級權限條件

序号 使用者名 表名 行級權限條件
1 使用者A orders region = 'beijing'

4.3.3.2 輸入SQL

SELECT
 o.*,
 p.name,
 p.description
FROM 
 (SELECT
*
FROM 
 orders
WHERE 
 order_status = FALSE
 ) AS o
LEFT JOIN products AS p ON o.product_id = p.id
WHERE
 o.price > 45.0 OR o.customer_name = 'John'            

4.3.3.3 輸出SQL

SELECT
 o.*,
 p.name,
 p.description
FROM 
 (SELECT
*
FROM 
 orders
WHERE 
 order_status = FALSE AND region = 'beijing'
 ) AS o
LEFT JOIN products AS p ON o.product_id = p.id
WHERE
 o.price > 45.0 OR o.customer_name = 'John'            

4.3.3.4 測試小結

針對比較複雜的SQL,例如兩表在JOIN時且其中左表來自于子查詢

SELECT * FROM orders WHERE order_status = FALSE

,行級過濾條件

region = 'beijing'

隻會追加到子查詢的裡面。

4.3.4 三表JOIN

4.3.4.1 行級權限條件

序号 使用者名 表名 行級權限條件
1 使用者A orders region = 'beijing'
2 使用者A products name = 'hammer'
3 使用者A shipments is_arrived = FALSE

4.3.4.2 輸入SQL

SELECT
 o.*,
 p.name,
 p.description,
 s.shipment_id,
 s.origin,
 s.destination,
 s.is_arrived
FROM
 orders AS o
LEFT JOIN products AS p ON o.product_id=p.id
LEFT JOIN shipments AS s ON o.order_id=s.order_id;           

4.3.4.3 輸出SQL

SELECT
 o.*,
 p.name,
 p.description,
 s.shipment_id,
 s.origin,
 s.destination,
 s.is_arrived
FROM
 orders AS o
LEFT JOIN products AS p ON o.product_id=p.id
LEFT JOIN shipments AS s ON o.order_id=s.order_id
WHERE
 o.region='beijing'
AND p.name='hammer'
AND s.is_arrived=FALSE;           

4.3.4.4 測試小結

三張表進行JOIN時,會分别擷取

orders

products

shipments

三張表的行級權限條件:

region = 'beijing'

name = 'hammer'

is_arrived = FALSE

,然後增加

orders

表的别名o、

products

表的别名p、

shipments

表的别名s,最後組裝到WHERE子句後面。

4.3.5 INSERT來自帶子查詢的SELECT

4.3.5.1 行級權限條件

序号 使用者名 表名 行級權限條件
1 使用者A orders region = 'beijing'

4.3.5.2 輸入SQL

INSERT INTO print_sink SELECT * FROM (SELECT * FROM orders);           

4.3.5.3 輸出SQL

INSERT INTO print_sink (SELECT * FROM (SELECT * FROM orders WHERE region = 'beijing'));           

4.3.5.4 測試小結

無論運作SQL類型是INSERT、SELECT或者其他,隻會找到查詢

oders

表的子句,然後對其組裝行級權限條件。

4.3.6 運作SQL

測試兩個不同使用者執行相同的SQL,兩個使用者的行級權限條件不一樣。

4.3.6.1 行級權限條件

序号 使用者名 表名 行級權限條件
1 使用者A orders region = 'beijing'
2 使用者B orders region = 'hangzhou'

4.3.6.2 輸入SQL

SELECT * FROM orders;           

4.3.6.3 執行SQL

使用者A的真實執行SQL:

SELECT * FROM orders WHERE region = 'beijing';           

使用者B的真實執行SQL:

SELECT * FROM orders WHERE region = 'hangzhou';           

4.3.6.4 測試小結

使用者調用下面的執行方法,除傳遞要執行的SQL參數外,隻需要額外指定執行的使用者即可,便能自動按照行級權限限制來執行。

/**
 * Execute the single sql with user permissions
 */
public TableResult execute(String username, String singleSql) {
 System.setProperty(EXECUTE_USERNAME, username);
return tableEnv.executeSql(singleSql);
}           

五、源碼修改步驟

注: Flink版本1.16.0依賴的Calcite是1.26.0版本。

5.1 新增Parser和ParserImpl類

複制Flink源碼中的org.apache.flink.table.delegation.Parser和org.apache.flink.table.planner.delegation.ParserImpl到項目下,新增下面兩個方法及實作。

/**
 * Parses a SQL expression into a {@link SqlNode}. The {@link SqlNode} is not yet validated.
 *
 * @param sqlExpression a SQL expression string to parse
 * @return a parsed SQL node
 * @throws SqlParserException if an exception is thrown when parsing the statement
 */
@Override
public SqlNode parseExpression(String sqlExpression) {
CalciteParser parser = calciteParserSupplier.get();
return parser.parseExpression(sqlExpression);
}


/**
 * Entry point for parsing SQL queries and return the abstract syntax tree
 *
 * @param statement the SQL statement to evaluate
 * @return abstract syntax tree
 * @throws org.apache.flink.table.api.SqlParserException when failed to parse the statement
 */
@Override
public SqlNode parseSql(String statement) {
CalciteParser parser = calciteParserSupplier.get();

// use parseSqlList here because we need to support statement end with ';' in sql client.
SqlNodeList sqlNodeList = parser.parseSqlList(statement);
 List<SqlNode> parsed = sqlNodeList.getList();
 Preconditions.checkArgument(parsed.size() == 1, "only single statement supported");
return parsed.get(0);
}           

5.2 新增SqlSelect類

複制Calcite源碼中的org.apache.calcite.sql.SqlSelect到項目下,新增上文提到的

addCondition()

addPermission()

buildWhereClause()

三個方法。 并且在構造方法中注釋掉原有的

this.where = where

行,并添加如下代碼:

// add row level filter condition for where clause
SqlNode rowFilterWhere = addCondition(from, where, false);
if (rowFilterWhere != where) {
 LOG.info("Rewritten SQL based on row-level privilege filtering for user [{}]", System.getProperty(EXECUTE_USERNAME));
}
this.where = rowFilterWhere;           

5.3 封裝SecurityContext類

建立SecurityContext類,主要添加下面三個方法:

/**
 * Add row-level filter conditions and return new SQL
 */
public String addRowFilter(String username, String singleSql) {
 System.setProperty(EXECUTE_USERNAME, username);

// in the modified SqlSelect, filter conditions will be added to the where clause
SqlNode parsedTree = tableEnv.getParser().parseSql(singleSql);
return parsedTree.toString();
}


/**
 * Query the configured permission point according to the user name and table name, and return
 * it to SqlBasicCall
 */
public SqlBasicCall queryPermissions(String username, String tableName) {
String permissions = rowLevelPermissions.get(username, tableName);
 LOG.info("username: {}, tableName: {}, permissions: {}", username, tableName, permissions);
if (permissions != ) {
return (SqlBasicCall) tableEnv.getParser().parseExpression(permissions);
 }
return ;
}


/**
 * Execute the single sql with user permissions
 */
public TableResult execute(String username, String singleSql) {
 System.setProperty(EXECUTE_USERNAME, username);
return tableEnv.executeSql(singleSql);
}
           

六、下一步計劃

  1. 1. 支援資料脫敏(Data Masking)
  2. 2. 開發ranger-flink-plugin

七、參考文獻

  1. 1. 資料管理DMS-敏感資料管理-行級管控[5]
  2. 2. Apache Ranger Row-level Filter[6]
  3. 3. OpenLooKeng的行級權限控制[7]
  4. 4. PostgreSQL中的行級權限/資料權限/行安全政策[8]
  5. 5. FlinkSQL字段血緣解決方案及源碼[9]
  6. 6. 基于 Flink CDC 建構 MySQL 和 Postgres 的 Streaming ETL[10]

引用連結

[1]

[2]: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.0/authorization-ranger/content/row_level_filtering_in_hive_with_ranger_policies.html

[2]

[FlinkSQL字段血緣解決方案及源碼]: https://github.com/HamaWhiteGG/flink-sql-lineage/blob/main/README_CN.md

[3]

[6]: https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-postgres-tutorial-zh.html

[4]

[flink-sql-security/data/database]: https://github.com/HamaWhiteGG/flink-sql-security/tree/main/data/database

[5]

資料管理DMS-敏感資料管理-行級管控: https://help.aliyun.com/document_detail/161149.html

[6]

Apache Ranger Row-level Filter: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.0/authorization-ranger/content/row_level_filtering_in_hive_with_ranger_policies.html

[7]

OpenLooKeng的行級權限控制: https://www.modb.pro/db/212124

[8]

PostgreSQL中的行級權限/資料權限/行安全政策: https://www.kankanzhijian.com/2018/09/28/PostgreSQL-rowsecurity/

[9]

FlinkSQL字段血緣解決方案及源碼: https://github.com/HamaWhiteGG/flink-sql-lineage/blob/main/README_CN.md

[10]

基于 Flink CDC 建構 MySQL 和 Postgres 的 Streaming ETL: https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-postgres-tutorial-zh.html

繼續閱讀