天天看點

Spark-SparkSQL深入學習系列四(轉自OopsOutOfMemory)

   TreeNode Library是Catalyst的核心類庫,文法樹的建構都是由一個個TreeNode組成。TreeNode本身是一個BaseType <: TreeNode[BaseType] 的類型,并且實作了Product這個trait,這樣可以存放異構的元素了。

   TreeNode有三種形态:BinaryNode、UnaryNode、Leaf

Node. 

   在Catalyst裡,這些Node都是繼承自Logical Plan,可以說每一個TreeNode節點就是一個Logical Plan(包含Expression)(直接繼承自TreeNode)

   主要繼承關系類圖如下:

Spark-SparkSQL深入學習系列四(轉自OopsOutOfMemory)

二進制節點,即有左右孩子的二叉節點

[[TreeNode]] that has two children, [[left]] and [[right]].  

trait BinaryNode[BaseType <: TreeNode[BaseType]] {  

  def left: BaseType  

  def right: BaseType  

  def children = Seq(left, right)  

}  

abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {  

  self: Product =>  

 節點定義比較簡單,左孩子,右孩子都是BaseType。 children是一個Seq(left, right)

下面列出主要繼承二進制節點的類,可以當查詢手冊用 :)

這裡提示下平常常用的二進制節點:Join和Union

Spark-SparkSQL深入學習系列四(轉自OopsOutOfMemory)

 一進制節點,即隻有一個孩子節點

 A [[TreeNode]] with a single [[child]].  

trait UnaryNode[BaseType <: TreeNode[BaseType]] {  

  def child: BaseType  

  def children = child :: Nil  

abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {  

下面列出主要繼承一進制節點的類,可以當查詢手冊用 :)

常用的二進制節點有,Project,Subquery,Filter,Limit ...等

Spark-SparkSQL深入學習系列四(轉自OopsOutOfMemory)

葉子節點,沒有孩子節點的節點。

A [[TreeNode]] with no children.  

trait LeafNode[BaseType <: TreeNode[BaseType]] {  

  def children = Nil  

abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {  

  // Leaf nodes by definition cannot reference any input attributes.  

  override def references = Set.empty  

下面列出主要繼承葉子節點的類,可以當查詢手冊用 :)

提示常用的葉子節點: Command類系列,一些Funtion函數,以及Unresolved Relation...etc.

Spark-SparkSQL深入學習系列四(轉自OopsOutOfMemory)

  簡單介紹一個TreeNode這個類的屬性和方法

  currentId

private val currentId = new java.util.concurrent.atomic.AtomicLong  

protected def nextId() = currentId.getAndIncrement()  

  sameInstance

  判斷2個執行個體是否是同一個的時候,隻需要判斷TreeNode的id。

def sameInstance(other: TreeNode[_]): Boolean = {  

  this.id == other.id  

  fastEquals,更常用的一個快捷的判定方法,沒有重寫Object.Equals,這樣防止scala編譯器生成case

class equals 方法

def fastEquals(other: TreeNode[_]): Boolean = {  

   sameInstance(other) || this == other  

 }  

  map,flatMap,collect都是遞歸的對子節點進行應用PartialFunction,其它方法還有很多,篇幅有限這裡不一一描述了。

  transform該方法接受一個PartialFunction,就是就是前一篇文章Analyzer裡提到的Batch裡面的Rule。

  是會将Rule疊代應用到該節點的所有子節點,最後傳回這個節點的副本(一個和目前節點不同的節點,後面會介紹,其實就是利用反射來傳回一個修改後的節點)。

  如果rule沒有對一個節點進行PartialFunction的操作,就傳回這個節點本身。

  來看一個例子:

object GlobalAggregates extends Rule[LogicalPlan] {  

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {   //apply方法這裡調用了logical plan(TreeNode) 的transform方法來應用一個PartialFunction。  

    case Project(projectList, child) if containsAggregates(projectList) =>  

      Aggregate(Nil, projectList, child)  

  }  

  def containsAggregates(exprs: Seq[Expression]): Boolean = {  

    exprs.foreach(_.foreach {  

      case agg: AggregateExpression => return true  

      case _ =>  

    })  

    false  

 這個方法真正的調用是transformChildrenDown,這裡提到了用先序周遊來對子節點進行遞歸的Rule應用。

 如果在對目前節點應用rule成功,修改後的節點afterRule,來對其children節點進行rule的應用。

 transformDown方法:

/** 

* Returns a copy of this node where `rule` has been recursively applied to it and all of its 

* children (pre-order). When `rule` does not apply to a given node it is left unchanged. 

* @param rule the function used to transform this nodes children 

*/  

ef transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {  

 val afterRule = rule.applyOrElse(this, identity[BaseType])  

 // Check if unchanged and then possibly return old copy to avoid gc churn.  

 if (this fastEquals afterRule) {  

   transformChildrenDown(rule)  //修改前節點this.transformChildrenDown(rule)  

 } else {  

   afterRule.transformChildrenDown(rule) //修改後節點進行transformChildrenDown  

  最重要的方法transformChildrenDown:

  對children節點進行遞歸的調用PartialFunction,利用最終傳回的newArgs來生成一個新的節點,這裡調用了makeCopy()來生成節點。

 transformChildrenDown方法:

 /** 

 * Returns a copy of this node where `rule` has been recursively applied to all the children of 

 * this node.  When `rule` does not apply to a given node it is left unchanged. 

 * @param rule the function used to transform this nodes children 

 */  

def transformChildrenDown(rule: PartialFunction[BaseType, BaseType]): this.type = {  

  var changed = false  

  val newArgs = productIterator.map {  

    case arg: TreeNode[_] if children contains arg =>  

      val newChild = arg.asInstanceOf[BaseType].transformDown(rule) //遞歸子節點應用rule  

      if (!(newChild fastEquals arg)) {  

        changed = true  

        newChild  

      } else {  

        arg  

      }  

    case Some(arg: TreeNode[_]) if children contains arg =>  

      val newChild = arg.asInstanceOf[BaseType].transformDown(rule)  

        Some(newChild)  

        Some(arg)  

    case m: Map[_,_] => m  

    case args: Traversable[_] => args.map {  

      case arg: TreeNode[_] if children contains arg =>  

        val newChild = arg.asInstanceOf[BaseType].transformDown(rule)  

        if (!(newChild fastEquals arg)) {  

          changed = true  

          newChild  

        } else {  

          arg  

        }  

      case other => other  

    }  

    case nonChild: AnyRef => nonChild  

    case null => null  

  }.toArray  

  if (changed) makeCopy(newArgs) else this //根據作用結果傳回的newArgs數組,反射生成新的節點副本。  

  makeCopy方法,反射生成節點副本  

  * Creates a copy of this type of tree node after a transformation. 

  * Must be overridden by child classes that have constructor arguments 

  * that are not present in the productIterator. 

  * @param newArgs the new product arguments. 

  */  

 def makeCopy(newArgs: Array[AnyRef]): this.type = attachTree(this, "makeCopy") {  

   try {  

     val defaultCtor = getClass.getConstructors.head  //反射擷取預設構造函數的第一個  

     if (otherCopyArgs.isEmpty) {  

       defaultCtor.newInstance(newArgs: _*).asInstanceOf[this.type] //反射生成目前節點類型的節點  

     } else {  

       defaultCtor.newInstance((newArgs ++ otherCopyArgs).toArray: _*).asInstanceOf[this.type] //如果還有其它參數,++  

     }  

   } catch {  

     case e: java.lang.IllegalArgumentException =>  

       throw new TreeNodeException(  

         this, s"Failed to copy node.  Is otherCopyArgs specified correctly for $nodeName? "  

           + s"Exception message: ${e.getMessage}.")  

   }  

  現在準備從一段sql來出發,畫一下這個spark sql的整體樹的transformation。

 SELECT * FROM (SELECT * FROM src) a join (select * from src)b on a.key=b.key

 首先,我們先執行一下,在控制台裡看一下生成的計劃:

<span style="font-size:12px;">sbt/sbt hive/console  

Using /usr/java/default as default JAVA_HOME.  

Note, this will be overridden by -java-home if it is set.  

[info] Loading project definition from /app/hadoop/shengli/spark/project/project  

[info] Loading project definition from /app/hadoop/shengli/spark/project  

[info] Set current project to root (in build file:/app/hadoop/shengli/spark/)  

[info] Starting scala interpreter...  

[info]   

import org.apache.spark.sql.catalyst.analysis._  

import org.apache.spark.sql.catalyst.dsl._  

import org.apache.spark.sql.catalyst.errors._  

import org.apache.spark.sql.catalyst.expressions._  

import org.apache.spark.sql.catalyst.plans.logical._  

import org.apache.spark.sql.catalyst.rules._  

import org.apache.spark.sql.catalyst.types._  

import org.apache.spark.sql.catalyst.util._  

import org.apache.spark.sql.execution  

import org.apache.spark.sql.hive._  

import org.apache.spark.sql.hive.test.TestHive._  

import org.apache.spark.sql.parquet.ParquetTestData  

scala> val query = sql("SELECT * FROM (SELECT * FROM src) a join (select * from src)b on a.key=b.key")</span>  

  第一步生成UnResolve Logical Plan 如下:

scala> query.queryExecution.logical  

res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   

Project [*]  

 Join Inner, Some(('a.key = 'b.key))  

  Subquery a  

   Project [*]  

    UnresolvedRelation None, src, None  

  Subquery b  

  如果畫成樹是這樣的,僅個人了解:

  我将一開始介紹的三種Node分别用綠色UnaryNode,紅色Binary Node 和 藍色 LeafNode 來表示。

Spark-SparkSQL深入學習系列四(轉自OopsOutOfMemory)

  Analyzer會将允用Batch的Rules來對Unresolved Logical  Plan Tree 進行rule應用,這裡用來EliminateAnalysisOperators将Subquery給消除掉,Batch("Resolution将Atrribute和Relation給Resolve了,Analyzed Logical Plan Tree如下圖:

Spark-SparkSQL深入學習系列四(轉自OopsOutOfMemory)

  我把Catalyst裡的Optimizer戲稱為Spark SQL的優化大師,因為整個Spark SQL的優化都是在這裡進行的,後面會有文章來講解Optimizer。

  在這裡,優化的不明顯,因為SQL本身不複雜

scala> query.queryExecution.optimizedPlan  

res3: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =   

Project [key#0,value#1,key#2,value#3]  

 Join Inner, Some((key#0 = key#2))  

  MetastoreRelation default, src, None  

生成的樹如下圖:

Spark-SparkSQL深入學習系列四(轉自OopsOutOfMemory)

  最後一步是最終生成的實體執行計劃,裡面涉及到了Hive的TableScan,涉及到了HashJoin操作,還涉及到了Exchange,Exchange涉及到了Shuffle和Partition操作。

scala> query.queryExecution.executedPlan  

res4: org.apache.spark.sql.execution.SparkPlan =   

Project [key#0:0,value#1:1,key#2:2,value#3:3]  

 HashJoin [key#0], [key#2], BuildRight  

  Exchange (HashPartitioning [key#0:0], 150)  

   HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), None  

  Exchange (HashPartitioning [key#2:0], 150)  

   HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None  

 生成的實體執行樹如圖:

Spark-SparkSQL深入學習系列四(轉自OopsOutOfMemory)

    本文介紹了Spark SQL的Catalyst架構核心TreeNode類庫,繪制了TreeNode繼承關系的類圖,了解了TreeNode這個類在Catalyst所起到的作用。文法樹中的Logical Plan均派生自TreeNode,并且Logical Plan派生出TreeNode的三種形态,即Binary Node, Unary Node, Leaft Node。 正式這幾種節點,組成了Spark

SQl的Catalyst的文法樹。

  TreeNode的transform方法是核心的方法,它接受一個rule,會對目前節點的孩子節點進行遞歸的調用rule,最後會傳回一個TreeNode的copy,這種操作就是transformation,貫穿了Spark SQL執行的幾個核心階段,如Analyze,Optimize階段。

  最後用一個實際的例子,展示出來Spark SQL的執行樹生成流程。

  我目前的了解就是這些,如果分析不到位的地方,請大家多多指正。

——EOF——

原創文章,轉載請注明:

Spark-SparkSQL深入學習系列四(轉自OopsOutOfMemory)