天天看點

hive周遊_MLSQL系列: Hive列權限控制

hive周遊_MLSQL系列: Hive列權限控制

大資料安全問題至關重要,基于Hive建立的資料倉庫也很普遍,提起Hive權限控制,首先能想到的可能就是Apache Ranger,通過攔截Hive Thrift Sever請求的方式,進行SQL解析與權限認證。但對于Spark SQL來講,更傾向于直接使用SQL通路Hive,而不是通過JDBC方式。是以對于Spark SQL通路Hive,就需要使用者自己去解決安全問題。今天,筆者以解析Spark SQL查詢計劃的方式,來提取庫表列的資訊,進而去做權限認證。

在開始之前,先講一下Spark SQL的邏輯計劃:

hive周遊_MLSQL系列: Hive列權限控制

邏輯算子樹的生成過程經曆三個階段:

  1. 由SparkSqlParser中的AstBuilder通路各種context節點,生成未解析的邏輯算子樹,此時未綁定資料資訊(資料源資訊、列資訊等);
  2. Analyzer在Unresolved LogicalPlan上應用一系列Rule,綁定資料資訊(資料源資訊、列資訊等),生成解析後的邏輯算子樹;
  3. 應用各種優化Rule在保證語義正确的前提下對一些低效的邏輯計劃進行轉換等,生成優化的邏輯算子樹。

下面通過執行一段SQL來感受下:

//表資訊CREATE TABLE if not exists respect.test(id string,name string,  age int,  dt string)//Spark SQL代碼  val spark = SparkSession      .builder()      .enableHiveSupport()      .master("local[*]")      .getOrCreate()    val sql =       """        |select name,        |       max_age,        |       "mlsql" mlsql        |  from(        |    select name,        |           max(age) max_age        |      from respect.test        |     where dt <= 20200501        |     group by name) t      """.stripMargin    val df = spark.sql(sql)    df.explain(true)
           

輸出:

== Parsed Logical Plan =='Project ['name, 'max_age, mlsql AS mlsql#1]+- 'SubqueryAlias `t`   +- 'Aggregate ['name], ['name, 'max('age) AS max_age#0]      +- 'Filter ('dt <= 20200501)         +- 'UnresolvedRelation `respect`.`test`== Analyzed Logical Plan ==name: string, max_age: int, mlsql: stringProject [name#12, max_age#0, mlsql AS mlsql#1]+- SubqueryAlias `t`   +- Aggregate [name#12], [name#12, max(age#13) AS max_age#0]      +- Filter (cast(dt#14 as int) <= 20200501)         +- SubqueryAlias `respect`.`test`            +- HiveTableRelation `respect`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#11, name#12, age#13, dt#14]== Optimized Logical Plan ==Aggregate [name#12], [name#12, max(age#13) AS max_age#0, mlsql AS mlsql#1]+- Project [name#12, age#13]   +- Filter (isnotnull(dt#14) && (cast(dt#14 as int) <= 20200501))      +- HiveTableRelation `respect`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#11, name#12, age#13, dt#14]
           

可以看出,通過分析解析後的邏輯計劃就可以提取出庫表列的資訊。請注意一點,在Project [name#18, max_age#8]中,#18中的18是name的exprId,後面會涉及到。 然後在了解一下Spark的TreeNode體系:

hive周遊_MLSQL系列: Hive列權限控制
hive周遊_MLSQL系列: Hive列權限控制

從上圖中可以看出,邏輯計劃屬于TreeNode體系,是以它可以使用TreeNode的所有方法。

下面來看MLSQL中是如何實作的(MLSQLDFParser類):

def extractTableWithColumns(df: DataFrame) = {    val tableAndCols = mutable.HashMap.empty[String, mutable.HashSet[String]]    val relationMap = new mutable.HashMap[Long, String]()    val analyzed = df.queryExecution.analyze
           

對于Hive表,Relation包含兩種:1. HiveTableRelation,2. 有catalogTable的LogicalRelation。 首先通過collectLeaves()找到所有的葉子節點(Relation一定是葉子節點),比對出以上兩種Relation。然後記錄每一個屬性(對應Hive中的列)的exprId與資料庫表的對應關系。 比如上述SQL:17->respect.test  (17為id的exprId)

(HiveTableRelation `respect`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#17, name#18, age#19, dt#20])

//隻對Hive表處理,臨時表不需要授權    analyzed.collectLeaves().foreach { lp =>      lp match {case r: HiveTableRelation =>          r.dataCols.foreach(c =>            relationMap.put(c.exprId.id              , r.tableMeta.identifier.toString().replaceAll("`", ""))          )case r: LogicalRelation =>          r.attributeMap.foreach(c =>if (r.catalogTable.nonEmpty) {              relationMap.put(c._2.exprId.id                , r.catalogTable.get.identifier.toString().replaceAll("`", ""))            }          )case _ =>      }    }
           

從分析的查詢計劃中可看出,SQL中使用的列(Select、Where、On等)都要經過Project或Aggregate,是以隻需要收集Project和Aggregate的屬性(對應Hive中的列)。 通過analyzed.map遞歸周遊所有節點,收集Project和Aggregate的屬性。

//如果查詢SQL中包含Hive表if (relationMap.nonEmpty) {     val neSet = mutable.HashSet.empty[NamedExpression]     analyzed.map { lp =>        lp match {case wowLp: Project =>            wowLp.projectList.map { item =>              item.collectLeaves().foreach(                _ match {case ne: NamedExpression => neSet.add(ne)case _ =>                }              )            }case wowLp: Aggregate =>            wowLp.aggregateExpressions.map { item =>              item.collectLeaves().foreach(                _ match {case ne: NamedExpression => neSet.add(ne)case _ =>                }              )            }case _ =>        }      }
           

exprId->資料庫表的對應關系與exprId->列名的對應關系做交集,就可以計算出資料庫表與查詢的所有列的對應關系。

//相當于記錄列的exprId與列名的對應關系      val neMap = neSet.zipWithIndex.map(ne => (ne._1.exprId.id ,ne._1)).toMap      relationMap.foreach { x =>if (neMap.contains(x._1)) {          val dbTable = x._2          val value = tableAndCols.getOrElse(dbTable, mutable.HashSet.empty[String])          value.add(neMap.get(x._1).get.name)          tableAndCols.update(dbTable, value)        }      }    }    tableAndCols  }
           

一個SQL的DataFrame傳進去,可以分析出庫表列的對應關系,然後就可以拿着這些資訊去做權限認證了。

println(extractTableWithColumns(df))//執行輸出的結果為:Map(respect.test -> Set(name, age))
           

其實通過分析Spark SQL查詢計劃還可以做很多其他的事情,比如名額的血緣分析、聚合分析、join分析等,或許還能算出個SQL複雜度、評估危險SQL,随便想一想哈

hive周遊_MLSQL系列: Hive列權限控制

(當然這裡面有一些要注意的地方,在做某些分析時,比如union、window等就需要特殊處理) 更多介紹請通路: http://docs.mlsql.tech/zh/ 如何加入MLSQL社群? 添加如下号碼,發送mlsql

hive周遊_MLSQL系列: Hive列權限控制