天天看點

FlinkSQL行級權限解決方案及源碼

FlinkSQL的行級權限解決方案及源碼,支援面向使用者級别的行級資料通路控制,即特定使用者隻能通路授權過的行,隐藏未授權的行資料。此方案是實時領域Flink的解決方案,類似離線數倉Hive中Ranger Row-level Filter方案。

源碼位址: https://github.com/HamaWhiteGG/flink-sql-security

注: 此方案已産品化內建到實時計算平台Dinky,歡迎試用。

一、基礎知識

1.1 行級權限

行級權限即橫向資料安全保護,可以解決不同人員隻允許通路不同資料行的問題。例如針對訂單表,使用者A隻能檢視到北京區域的資料,使用者B隻能檢視到杭州區域的資料。

FlinkSQL行級權限解決方案及源碼

1.2 業務流程

1.2.1 設定行級權限

管理者配置使用者、表、行級權限條件,例如下面的配置。

序号 使用者名 表名 行級權限條件
1 使用者A orders region = ‘beijing’
2 使用者B orders region = ‘hangzhou’

1.2.2 使用者查詢資料

使用者在系統上查詢orders表的資料時,系統在底層查詢時會根據該使用者的行級權限條件來自動過濾資料,即讓行級權限生效。

當使用者A和使用者B在執行下面相同的SQL時,會檢視到不同的結果資料。

使用者A檢視到的結果資料是:

order_id order_date customer_name price product_id order_status region
10001 2020-07-30 10:08:22 Jack 50.50 102 false beijing
10002 2020-07-30 10:11:09 Sally 15.00 105 false beijing
注: 系統底層最終執行的SQL是:

SELECT * FROM orders WHERE region = 'beijing'

使用者B檢視到的結果資料是:

order_id order_date customer_name price product_id order_status region
10003 2020-07-30 12:00:30 Edward 25.25 106 false hangzhou
10004 2022-12-15 12:11:09 John 78.00 103 false hangzhou
注: 系統底層最終執行的SQL是:

SELECT * FROM orders WHERE region = 'hangzhou'

1.3 元件版本

元件名稱 版本 備注
Flink 1.16.0
Flink-connector-mysql-cdc 2.3.0

二、Hive行級權限解決方案

在離線數倉工具Hive領域,由于發展多年已有Ranger來支援表資料的行級權限控制,詳見參考文獻[2]。下圖是在Ranger裡配置Hive表行級過濾條件的頁面,供參考。

FlinkSQL行級權限解決方案及源碼

但由于Flink實時數倉領域發展相對較短,Ranger還不支援FlinkSQL,以及要依賴Ranger會導緻系統部署和運維過重,是以開始自研實時數倉的行級權限解決工具。

三、FlinkSQL行級權限解決方案

3.1 解決方案

3.1.1 FlinkSQL執行流程

可以參考作者文章[FlinkSQL字段血緣解決方案及源碼],本文根據Flink1.16修正和簡化後的執行流程如下圖所示。

FlinkSQL行級權限解決方案及源碼

在CalciteParser.parse()處理後會得到一個SqlNode類型的抽象文法樹(

Abstract Syntax Tree

,簡稱AST),本文會在Parse階段,通過組裝行級過濾條件生成新的AST來實作行級權限控制。

3.1.2 Calcite對象繼承關系

下面章節要用到Calcite中的SqlNode、SqlCall、SqlIdentifier、SqlJoin、SqlBasicCall和SqlSelect等類,此處進行簡單介紹以及展示它們間繼承關系,以便讀者閱讀本文源碼。

序号 介紹
1 SqlNode A SqlNode is a SQL parse tree.
2 SqlCall A SqlCall is a call to an SqlOperator operator.
3 SqlIdentifier A SqlIdentifier is an identifier, possibly compound.
4 SqlJoin Parse tree node representing a JOIN clause.
5 SqlBasicCall Implementation of SqlCall that keeps its operands in an array.
6 SqlSelect A SqlSelect is a node of a parse tree which represents a select statement.
FlinkSQL行級權限解決方案及源碼

3.1.3 解決思路

在Parser階段,如果執行的SQL包含對表的查詢操作,則一定會建構Calcite SqlSelect對象。是以限制表的行級權限,隻要在建構Calcite SqlSelect對象時對Where條件進行攔截即可,而不需要解析使用者執行的各種SQL來查找配置過行級權限條件限制的表。

在SqlSelect對象構造Where條件時,要通過執行使用者和表名來查找配置的行級權限條件,系統會把此條件用CalciteParser提供的

parseExpression(String sqlExpression)

方法解析生成一個SqlBacicCall再傳回。然後結合使用者執行的SQL和配置的行級權限條件重新組裝Where條件,即生成新的帶行級過濾條件Abstract Syntax Tree,最後基于新的AST再執行後續的Validate、Convert、Optimize和Execute階段。

FlinkSQL行級權限解決方案及源碼

以上整個過程對執行SQL的使用者都是透明和無感覺的,還是調用Flink自帶的TableEnvironment.executeSql(String statement)方法即可。

注: 要通過技術手段把執行使用者傳遞到Calcite SqlSelect中。

3.2 重寫SQL

主要在org.apache.calcite.sql.SqlSelect的構造方法中完成。

3.2.1 主要流程

主流程如下圖所示,根據From的類型進行不同的操作,例如針對SqlJoin類型,要分别周遊其left和right節點,而且要支援遞歸操作以便支援三張表及以上JOIN;針對SqlIdentifier類型,要額外判斷下是否來自JOIN,如果是的話且JOIN時且未定義表别名,則用表名作為别名;針對SqlBasicCall類型,如果來自于子查詢,說明已在子查詢中組裝過行級權限條件,則直接傳回目前Where即可,否則分别取出表名和别名。

然後再擷取行級權限條件解析後生成SqlBacicCall類型的Permissions,并給Permissions增加别名,最後把已有Where和Permissions進行組裝生成新的Where,來作為SqlSelect對象的Where限制。

FlinkSQL行級權限解決方案及源碼

上述流程圖的各個分支,都會在下面的用例測試章節中會舉例說明。

3.2.2 核心源碼

核心源碼位于SqlSelect中新增的

addCondition()

addPermission()

buildWhereClause()

三個方法,下面隻給出控制主流程

addCondition()

的源碼。

/**
 * The main process of controlling row-level permissions
 */
private SqlNode addCondition(SqlNode from, SqlNode where, boolean fromJoin) {
    if (from instanceof SqlIdentifier) {
        String tableName = from.toString();
        // the table name is used as an alias for join
        String tableAlias = fromJoin ? tableName : null;
        return addPermission(where, tableName, tableAlias);
    } else if (from instanceof SqlJoin) {
        SqlJoin sqlJoin = (SqlJoin) from;
        // support recursive processing, such as join for three tables, process left sqlNode
        where = addCondition(sqlJoin.getLeft(), where, true);
        // process right sqlNode
        return addCondition(sqlJoin.getRight(), where, true);
    } else if (from instanceof SqlBasicCall) {
        // Table has an alias or comes from a subquery
        SqlNode[] tableNodes = ((SqlBasicCall) from).getOperands();
        /**
         * If there is a subquery in the Join, row-level filtering has been appended to the subquery.
         * What is returned here is the SqlSelect type, just return the original where directly
         */
        if (!(tableNodes[0] instanceof SqlIdentifier)) {
            return where;
        }
        String tableName = tableNodes[0].toString();
        String tableAlias = tableNodes[1].toString();
        return addPermission(where, tableName, tableAlias);
    }
    return where;
}
           

四、用例測試

用例測試資料來自于CDC Connectors for Apache Flink

[6]官網,在此表示感謝。下載下傳本文源碼後,可通過Maven運作單元測試。

$ cd flink-sql-security
$ mvn test
           

4.1 建立Mysql表及初始化資料

Mysql建立表語句及初始化資料SQL詳見源碼[flink-sql-security/data/database]裡面的mysql_ddl.sql和mysql_init.sql檔案,本文給

orders

表增加一個region字段。

4.2 建立Flink表

4.2.1 建立mysql cdc類型的orders表

DROP TABLE IF EXISTS orders;

CREATE TABLE IF NOT EXISTS orders (
    order_id INT PRIMARY KEY NOT ENFORCED,
    order_date TIMESTAMP(0),
    customer_name STRING,
    product_id INT,
    price DECIMAL(10, 5),
    order_status BOOLEAN,
    region STRING
) WITH (
    'connector'='mysql-cdc',
    'hostname'='xxx.xxx.xxx.xxx',
    'port'='3306',
    'username'='root',
    'password'='xxx',
    'server-time-zone'='Asia/Shanghai',
    'database-name'='demo',
    'table-name'='orders'
);
           

4.2.2 建立mysql cdc類型的products表

DROP TABLE IF EXISTS products;

CREATE TABLE IF NOT EXISTS products (
    id INT PRIMARY KEY NOT ENFORCED,
    name STRING,
    description STRING
) WITH (
    'connector'='mysql-cdc',
    'hostname'='xxx.xxx.xxx.xxx',
    'port'='3306',
    'username'='root',
    'password'='xxx',
    'server-time-zone'='Asia/Shanghai',
    'database-name'='demo',
    'table-name'='products'
);
           

4.2.3 建立mysql cdc類型shipments表

DROP TABLE IF EXISTS shipments;

CREATE TABLE IF NOT EXISTS shipments (
    shipment_id INT PRIMARY KEY NOT ENFORCED,
    order_id INT,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN
) WITH (
    'connector'='mysql-cdc',
    'hostname'='xxx.xxx.xxx.xxx',
    'port'='3306',
    'username'='root',
    'password'='xxx',
    'server-time-zone'='Asia/Shanghai',
    'database-name'='demo',
    'table-name'='shipments'
);
           

4.2.4 建立print類型print_sink表

DROP TABLE IF EXISTS print_sink;

CREATE TABLE IF NOT EXISTS print_sink (
    order_id INT PRIMARY KEY NOT ENFORCED,
    order_date TIMESTAMP(0),
    customer_name STRING,
    product_id INT,
    price DECIMAL(10, 5),
    order_status BOOLEAN,
    region STRING
) WITH (
    'connector'='print'
);
           

4.3 測試用例

詳細測試用例可檢視源碼中的單測,下面隻描述部分測試點。

4.3.1 簡單SELECT

4.3.1.1 行級權限條件
序号 使用者名 表名 行級權限條件
1 使用者A orders region = ‘beijing’
4.3.1.2 輸入SQL
4.3.1.3 輸出SQL
4.3.1.4 測試小結

輸入SQL中沒有WHERE條件,隻需要把行級過濾條件

region = 'beijing'

追加到WHERE後即可。

4.3.2 SELECT帶複雜WHERE限制

4.3.2.1 行級權限條件
序号 使用者名 表名 行級權限條件
1 使用者A orders region = ‘beijing’
4.3.2.2 輸入SQL
4.3.2.3 輸出SQL
4.3.2.4 測試小結

輸入SQL中有兩個限制條件,中間用的是OR,是以在組裝

region = 'beijing'

時,要給已有的

price > 45.0 OR customer_name = 'John'

增加括号。

4.3.3 兩表JOIN且含子查詢

4.3.3.1 行級權限條件
序号 使用者名 表名 行級權限條件
1 使用者A orders region = ‘beijing’
4.3.3.2 輸入SQL
SELECT
    o.*,
    p.name,
    p.description
FROM 
    (SELECT
        *
     FROM 
        orders
     WHERE 
        order_status = FALSE
    ) AS o
LEFT JOIN products AS p ON o.product_id = p.id
WHERE
    o.price > 45.0 OR o.customer_name = 'John' 
           
4.3.3.3 輸出SQL
SELECT
    o.*,
    p.name,
    p.description
FROM 
    (SELECT
        *
     FROM 
        orders
     WHERE 
        order_status = FALSE AND region = 'beijing'
    ) AS o
LEFT JOIN products AS p ON o.product_id = p.id
WHERE
    o.price > 45.0 OR o.customer_name = 'John' 
           
4.3.3.4 測試小結

針對比較複雜的SQL,例如兩表在JOIN時且其中左表來自于子查詢

SELECT * FROM orders WHERE order_status = FALSE

,行級過濾條件

region = 'beijing'

隻會追加到子查詢的裡面。

4.3.4 三表JOIN

4.3.4.1 行級權限條件
序号 使用者名 表名 行級權限條件
1 使用者A orders region = ‘beijing’
2 使用者A products name = ‘hammer’
3 使用者A shipments is_arrived = FALSE
4.3.4.2 輸入SQL
SELECT
  o.*,
  p.name,
  p.description,
  s.shipment_id,
  s.origin,
  s.destination,
  s.is_arrived
FROM
  orders AS o
  LEFT JOIN products AS p ON o.product_id=p.id
  LEFT JOIN shipments AS s ON o.order_id=s.order_id;
           
4.3.4.3 輸出SQL
SELECT
  o.*,
  p.name,
  p.description,
  s.shipment_id,
  s.origin,
  s.destination,
  s.is_arrived
FROM
  orders AS o
  LEFT JOIN products AS p ON o.product_id=p.id
  LEFT JOIN shipments AS s ON o.order_id=s.order_id
WHERE
  o.region='beijing'
  AND p.name='hammer'
  AND s.is_arrived=FALSE;
           
4.3.4.4 測試小結

三張表進行JOIN時,會分别擷取

orders

products

shipments

三張表的行級權限條件:

region = 'beijing'

name = 'hammer'

is_arrived = FALSE

,然後增加

orders

表的别名o、

products

表的别名p、

shipments

表的别名s,最後組裝到WHERE子句後面。

4.3.5 INSERT來自帶子查詢的SELECT

4.3.5.1 行級權限條件
序号 使用者名 表名 行級權限條件
1 使用者A orders region = ‘beijing’
4.3.5.2 輸入SQL
4.3.5.3 輸出SQL
4.3.5.4 測試小結

無論運作SQL類型是INSERT、SELECT或者其他,隻會找到查詢

oders

表的子句,然後對其組裝行級權限條件。

4.3.6 運作SQL

測試兩個不同使用者執行相同的SQL,兩個使用者的行級權限條件不一樣。

4.3.6.1 行級權限條件
序号 使用者名 表名 行級權限條件
1 使用者A orders region = ‘beijing’
2 使用者B orders region = ‘hangzhou’
4.3.6.2 輸入SQL
4.3.6.3 執行SQL

使用者A的真實執行SQL:

使用者B的真實執行SQL:

4.3.6.4 測試小結

使用者調用下面的執行方法,除傳遞要執行的SQL參數外,隻需要額外指定執行的使用者即可,便能自動按照行級權限限制來執行。

/**
 * Execute the single sql with user permissions
 */
public TableResult execute(String username, String singleSql) {
    System.setProperty(EXECUTE_USERNAME, username);
    return tableEnv.executeSql(singleSql);
}
           

五、源碼修改步驟

注: Flink版本1.16.0依賴的Calcite是1.26.0版本。

5.1 新增Parser和ParserImpl類

複制Flink源碼中的org.apache.flink.table.delegation.Parser和org.apache.flink.table.planner.delegation.ParserImpl到項目下,新增下面兩個方法及實作。

/**
 * Parses a SQL expression into a {@link SqlNode}. The {@link SqlNode} is not yet validated.
 *
 * @param sqlExpression a SQL expression string to parse
 * @return a parsed SQL node
 * @throws SqlParserException if an exception is thrown when parsing the statement
 */
@Override
public SqlNode parseExpression(String sqlExpression) {
    CalciteParser parser = calciteParserSupplier.get();
    return parser.parseExpression(sqlExpression);
}


/**
 * Entry point for parsing SQL queries and return the abstract syntax tree
 *
 * @param statement the SQL statement to evaluate
 * @return abstract syntax tree
 * @throws org.apache.flink.table.api.SqlParserException when failed to parse the statement
 */
@Override
public SqlNode parseSql(String statement) {
    CalciteParser parser = calciteParserSupplier.get();

    // use parseSqlList here because we need to support statement end with ';' in sql client.
    SqlNodeList sqlNodeList = parser.parseSqlList(statement);
    List<SqlNode> parsed = sqlNodeList.getList();
    Preconditions.checkArgument(parsed.size() == 1, "only single statement supported");
    return parsed.get(0);
}
           

5.2 新增SqlSelect類

複制Calcite源碼中的org.apache.calcite.sql.SqlSelect到項目下,新增上文提到的

addCondition()

addPermission()

buildWhereClause()

三個方法。

并且在構造方法中注釋掉原有的

this.where = where

行,并添加如下代碼:

// add row level filter condition for where clause
SqlNode rowFilterWhere = addCondition(from, where, false);
if (rowFilterWhere != where) {
    LOG.info("Rewritten SQL based on row-level privilege filtering for user [{}]", System.getProperty(EXECUTE_USERNAME));
}
this.where = rowFilterWhere;
           

5.3 封裝SecurityContext類

建立SecurityContext類,主要添加下面三個方法:

/**
 * Add row-level filter conditions and return new SQL
 */
public String addRowFilter(String username, String singleSql) {
    System.setProperty(EXECUTE_USERNAME, username);

    // in the modified SqlSelect, filter conditions will be added to the where clause
    SqlNode parsedTree = tableEnv.getParser().parseSql(singleSql);
    return parsedTree.toString();
}


/**
 * Query the configured permission point according to the user name and table name, and return
 * it to SqlBasicCall
 */
public SqlBasicCall queryPermissions(String username, String tableName) {
    String permissions = rowLevelPermissions.get(username, tableName);
    LOG.info("username: {}, tableName: {}, permissions: {}", username, tableName, permissions);
    if (permissions != null) {
        return (SqlBasicCall) tableEnv.getParser().parseExpression(permissions);
    }
    return null;
}


/**
 * Execute the single sql with user permissions
 */
public TableResult execute(String username, String singleSql) {
    System.setProperty(EXECUTE_USERNAME, username);
    return tableEnv.executeSql(singleSql);
}

           

六、下一步計劃

  1. 支援資料脫敏(Data Masking)
  2. 開發ranger-flink-plugin

七、參考文獻

  1. 資料管理DMS-敏感資料管理-行級管控
  2. Apache Ranger Row-level Filter
  3. OpenLooKeng的行級權限控制
  4. PostgreSQL中的行級權限/資料權限/行安全政策
  5. FlinkSQL字段血緣解決方案及源碼
  6. 基于 Flink CDC 建構 MySQL 和 Postgres 的 Streaming ETL

繼續閱讀