大資料安全問題至關重要,基于Hive建立的資料倉庫也很普遍,提起Hive權限控制,首先能想到的可能就是Apache Ranger,通過攔截Hive Thrift Sever請求的方式,進行SQL解析與權限認證。但對于Spark SQL來講,更傾向于直接使用SQL通路Hive,而不是通過JDBC方式。是以對于Spark SQL通路Hive,就需要使用者自己去解決安全問題。今天,筆者以解析Spark SQL查詢計劃的方式,來提取庫表列的資訊,進而去做權限認證。
在開始之前,先講一下Spark SQL的邏輯計劃:
邏輯算子樹的生成過程經曆三個階段:
- 由SparkSqlParser中的AstBuilder通路各種context節點,生成未解析的邏輯算子樹,此時未綁定資料資訊(資料源資訊、列資訊等);
- Analyzer在Unresolved LogicalPlan上應用一系列Rule,綁定資料資訊(資料源資訊、列資訊等),生成解析後的邏輯算子樹;
- 應用各種優化Rule在保證語義正确的前提下對一些低效的邏輯計劃進行轉換等,生成優化的邏輯算子樹。
下面通過執行一段SQL來感受下:
//表資訊CREATE TABLE if not exists respect.test(id string,name string, age int, dt string)//Spark SQL代碼 val spark = SparkSession .builder() .enableHiveSupport() .master("local[*]") .getOrCreate() val sql = """ |select name, | max_age, | "mlsql" mlsql | from( | select name, | max(age) max_age | from respect.test | where dt <= 20200501 | group by name) t """.stripMargin val df = spark.sql(sql) df.explain(true)
輸出:
== Parsed Logical Plan =='Project ['name, 'max_age, mlsql AS mlsql#1]+- 'SubqueryAlias `t` +- 'Aggregate ['name], ['name, 'max('age) AS max_age#0] +- 'Filter ('dt <= 20200501) +- 'UnresolvedRelation `respect`.`test`== Analyzed Logical Plan ==name: string, max_age: int, mlsql: stringProject [name#12, max_age#0, mlsql AS mlsql#1]+- SubqueryAlias `t` +- Aggregate [name#12], [name#12, max(age#13) AS max_age#0] +- Filter (cast(dt#14 as int) <= 20200501) +- SubqueryAlias `respect`.`test` +- HiveTableRelation `respect`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#11, name#12, age#13, dt#14]== Optimized Logical Plan ==Aggregate [name#12], [name#12, max(age#13) AS max_age#0, mlsql AS mlsql#1]+- Project [name#12, age#13] +- Filter (isnotnull(dt#14) && (cast(dt#14 as int) <= 20200501)) +- HiveTableRelation `respect`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#11, name#12, age#13, dt#14]
可以看出,通過分析解析後的邏輯計劃就可以提取出庫表列的資訊。請注意一點,在Project [name#18, max_age#8]中,#18中的18是name的exprId,後面會涉及到。 然後在了解一下Spark的TreeNode體系:
從上圖中可以看出,邏輯計劃屬于TreeNode體系,是以它可以使用TreeNode的所有方法。
下面來看MLSQL中是如何實作的(MLSQLDFParser類):
def extractTableWithColumns(df: DataFrame) = { val tableAndCols = mutable.HashMap.empty[String, mutable.HashSet[String]] val relationMap = new mutable.HashMap[Long, String]() val analyzed = df.queryExecution.analyze
對于Hive表,Relation包含兩種:1. HiveTableRelation,2. 有catalogTable的LogicalRelation。 首先通過collectLeaves()找到所有的葉子節點(Relation一定是葉子節點),比對出以上兩種Relation。然後記錄每一個屬性(對應Hive中的列)的exprId與資料庫表的對應關系。 比如上述SQL:17->respect.test (17為id的exprId)
(HiveTableRelation `respect`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#17, name#18, age#19, dt#20])
//隻對Hive表處理,臨時表不需要授權 analyzed.collectLeaves().foreach { lp => lp match {case r: HiveTableRelation => r.dataCols.foreach(c => relationMap.put(c.exprId.id , r.tableMeta.identifier.toString().replaceAll("`", "")) )case r: LogicalRelation => r.attributeMap.foreach(c =>if (r.catalogTable.nonEmpty) { relationMap.put(c._2.exprId.id , r.catalogTable.get.identifier.toString().replaceAll("`", "")) } )case _ => } }
從分析的查詢計劃中可看出,SQL中使用的列(Select、Where、On等)都要經過Project或Aggregate,是以隻需要收集Project和Aggregate的屬性(對應Hive中的列)。 通過analyzed.map遞歸周遊所有節點,收集Project和Aggregate的屬性。
//如果查詢SQL中包含Hive表if (relationMap.nonEmpty) { val neSet = mutable.HashSet.empty[NamedExpression] analyzed.map { lp => lp match {case wowLp: Project => wowLp.projectList.map { item => item.collectLeaves().foreach( _ match {case ne: NamedExpression => neSet.add(ne)case _ => } ) }case wowLp: Aggregate => wowLp.aggregateExpressions.map { item => item.collectLeaves().foreach( _ match {case ne: NamedExpression => neSet.add(ne)case _ => } ) }case _ => } }
exprId->資料庫表的對應關系與exprId->列名的對應關系做交集,就可以計算出資料庫表與查詢的所有列的對應關系。
//相當于記錄列的exprId與列名的對應關系 val neMap = neSet.zipWithIndex.map(ne => (ne._1.exprId.id ,ne._1)).toMap relationMap.foreach { x =>if (neMap.contains(x._1)) { val dbTable = x._2 val value = tableAndCols.getOrElse(dbTable, mutable.HashSet.empty[String]) value.add(neMap.get(x._1).get.name) tableAndCols.update(dbTable, value) } } } tableAndCols }
一個SQL的DataFrame傳進去,可以分析出庫表列的對應關系,然後就可以拿着這些資訊去做權限認證了。
println(extractTableWithColumns(df))//執行輸出的結果為:Map(respect.test -> Set(name, age))
其實通過分析Spark SQL查詢計劃還可以做很多其他的事情,比如名額的血緣分析、聚合分析、join分析等,或許還能算出個SQL複雜度、評估危險SQL,随便想一想哈
(當然這裡面有一些要注意的地方,在做某些分析時,比如union、window等就需要特殊處理) 更多介紹請通路: http://docs.mlsql.tech/zh/ 如何加入MLSQL社群? 添加如下号碼,發送mlsql