天天看點

資料傾斜?Spark 3.0 AQE專治各種不服(上)

Spark3.0已經釋出半年之久,這次大版本的更新主要是集中在性能優化和文檔豐富上,其中46%的優化都集中在Spark SQL上,SQL優化裡最引人注意的非Adaptive Query Execution莫屬了。

資料傾斜?Spark 3.0 AQE專治各種不服(上)

Adaptive Query Execution(AQE)是英特爾大資料技術團隊和百度大資料基礎架構部工程師在Spark 社群版本的基礎上,改進并實作的自适應執行引擎。近些年來,Spark SQL 一直在針對CBO 特性進行優化,而且做得十分成功。

CBO基本原理

在介紹CBO前,我們先來看一下另一個基于規則優化(Rule-Based Optimization,簡稱RBO)的優化器,這是一種經驗式、啟發式的優化思路,優化規則都已經預先定義好,隻需要将SQL往這些規則上套就可以。簡單的說,RBO就像是一個經驗豐富的老司機,基本套路全都知道。

然而世界上有一種東西叫做 – 不按套路來。與其說它不按套路來,倒不如說它本身并沒有什麼套路。最典型的莫過于複雜Join算子優化,對于這些Join來說,通常有兩個選擇題要做:

  1. Join應該選擇哪種算法政策來執行?BroadcastJoin or ShuffleHashJoin or SortMergeJoin?不同的執行政策對系統的資源要求不同,執行效率也有天壤之别,同一個SQL,選擇到合适的政策執行可能隻需要幾秒鐘,而如果沒有選擇到合适的執行政策就可能會導緻系統OOM。
  2. 對于雪花模型或者星型模型來講,多表Join應該選擇什麼樣的順序執行?不同的Join順序意味着不同的執行效率,比如A join B join C,A、B表都很大,C表很小,那A join B很顯然需要大量的系統資源來運算,執行時間必然不會短。而如果使用A join C join B的執行順序,因為C表很小,是以A join C會很快得到結果,而且結果集會很小,再使用小的結果集 join B,性能顯而易見會好于前一種方案。

大家想想,這有什麼固定的優化規則麼?并沒有。說白了,你需要知道更多關于表的基礎資訊(表大小、表記錄總條數等),再通過一定規則代價評估才能從中選擇一條最優的執行計劃。是以,CBO 意為基于代價優化政策,它需要計算所有可能執行計劃的代價,并挑選出代價最小的執行計劃。

AQE對于整體的Spark SQL的執行過程做了相應的調整和優化,它最大的亮點是可以根據已經完成的計劃結點真實且精确的執行統計結果來不停的回報并重新優化剩下的執行計劃。

CBO這麼難優化,Spark怎麼解決?

CBO 會計算一些和業務資料相關的統計資料,來優化查詢,例如行數、去重後的行數、空值、最大最小值等。Spark會根據這些資料,自動選擇BHJ或者SMJ,對于多Join場景下的Cost-based Join Reorder,來達到優化執行計劃的目的。

但是,由于這些統計資料是需要預先處理的,會過時,是以我們在用過時的資料進行判斷,在某些情況下反而會變成負面效果,拉低了SQL執行效率。

Spark3.0的AQE架構用了三招解決這個問題:

  • 動态合并shuffle分區(Dynamically coalescing shuffle partitions)
  • 動态調整Join政策(Dynamically switching join strategies)
  • 動态優化資料傾斜Join(Dynamically optimizing skew joins)

下面我們來詳細介紹這三個特性。

動态合并 shuffle 的分區

在我們處理的資料量級非常大時,shuffle通常來說是最影響性能的。因為shuffle是一個非常耗時的算子,它需要通過網絡移動資料,分發給下遊算子。在shuffle中,partition的數量十分關鍵。partition的最佳數量取決于資料,而資料大小在不同的query不同stage都會有很大的差異,是以很難去确定一個具體的數目:

  • 如果partition過少,每個partition資料量就會過多,可能就會導緻大量資料要落到磁盤上,進而拖慢了查詢。
  • 如果partition過多,每個partition資料量就會很少,就會産生很多額外的網絡開銷,并且影響Spark task scheduler,進而拖慢查詢。

為了解決該問題,我們在最開始設定相對較大的shuffle partition個數,通過執行過程中shuffle檔案的資料來合并相鄰的小partitions。例如,假設我們執行SELECT max(i) FROM tbl GROUP BY j,表tbl隻有2個partition并且資料量非常小。我們将初始shuffle partition設為5,是以在分組後會出現5個partitions。若不進行AQE優化,會産生5個tasks來做聚合結果,事實上有3個partitions資料量是非常小的。

資料傾斜?Spark 3.0 AQE專治各種不服(上)

然而在這種情況下,AQE隻會生成3個reduce task。

資料傾斜?Spark 3.0 AQE專治各種不服(上)

動态切換join政策

Spark 支援許多 Join 政策,其中 broadcast hash join 通常是性能最好的,前提是參加 join 的一張表的資料能夠裝入記憶體。由于這個原因,當 Spark 估計參加 join 的表資料量小于廣播大小的門檻值時,其會将 Join 政策調整為 broadcast hash join。但是,很多情況都可能導緻這種大小估計出錯——例如存在一個非常有選擇性的過濾器。

由于AQE擁有精确的上遊統計資料,是以可以解決該問題。比如下面這個例子,右表的實際大小為15M,而在該場景下,經過filter過濾後,實際參與join的資料大小為8M,小于了預設broadcast門檻值10M,應該被廣播。

資料傾斜?Spark 3.0 AQE專治各種不服(上)

在我們執行過程中轉化為BHJ的同時,我們甚至可以将傳統shuffle優化為本地shuffle(例如shuffle讀在mapper而不是基于reducer)來減小網絡開銷。

動态優化資料傾斜

Join裡如果出現某個key的資料傾斜問題,那麼基本上就是這個任務的性能殺手了。在AQE之前,使用者沒法自動處理Join中遇到的這個棘手問題,需要借助外部手動收集資料統計資訊,并做額外的加鹽,分批處理資料等相對繁瑣的方法來應對資料傾斜問題。

資料傾斜本質上是由于叢集上資料在分區之間分布不均勻所導緻的,它會拉慢join場景下整個查詢。AQE根據shuffle檔案統計資料自動檢測傾斜資料,将那些傾斜的分區打散成小的子分區,然後各自進行join。

我們可以看下這個場景,Table A join Table B,其中Table A的partition A0資料遠大于其他分區。

資料傾斜?Spark 3.0 AQE專治各種不服(上)

AQE會将partition A0切分成2個子分區,并且讓他們獨自和Table B的partition B0進行join。

資料傾斜?Spark 3.0 AQE專治各種不服(上)

如果不做這個優化,SMJ将會産生4個tasks并且其中一個執行時間遠大于其他。經優化,這個join将會有5個tasks,但每個task執行耗時差不多相同,是以個整個查詢帶來了更好的性能。

如何開啟AQE

我們可以設定參數spark.sql.adaptive.enabled為true來開啟AQE,在Spark 3.0中預設是false,并滿足以下條件:

  • 非流式查詢
  • 包含至少一個exchange(如join、聚合、視窗算子)或者一個子查詢

AQE通過減少了對靜态統計資料的依賴,成功解決了Spark CBO的一個難以處理的trade off(生成統計資料的開銷和查詢耗時)以及資料精度問題。相比之前具有局限性的CBO,現在就顯得非常靈活。

Spark CBO源碼實作

Adaptive Execution 模式是在使用Spark實體執行計劃注入生成的。在QueryExecution類中有 preparations 一組優化器來對實體執行計劃進行優化, InsertAdaptiveSparkPlan 就是第一個優化器。

InsertAdaptiveSparkPlan 使用 PlanAdaptiveSubqueries Rule對部分SubQuery處理後,将目前 Plan 包裝成 AdaptiveSparkPlanExec 。

當執行 AdaptiveSparkPlanExec 的 collect() 或 take() 方法時,全部會先執行 getFinalPhysicalPlan() 方法生成新的SparkPlan,再執行對應的SparkPlan對應的方法。

// QueryExecution類
lazy val executedPlan: SparkPlan = {
    executePhase(QueryPlanningTracker.PLANNING) {
      QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
    }
  }

  protected def preparations: Seq[Rule[SparkPlan]] = {
    QueryExecution.preparations(sparkSession,
      Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))))
  }

  private[execution] def preparations(
      sparkSession: SparkSession,
      adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
    // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
    // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
    adaptiveExecutionRule.toSeq ++
    Seq(
      PlanDynamicPruningFilters(sparkSession),
      PlanSubqueries(sparkSession),
      EnsureRequirements(sparkSession.sessionState.conf),
      ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
        sparkSession.sessionState.columnarRules),
      CollapseCodegenStages(sparkSession.sessionState.conf),
      ReuseExchange(sparkSession.sessionState.conf),
      ReuseSubquery(sparkSession.sessionState.conf)
    )
  }


// InsertAdaptiveSparkPlan 
  override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false)

  private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {
   // ...some checking
    case _ if shouldApplyAQE(plan, isSubquery) =>
      if (supportAdaptive(plan)) {
        try {
          // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse.
          // Fall back to non-AQE mode if AQE is not supported in any of the sub-queries.
          val subqueryMap = buildSubqueryMap(plan)
          val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap)
          val preprocessingRules = Seq(
            planSubqueriesRule)
          // Run pre-processing rules.
          val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules)
          logDebug(s"Adaptive execution enabled for plan: $plan")
          AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery)
        } catch {
          case SubqueryAdaptiveNotSupportedException(subquery) =>
            logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
              s"but is not supported for sub-query: $subquery.")
            plan
        }
      } else {
        logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
          s"but is not supported for query: $plan.")
        plan
      }
    case _ => plan
  }           

複制

AQE對Stage 分階段送出執行和優化過程如下:

private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
    // 第一次調用 getFinalPhysicalPlan方法時為false,等待該方法執行完畢,全部Stage不會再改變,直接傳回最終plan
    if (isFinalPlan) return currentPhysicalPlan

    // In case of this adaptive plan being executed out of `withActive` scoped functions, e.g.,
    // `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be
    // created in the middle of the execution.
    context.session.withActive {
      val executionId = getExecutionId
      var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
      var result = createQueryStages(currentPhysicalPlan)
      val events = new LinkedBlockingQueue[StageMaterializationEvent]()
      val errors = new mutable.ArrayBuffer[Throwable]()
      var stagesToReplace = Seq.empty[QueryStageExec]
      while (!result.allChildStagesMaterialized) {
        currentPhysicalPlan = result.newPlan
        // 接下來有哪些Stage要執行,參考 createQueryStages(plan: SparkPlan) 方法
        if (result.newStages.nonEmpty) {
          stagesToReplace = result.newStages ++ stagesToReplace
          // onUpdatePlan 通過listener更新UI
          executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))

          // Start materialization of all new stages and fail fast if any stages failed eagerly
          result.newStages.foreach { stage =>
            try {
              // materialize() 方法對Stage的作為一個單獨的Job送出執行,并傳回 SimpleFutureAction 來接收執行結果
              // QueryStageExec: materialize() -> doMaterialize() ->
              // ShuffleExchangeExec: -> mapOutputStatisticsFuture -> ShuffleExchangeExec
              // SparkContext: -> submitMapStage(shuffleDependency)
              stage.materialize().onComplete { res =>
                if (res.isSuccess) {
                  events.offer(StageSuccess(stage, res.get))
                } else {
                  events.offer(StageFailure(stage, res.failed.get))
                }
              }(AdaptiveSparkPlanExec.executionContext)
            } catch {
              case e: Throwable =>
                cleanUpAndThrowException(Seq(e), Some(stage.id))
            }
          }
        }

        // Wait on the next completed stage, which indicates new stats are available and probably
        // new stages can be created. There might be other stages that finish at around the same
        // time, so we process those stages too in order to reduce re-planning.
        // 等待,直到有Stage執行完畢
        val nextMsg = events.take()
        val rem = new util.ArrayList[StageMaterializationEvent]()
        events.drainTo(rem)
        (Seq(nextMsg) ++ rem.asScala).foreach {
          case StageSuccess(stage, res) =>
            stage.resultOption = Some(res)
          case StageFailure(stage, ex) =>
            errors.append(ex)
        }

        // In case of errors, we cancel all running stages and throw exception.
        if (errors.nonEmpty) {
          cleanUpAndThrowException(errors, None)
        }

        // Try re-optimizing and re-planning. Adopt the new plan if its cost is equal to or less
        // than that of the current plan; otherwise keep the current physical plan together with
        // the current logical plan since the physical plan's logical links point to the logical
        // plan it has originated from.
        // Meanwhile, we keep a list of the query stages that have been created since last plan
        // update, which stands for the "semantic gap" between the current logical and physical
        // plans. And each time before re-planning, we replace the corresponding nodes in the
        // current logical plan with logical query stages to make it semantically in sync with
        // the current physical plan. Once a new plan is adopted and both logical and physical
        // plans are updated, we can clear the query stage list because at this point the two plans
        // are semantically and physically in sync again.
        // 對前面的Stage替換為 LogicalQueryStage 節點
        val logicalPlan = replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace)
        // 再次調用optimizer 和planner 進行優化
        val (newPhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan)
        val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
        val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
        if (newCost < origCost ||
            (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
          logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan")
          cleanUpTempTags(newPhysicalPlan)
          currentPhysicalPlan = newPhysicalPlan
          currentLogicalPlan = newLogicalPlan
          stagesToReplace = Seq.empty[QueryStageExec]
        }
        // Now that some stages have finished, we can try creating new stages.
        // 進入下一輪循環,如果存在Stage執行完畢, 對應的resultOption 會有值,對應的allChildStagesMaterialized 屬性 = true
        result = createQueryStages(currentPhysicalPlan)
      }

      // Run the final plan when there's no more unfinished stages.
      // 所有前置stage全部執行完畢,根據stats資訊優化實體執行計劃,确定最終的 physical plan
      currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules)
      isFinalPlan = true
      executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
      currentPhysicalPlan
    }
  }           

複制

// SparkContext
  /**
   * Submit a map stage for execution. This is currently an internal API only, but might be
   * promoted to DeveloperApi in the future.
   */
  private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C])
      : SimpleFutureAction[MapOutputStatistics] = {
    assertNotStopped()
    val callSite = getCallSite()
    var result: MapOutputStatistics = null
    val waiter = dagScheduler.submitMapStage(
      dependency,
      (r: MapOutputStatistics) => { result = r },
      callSite,
      localProperties.get)
    new SimpleFutureAction[MapOutputStatistics](waiter, result)
  }


// DAGScheduler
  def submitMapStage[K, V, C](
      dependency: ShuffleDependency[K, V, C],
      callback: MapOutputStatistics => Unit,
      callSite: CallSite,
      properties: Properties): JobWaiter[MapOutputStatistics] = {

    val rdd = dependency.rdd
    val jobId = nextJobId.getAndIncrement()
    if (rdd.partitions.length == 0) {
      throw new SparkException("Can't run submitMapStage on RDD with 0 partitions")
    }

    // We create a JobWaiter with only one "task", which will be marked as complete when the whole
    // map stage has completed, and will be passed the MapOutputStatistics for that stage.
    // This makes it easier to avoid race conditions between the user code and the map output
    // tracker that might result if we told the user the stage had finished, but then they queries
    // the map output tracker and some node failures had caused the output statistics to be lost.
    val waiter = new JobWaiter[MapOutputStatistics](
      this, jobId, 1,
      (_: Int, r: MapOutputStatistics) => callback(r))
    eventProcessLoop.post(MapStageSubmitted(
      jobId, dependency, callSite, waiter, Utils.cloneProperties(properties)))
    waiter
  }           

複制

目前,AdaptiveSparkPlanExec 中對實體執行的優化器清單如下:

// AdaptiveSparkPlanExec
  @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
    ReuseAdaptiveSubquery(conf, context.subqueryCache),
    CoalesceShufflePartitions(context.session),
    // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
    // added by `CoalesceShufflePartitions`. So they must be executed after it.
    OptimizeSkewedJoin(conf),
    OptimizeLocalShuffleReader(conf),
    ApplyColumnarRulesAndInsertTransitions(conf, context.session.sessionState.columnarRules),
    CollapseCodegenStages(conf)
  )           

複制

其中 OptimizeSkewedJoin方法就是針對最容易出現資料傾斜的Join進行的優化:

AQE模式下,每個Stage執行之前,前置依賴Stage已經全部執行完畢,那麼就可以擷取到每個Stage的stats資訊。當發現shuffle partition的輸出超過partition size的中位數的5倍,且partition的輸出大于 256M 會被判斷産生資料傾斜, 将partition 資料按照targetSize進行切分為N份。targetSize = max(64M, 非資料傾斜partition的平均大小)。

優化前 shuffle 如下:

資料傾斜?Spark 3.0 AQE專治各種不服(上)

優化後 shuffle:

資料傾斜?Spark 3.0 AQE專治各種不服(上)