flink-sql-security
FlinkSQL的行級權限解決方案及源碼,支援面向使用者級别的行級資料通路控制,即特定使用者隻能通路授權過的行,隐藏未授權的行資料。此方案是實時領域Flink的解決方案,類似離線數倉Hive中Ranger Row-level Filter方案。
序号 | 作者 | 版本 | 時間 | 備注 |
1 | HamaWhite | 1.0.0 | 2022-12-15 | 1. 增加文檔和源碼 |
源碼位址: https://github.com/HamaWhiteGG/flink-sql-security
一、基礎知識
1.1 行級權限
行級權限即橫向資料安全保護,可以解決不同人員隻允許通路不同資料行的問題。例如針對訂單表,使用者A隻能檢視到北京區域的資料,使用者B隻能檢視到杭州區域的資料。
1.2 業務流程
1.2.1 設定行級權限
管理者配置使用者、表、行級權限條件,例如下面的配置。
序号 | 使用者名 | 表名 | 行級權限條件 |
1 | 使用者A | orders | region = 'beijing' |
2 | 使用者B | orders | region = 'hangzhou' |
1.2.2 使用者查詢資料
使用者在系統上查詢orders表的資料時,系統在底層查詢時會根據該使用者的行級權限條件來自動過濾資料,即讓行級權限生效。
當使用者A和使用者B在執行下面相同的SQL時,會檢視到不同的結果資料。
SELECT * FROM orders;
使用者A檢視到的結果資料是:
order_id | order_date | customer_name | price | product_id | order_status | region |
10001 | 2020-07-30 10:08:22 | Jack | 50.50 | 102 | false | beijing |
10002 | 2020-07-30 10:11:09 | Sally | 15.00 | 105 | false | beijing |
注: 系統底層最終執行的SQL是: SELECT * FROM orders WHERE region = 'beijing'
。
使用者B檢視到的結果資料是:
order_id | order_date | customer_name | price | product_id | order_status | region |
10003 | 2020-07-30 12:00:30 | Edward | 25.25 | 106 | false | hangzhou |
10004 | 2022-12-15 12:11:09 | John | 78.00 | 103 | false | hangzhou |
注: 系統底層最終執行的SQL是: SELECT * FROM orders WHERE region = 'hangzhou'
。
1.3 元件版本
元件名稱 | 版本 | 備注 |
Flink | 1.16.0 | |
Flink-connector-mysql-cdc | 2.3.0 |
二、Hive行級權限解決方案
在離線數倉工具Hive領域,由于發展多年已有Ranger來支援表資料的行級權限控制,詳見參考文獻[2][1]。下圖是在Ranger裡配置Hive表行級過濾條件的頁面,供參考。
但由于Flink實時數倉領域發展相對較短,Ranger還不支援FlinkSQL,以及要依賴Ranger會導緻系統部署和運維過重,是以開始自研實時數倉的行級權限解決工具。
三、FlinkSQL行級權限解決方案
3.1 解決方案
3.1.1 FlinkSQL執行流程
可以參考作者文章[FlinkSQL字段血緣解決方案及源碼][2],本文根據Flink1.16修正和簡化後的執行流程如下圖所示。
在CalciteParser.parse()處理後會得到一個SqlNode類型的抽象文法樹(
Abstract Syntax Tree
,簡稱AST),本文會在Parse階段,通過組裝行級過濾條件生成新的AST來實作行級權限控制。
3.1.2 Calcite對象繼承關系
下面章節要用到Calcite中的SqlNode、SqlCall、SqlIdentifier、SqlJoin、SqlBasicCall和SqlSelect等類,此處進行簡單介紹以及展示它們間繼承關系,以便讀者閱讀本文源碼。
序号 | 類 | 介紹 |
1 | SqlNode | A SqlNode is a SQL parse tree. |
2 | SqlCall | A SqlCall is a call to an SqlOperator operator. |
3 | SqlIdentifier | A SqlIdentifier is an identifier, possibly compound. |
4 | SqlJoin | Parse tree node representing a JOIN clause. |
5 | SqlBasicCall | Implementation of SqlCall that keeps its operands in an array. |
6 | SqlSelect | A SqlSelect is a node of a parse tree which represents a select statement. |
3.1.3 解決思路
在Parser階段,如果執行的SQL包含對表的查詢操作,則一定會建構Calcite SqlSelect對象。是以限制表的行級權限,隻要在建構Calcite SqlSelect對象時對Where條件進行攔截即可,而不需要解析使用者執行的各種SQL來查找配置過行級權限條件限制的表。
在SqlSelect對象構造Where條件時,要通過執行使用者和表名來查找配置的行級權限條件,系統會把此條件用CalciteParser提供的
parseExpression(String sqlExpression)
方法解析生成一個SqlBacicCall再傳回。然後結合使用者執行的SQL和配置的行級權限條件重新組裝Where條件,即生成新的帶行級過濾條件Abstract Syntax Tree,最後基于新的AST再執行後續的Validate、Convert、Optimize和Execute階段。
以上整個過程對執行SQL的使用者都是透明和無感覺的,還是調用Flink自帶的TableEnvironment.executeSql(String statement)方法即可。
注: 要通過技術手段把執行使用者傳遞到Calcite SqlSelect中。
3.2 重寫SQL
主要在org.apache.calcite.sql.SqlSelect的構造方法中完成。
3.2.1 主要流程
主流程如下圖所示,根據From的類型進行不同的操作,例如針對SqlJoin類型,要分别周遊其left和right節點,而且要支援遞歸操作以便支援三張表及以上JOIN;針對SqlIdentifier類型,要額外判斷下是否來自JOIN,如果是的話且JOIN時且未定義表别名,則用表名作為别名;針對SqlBasicCall類型,如果來自于子查詢,說明已在子查詢中組裝過行級權限條件,則直接傳回目前Where即可,否則分别取出表名和别名。
然後再擷取行級權限條件解析後生成SqlBacicCall類型的Permissions,并給Permissions增加别名,最後把已有Where和Permissions進行組裝生成新的Where,來作為SqlSelect對象的Where限制。
上述流程圖的各個分支,都會在下面的用例測試章節中會舉例說明。
3.2.2 核心源碼
核心源碼位于SqlSelect中新增的
addCondition()
、
addPermission()
、
buildWhereClause()
三個方法,下面隻給出控制主流程
addCondition()
的源碼。
/**
* The main process of controlling row-level permissions
*/
private SqlNode addCondition(SqlNode from, SqlNode where, boolean fromJoin) {
if (from instanceof SqlIdentifier) {
String tableName = from.toString();
// the table name is used as an alias for join
String tableAlias = fromJoin ? tableName : ;
return addPermission(where, tableName, tableAlias);
} else if (from instanceof SqlJoin) {
SqlJoin sqlJoin = (SqlJoin) from;
// support recursive processing, such as join for three tables, process left sqlNode
where = addCondition(sqlJoin.getLeft(), where, true);
// process right sqlNode
return addCondition(sqlJoin.getRight(), where, true);
} else if (from instanceof SqlBasicCall) {
// Table has an alias or comes from a subquery
SqlNode[] tableNodes = ((SqlBasicCall) from).getOperands();
/**
* If there is a subquery in the Join, row-level filtering has been appended to the subquery.
* What is returned here is the SqlSelect type, just return the original where directly
*/
if (!(tableNodes[0] instanceof SqlIdentifier)) {
return where;
}
String tableName = tableNodes[0].toString();
String tableAlias = tableNodes[1].toString();
return addPermission(where, tableName, tableAlias);
}
return where;
}
四、用例測試
用例測試資料來自于CDC Connectors for Apache Flink [6][3]官網,在此表示感謝。下載下傳本文源碼後,可通過Maven運作單元測試。
$ cd flink-sql-security
$ mvn test
4.1 建立Mysql表及初始化資料
Mysql建立表語句及初始化資料SQL詳見源碼[flink-sql-security/data/database][4]裡面的mysql_ddl.sql和mysql_init.sql檔案,本文給
orders
表增加一個region字段。
4.2 建立Flink表
4.2.1 建立mysql cdc類型的orders表
DROP TABLE IF EXISTS orders;
CREATE TABLE IF NOT EXISTS orders (
order_id INT PRIMARY KEY NOT ENFORCED,
order_date TIMESTAMP(0),
customer_name STRING,
product_id INT,
price DECIMAL(10, 5),
order_status BOOLEAN,
region STRING
) WITH (
'connector'='mysql-cdc',
'hostname'='xxx.xxx.xxx.xxx',
'port'='3306',
'username'='root',
'password'='xxx',
'server-time-zone'='Asia/Shanghai',
'database-name'='demo',
'table-name'='orders'
);
4.2.2 建立mysql cdc類型的products表
DROP TABLE IF EXISTS products;
CREATE TABLE IF NOT EXISTS products (
id INT PRIMARY KEY NOT ENFORCED,
name STRING,
description STRING
) WITH (
'connector'='mysql-cdc',
'hostname'='xxx.xxx.xxx.xxx',
'port'='3306',
'username'='root',
'password'='xxx',
'server-time-zone'='Asia/Shanghai',
'database-name'='demo',
'table-name'='products'
);
4.2.3 建立mysql cdc類型shipments表
DROP TABLE IF EXISTS shipments;
CREATE TABLE IF NOT EXISTS shipments (
shipment_id INT PRIMARY KEY NOT ENFORCED,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector'='mysql-cdc',
'hostname'='xxx.xxx.xxx.xxx',
'port'='3306',
'username'='root',
'password'='xxx',
'server-time-zone'='Asia/Shanghai',
'database-name'='demo',
'table-name'='shipments'
);
4.2.4 建立print類型print_sink表
DROP TABLE IF EXISTS print_sink;
CREATE TABLE IF NOT EXISTS print_sink (
order_id INT PRIMARY KEY NOT ENFORCED,
order_date TIMESTAMP(0),
customer_name STRING,
product_id INT,
price DECIMAL(10, 5),
order_status BOOLEAN,
region STRING
) WITH (
'connector'='print'
);
4.3 測試用例
詳細測試用例可檢視源碼中的單測,下面隻描述部分測試點。
4.3.1 簡單SELECT
4.3.1.1 行級權限條件
序号 | 使用者名 | 表名 | 行級權限條件 |
1 | 使用者A | orders | region = 'beijing' |
4.3.1.2 輸入SQL
SELECT * FROM orders;
4.3.1.3 輸出SQL
SELECT * FROM orders WHERE region = 'beijing';
4.3.1.4 測試小結
輸入SQL中沒有WHERE條件,隻需要把行級過濾條件
region = 'beijing'
追加到WHERE後即可。
4.3.2 SELECT帶複雜WHERE限制
4.3.2.1 行級權限條件
序号 | 使用者名 | 表名 | 行級權限條件 |
1 | 使用者A | orders | region = 'beijing' |
4.3.2.2 輸入SQL
SELECT * FROM orders WHERE price > 45.0 OR customer_name = 'John';
4.3.2.3 輸出SQL
SELECT * FROM orders WHERE (price > 45.0 OR customer_name = 'John') AND region = 'beijing';
4.3.2.4 測試小結
輸入SQL中有兩個限制條件,中間用的是OR,是以在組裝
region = 'beijing'
時,要給已有的
price > 45.0 OR customer_name = 'John'
增加括号。
4.3.3 兩表JOIN且含子查詢
4.3.3.1 行級權限條件
序号 | 使用者名 | 表名 | 行級權限條件 |
1 | 使用者A | orders | region = 'beijing' |
4.3.3.2 輸入SQL
SELECT
o.*,
p.name,
p.description
FROM
(SELECT
*
FROM
orders
WHERE
order_status = FALSE
) AS o
LEFT JOIN products AS p ON o.product_id = p.id
WHERE
o.price > 45.0 OR o.customer_name = 'John'
4.3.3.3 輸出SQL
SELECT
o.*,
p.name,
p.description
FROM
(SELECT
*
FROM
orders
WHERE
order_status = FALSE AND region = 'beijing'
) AS o
LEFT JOIN products AS p ON o.product_id = p.id
WHERE
o.price > 45.0 OR o.customer_name = 'John'
4.3.3.4 測試小結
針對比較複雜的SQL,例如兩表在JOIN時且其中左表來自于子查詢
SELECT * FROM orders WHERE order_status = FALSE
,行級過濾條件
region = 'beijing'
隻會追加到子查詢的裡面。
4.3.4 三表JOIN
4.3.4.1 行級權限條件
序号 | 使用者名 | 表名 | 行級權限條件 |
1 | 使用者A | orders | region = 'beijing' |
2 | 使用者A | products | name = 'hammer' |
3 | 使用者A | shipments | is_arrived = FALSE |
4.3.4.2 輸入SQL
SELECT
o.*,
p.name,
p.description,
s.shipment_id,
s.origin,
s.destination,
s.is_arrived
FROM
orders AS o
LEFT JOIN products AS p ON o.product_id=p.id
LEFT JOIN shipments AS s ON o.order_id=s.order_id;
4.3.4.3 輸出SQL
SELECT
o.*,
p.name,
p.description,
s.shipment_id,
s.origin,
s.destination,
s.is_arrived
FROM
orders AS o
LEFT JOIN products AS p ON o.product_id=p.id
LEFT JOIN shipments AS s ON o.order_id=s.order_id
WHERE
o.region='beijing'
AND p.name='hammer'
AND s.is_arrived=FALSE;
4.3.4.4 測試小結
三張表進行JOIN時,會分别擷取
orders
、
products
、
shipments
三張表的行級權限條件:
region = 'beijing'
、
name = 'hammer'
和
is_arrived = FALSE
,然後增加
orders
表的别名o、
products
表的别名p、
shipments
表的别名s,最後組裝到WHERE子句後面。
4.3.5 INSERT來自帶子查詢的SELECT
4.3.5.1 行級權限條件
序号 | 使用者名 | 表名 | 行級權限條件 |
1 | 使用者A | orders | region = 'beijing' |
4.3.5.2 輸入SQL
INSERT INTO print_sink SELECT * FROM (SELECT * FROM orders);
4.3.5.3 輸出SQL
INSERT INTO print_sink (SELECT * FROM (SELECT * FROM orders WHERE region = 'beijing'));
4.3.5.4 測試小結
無論運作SQL類型是INSERT、SELECT或者其他,隻會找到查詢
oders
表的子句,然後對其組裝行級權限條件。
4.3.6 運作SQL
測試兩個不同使用者執行相同的SQL,兩個使用者的行級權限條件不一樣。
4.3.6.1 行級權限條件
序号 | 使用者名 | 表名 | 行級權限條件 |
1 | 使用者A | orders | region = 'beijing' |
2 | 使用者B | orders | region = 'hangzhou' |
4.3.6.2 輸入SQL
SELECT * FROM orders;
4.3.6.3 執行SQL
使用者A的真實執行SQL:
SELECT * FROM orders WHERE region = 'beijing';
使用者B的真實執行SQL:
SELECT * FROM orders WHERE region = 'hangzhou';
4.3.6.4 測試小結
使用者調用下面的執行方法,除傳遞要執行的SQL參數外,隻需要額外指定執行的使用者即可,便能自動按照行級權限限制來執行。
/**
* Execute the single sql with user permissions
*/
public TableResult execute(String username, String singleSql) {
System.setProperty(EXECUTE_USERNAME, username);
return tableEnv.executeSql(singleSql);
}
五、源碼修改步驟
注: Flink版本1.16.0依賴的Calcite是1.26.0版本。
5.1 新增Parser和ParserImpl類
複制Flink源碼中的org.apache.flink.table.delegation.Parser和org.apache.flink.table.planner.delegation.ParserImpl到項目下,新增下面兩個方法及實作。
/**
* Parses a SQL expression into a {@link SqlNode}. The {@link SqlNode} is not yet validated.
*
* @param sqlExpression a SQL expression string to parse
* @return a parsed SQL node
* @throws SqlParserException if an exception is thrown when parsing the statement
*/
@Override
public SqlNode parseExpression(String sqlExpression) {
CalciteParser parser = calciteParserSupplier.get();
return parser.parseExpression(sqlExpression);
}
/**
* Entry point for parsing SQL queries and return the abstract syntax tree
*
* @param statement the SQL statement to evaluate
* @return abstract syntax tree
* @throws org.apache.flink.table.api.SqlParserException when failed to parse the statement
*/
@Override
public SqlNode parseSql(String statement) {
CalciteParser parser = calciteParserSupplier.get();
// use parseSqlList here because we need to support statement end with ';' in sql client.
SqlNodeList sqlNodeList = parser.parseSqlList(statement);
List<SqlNode> parsed = sqlNodeList.getList();
Preconditions.checkArgument(parsed.size() == 1, "only single statement supported");
return parsed.get(0);
}
5.2 新增SqlSelect類
複制Calcite源碼中的org.apache.calcite.sql.SqlSelect到項目下,新增上文提到的
addCondition()
、
addPermission()
、
buildWhereClause()
三個方法。 并且在構造方法中注釋掉原有的
this.where = where
行,并添加如下代碼:
// add row level filter condition for where clause
SqlNode rowFilterWhere = addCondition(from, where, false);
if (rowFilterWhere != where) {
LOG.info("Rewritten SQL based on row-level privilege filtering for user [{}]", System.getProperty(EXECUTE_USERNAME));
}
this.where = rowFilterWhere;
5.3 封裝SecurityContext類
建立SecurityContext類,主要添加下面三個方法:
/**
* Add row-level filter conditions and return new SQL
*/
public String addRowFilter(String username, String singleSql) {
System.setProperty(EXECUTE_USERNAME, username);
// in the modified SqlSelect, filter conditions will be added to the where clause
SqlNode parsedTree = tableEnv.getParser().parseSql(singleSql);
return parsedTree.toString();
}
/**
* Query the configured permission point according to the user name and table name, and return
* it to SqlBasicCall
*/
public SqlBasicCall queryPermissions(String username, String tableName) {
String permissions = rowLevelPermissions.get(username, tableName);
LOG.info("username: {}, tableName: {}, permissions: {}", username, tableName, permissions);
if (permissions != ) {
return (SqlBasicCall) tableEnv.getParser().parseExpression(permissions);
}
return ;
}
/**
* Execute the single sql with user permissions
*/
public TableResult execute(String username, String singleSql) {
System.setProperty(EXECUTE_USERNAME, username);
return tableEnv.executeSql(singleSql);
}
六、下一步計劃
- 1. 支援資料脫敏(Data Masking)
- 2. 開發ranger-flink-plugin
七、參考文獻
- 1. 資料管理DMS-敏感資料管理-行級管控[5]
- 2. Apache Ranger Row-level Filter[6]
- 3. OpenLooKeng的行級權限控制[7]
- 4. PostgreSQL中的行級權限/資料權限/行安全政策[8]
- 5. FlinkSQL字段血緣解決方案及源碼[9]
- 6. 基于 Flink CDC 建構 MySQL 和 Postgres 的 Streaming ETL[10]
引用連結
[1]
[2]: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.0/authorization-ranger/content/row_level_filtering_in_hive_with_ranger_policies.html
[2]
[FlinkSQL字段血緣解決方案及源碼]: https://github.com/HamaWhiteGG/flink-sql-lineage/blob/main/README_CN.md
[3]
[6]: https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-postgres-tutorial-zh.html
[4]
[flink-sql-security/data/database]: https://github.com/HamaWhiteGG/flink-sql-security/tree/main/data/database
[5]
資料管理DMS-敏感資料管理-行級管控: https://help.aliyun.com/document_detail/161149.html
[6]
Apache Ranger Row-level Filter: https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.0/authorization-ranger/content/row_level_filtering_in_hive_with_ranger_policies.html
[7]
OpenLooKeng的行級權限控制: https://www.modb.pro/db/212124
[8]
PostgreSQL中的行級權限/資料權限/行安全政策: https://www.kankanzhijian.com/2018/09/28/PostgreSQL-rowsecurity/
[9]
FlinkSQL字段血緣解決方案及源碼: https://github.com/HamaWhiteGG/flink-sql-lineage/blob/main/README_CN.md
[10]
基于 Flink CDC 建構 MySQL 和 Postgres 的 Streaming ETL: https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-postgres-tutorial-zh.html