天天看點

Apache Spark 3.0 自适應查詢優化在網易的深度實踐及改進

Apache Spark 3.0 自适應查詢優化在網易的深度實踐及改進

本文基于 Apahce Spark 3.1.1 版本,講述 AQE 自适應查詢優化的原理,以及網易有數在 AQE 實踐中遇到的痛點和做出的思考。

1

前言

自适應查詢優化 (Adaptive Query Execution, AQE) 是 Spark 3.0 版本引入的重大特性之一,可以在運作時動态的優化使用者的 SQL 執行計劃,很大程度上提高了 Spark 作業的性能和穩定性。AQE 包含動态分區合并、Join 資料傾斜自動優化、動态 Join 政策選擇等多個子特性,這些特性可以讓使用者省去很多需要根據作業負載逐個手動調優,甚至修改業務邏輯的痛苦過程,極大的提升了 Spark 自身的易用性和靈活性。

作為網易大資料基礎軟體的締造者,網易數帆旗下網易有數團隊自 AQE 誕生起就關注其應用。第一個應用 AQE 的系統是 Kyuubi。Kyuubi 是網易開源的一款企業級資料湖探索平台,它基于 Spark SQL 實作了多租戶 SQL on Hadoop 查詢引擎。在網易内部,基于 Kyuubi 的 C/S 架構,在保證 SQL 相容性的前提下,服務端可以平滑地實作 Spark 版本更新,将社群和内部的最新優化和增強快速賦能使用者。從 Spark 3.0.2 開始,網易有數就在生産環境中逐漸試用和推廣 AQE 的特性。而在 Spark 3.1.1 釋出後,AQE 在 Kyuubi 生産環境中已經是使用者預設的執行方式。在這個過程中,我們還端到端地幫助某個業務遷移了 1500+ Hive 曆史任務到 Spark 3.1.1 上,不僅實作了資源量減半,更将總執行時間縮短了 70% 以上,綜合來看執行性能提升 7 倍多。

當然,AQE 作為一個“新”特性,在實踐過程中我們也發現它在很多方面不盡如人意,還有很大的優化空間。秉着堅持開源政策,網易有數努力将團隊遇到的問題和 Spark 社群分享,将我們的優化努力合進社群。以下章節,我們将展開介紹這半年多來 AQE 特性在網易的實踐經驗和優化改進。

2

AQE的設計思路

首先明确一個核心概念,AQE 的設計和優化完全圍繞着 shuffle,也就是說如果執行計劃裡不包含 shuffle,那麼 AQE 是無效的。常見的可能産生 shuffle 的算子比如 Aggregate(group by), Join, Repartition。

不同于傳統以整個執行計劃為粒度進行排程的方式,AQE 會把執行計劃基于 shuffle 劃分成若幹個子計劃,每個子計劃用一個新的葉子節點包裹起來,進而使得執行計劃的排程粒度細化到 stage 級别 (stage 也是基于 shuffle 劃分)。這樣拆解後,AQE 就可以在某個子執行計劃完成後擷取到其 shuffle 的統計資料,并基于這些統計資料再對下一個子計劃動态優化。

Apache Spark 3.0 自适應查詢優化在網易的深度實踐及改進

圖檔來自 databricks 部落格

有了這個排程流程之後,AQE 才可能有接下來的優化政策,從宏觀上來看 AQE 優化執行計劃的政策有兩種:一是動态修改執行計劃;二是動态生成 shuffle reader。

2.1 動态修改執行計劃

動态修改執行計劃包括兩個部分:對其邏輯計劃重新優化,以及生成新的實體執行計劃。我們知道一般的 SQL 執行流程是,邏輯執行計劃 -> 實體執行計劃,而 AQE 的執行邏輯是,子實體執行計劃 -> 父邏輯執行計劃 -> 父實體執行計劃,這樣的執行流程提供了更多優化的空間。比如在對 Join 算子選擇執行方式的時候可能有原來的 Sort Merge Join 優化為 Broadcast Hash Join。執行計劃層面看起來是這樣:

Apache Spark 3.0 自适應查詢優化在網易的深度實踐及改進

2.2 動态生成 Shuffle Reader

先明确一個簡單的概念 map 負責寫 shuffle 資料,reduce 負責讀取 shuffle 資料。而 shuffle reader 可以了解為在 reduce 裡負責拉 shuffle 資料的工具。标準的 shuffle reader 會根據預設定的分區數量 (也就是我們經常改的 spark.sql.shuffle.partitions),在每個 reduce 内拉取配置設定給它的 shuffle 資料。而動态生成的 shuffle reader 會根據運作時的 shuffle 統計資料來決定 reduce 的數量。下面舉兩個例子,分區合并和 Join 動态優化。

(1)分區合并是一個通用的優化,其思路是将多個讀取 shuffle 資料量少的 reduce 合并到 1 個 reduce。假如有一個極端情況,shuffle 的資料量隻有幾十 KB,但是分區數聲明了幾千,那麼這個任務就會極大的浪費排程資源。在這個背景下,AQE 在跑完 map 後,會感覺到這個情況,然後動态的合并 reduce 的數量,而在這個 case 下 reduce 的數量就會合并為 1。這樣優化後可以極大的節省 reduce 數量,并提高 reduce 吞吐量。

(2)Join 傾斜優化相對于分區合并,Join 傾斜優化則隻專注于 Join 的場景。如果我們 Join 的某個 key 存在傾斜,那麼對應到 Spark 中就會出現某個 reduce 的分區出現傾斜。在這個背景下,AQE 在跑完 map 後,會預統計每個 reduce 讀取到的 shuffle 資料量,然後把資料量大的 reduce 分區做切割,也就是把原本由 1 個 reduce 讀取的 shuffle 資料改為 n 個 reduce 讀取。這樣處理後就保證了每個 reduce 處理的資料量是一緻的,進而解決資料傾斜問題。

AQE 優化規則實作都是非常巧妙的,其他更多優化細節就不展開了,推薦閱讀 Kyuubi 與 AQE。

3

社群原生AQE的問題

看起來 AQE 已經是萬能的,我們經常遇到的問題點都被覆寫到了,那麼實際用起來的時候真的有這麼絲滑嗎?這裡列舉一些網易在使用 AQE 過程中遇到的痛點。

3.1 覆寫場景不足

就拿 Join 傾斜優化來說,這真的是一個非常棒的 idea,什麼都很好但是有一個缺陷:覆寫的場景有限。在網易的深度實踐過程中,經常會遇到一些 Join 明明就是肉眼可見的傾斜,但卻沒有被優化成想象中的樣子。這種情況對使用者來說會帶來極大的困擾,在成百上千行的 SQL 裡,哪些 Join 能被優化,哪些不能被優化?要花費很大一部分時間來去校驗确認。

3.2 廣播 Join 不可逆

廣播配置 spark.sql.autoBroadcastJoinThreshold 是我們最常修改的配置之一,其優勢是可以把 Join 用廣播的形式實作,避免了資料 shuffle。但是廣播有個很嚴重的問題:判定一張表是否可以被廣播是基于靜态的統計資料,特别是在經過一系列的過濾操作後,再完美的代價估計都是不精确的。由這個問題引發的任務失敗報錯就很常見了,Driver 端的 OOM,廣播逾時等。而 AQE 中的廣播是不可逆的,也就是說如果一個 Join 在進入 AQE 優化前已經被標明為廣播 Join,那麼 AQE 無法再将其轉換為其他 Join (比如 Sort Merge Join)。這對于一些由于錯誤估計大小而導緻被廣播的表是緻命的。也是我們遇到影響任務穩定性的一大因素。

3.3 配置不夠靈活

雖然 AQE 真的很好用,但是配置還是不夠靈活。比如 stage 級别的配置隔離,我們知道 AQE 是基于 stage 的排程,那麼更進一步的,SQL 的配置也可以是 stage 級别的,這樣可以最細粒度的優化每一次 shuffle。聽起來可能有點過猶不及的感覺,但是最容易遇到的一個需求就是單獨設定最後一個 stage 的配置。最後一個 stage 是與衆不同的,它代表着寫操作,也就是說它決定了最終産生檔案的數量。是以沖突和痛點就這樣出現了,最後一個 stage 考慮的是存儲,是檔案數,而過程中的 stage 考慮的是計算性能,是并發。

4

網易在 AQE 上的改進

網易是 AQE 這個特性的重度使用者,當然不應該放着這些痛點不管,基于社群版本的分支下我們做了一系列的優化和增強,并且已經把其中的一部分内容 push 到了社群。

4.1 回合社群更新檔

Spark 的釋出周期沒有那麼頻繁,就算小版本疊代一般也要小半年,那麼我們不可能隻眼睜睜看着一系列的 bug 存在于舊分支。是以網易在 Spark 分支管理上的政策是:自己維護小版本,及時跟進大版本 (小版本可能是從 3.0.1 到 3.0.2,大版本則是從 3.0 到 3.1)。在這個政策下,我們可以及時回合社群新發現的問題。比如 AQE 相關的更新檔 SPARK-33933,這個更新檔的作用是在執行子實體計劃的時候優先執行廣播其次 shuffle,進而減小在排程資源不足情況下廣播逾時的可能性。社群的這個更新檔需要到 3.2.0 分支才能釋出,但是出于穩定性的考慮,網易内部把它回合到了 3.1.1 分支。

4.2 回饋社群

提高廣播 Join 的穩定性

為了解決靜态估計執行計劃的統計資料不準确以及廣播在 AQE 中不可逆的問題,我們支援了在 AQE 自己的廣播配置 SPARK-35264。這個方案的思路是增加一個新的廣播配置 spark.sql.adaptive.autoBroadcastJoinThreshold 和已有的廣播配置隔離,再基于 AQE 運作時的統計資料來判斷是否可以用廣播來完成 Join,保證被廣播表的資料量是可信的。在這個條件下,我們可以禁用基于靜态估計的廣播 Join,隻開啟 AQE 的廣播,這樣我們就可以在享受廣播 Join 性能的同時兼顧穩定性。

增加 Join 傾斜優化覆寫次元

我們對 Join 傾斜優化做了很多增強,這個 case 是其中之一。在描述内容之前,我們先簡單介紹一個 SHJ 和 SMJ (Shuffled Hash Join 簡稱為 SHJ,Sort Merge Join 簡稱 SMJ)。SMJ 的實作原理是通過先把相同 key shuffle 到同一 reduce,然後做分區内部排序,最後完成 Join。而 SHJ 相對于 SMJ 有着優秀的時間複雜度,通過建構一個 hash map 做資料配對,節省了排序的時間,但缺點也同樣明顯,容易 OOM。

一直以來 SHJ 是一個很容易被遺忘的 Join 實作,這是因為預設配置 spark.sql.preferSortMerge 的存在,而且社群版本裡觸發 SHJ 的條件真的很苛刻。但自從 Spark 3.0 全面地支援了所有類型的 Join Hint SPARK-27225,SHJ 又逐漸進入了我們的視野。回到正題,社群版本的 AQE 目前隻對 SMJ 做了傾斜優化,這對于顯式聲明了 Join Hint 為 SHJ 的任務來說很不友好。在這個背景下,我們增加了 AQE 對 SHJ 傾斜優化的支援 SPARK-35214,使得 Join 傾斜優化在覆寫次元上得到了提升。

一些瑣碎的訂正

由于 Spark 在網易内部的使用場景是非常多的,包括但不限于數倉,ETL,Add hoc,是以我們需要最大程度減少負面的和誤導使用者的 case。

  • SPARK-35239,這個 issue 可以描述為當輸入的 RDD 分區是空的時候無法對其 shuffle 的分區合并。看起來影響并不大,如果是空表的話那麼就算空跑一些任務也是非常快的。但是在 Add hoc 場景下,預設的 spark.sql.shuffle.partitions 配置調整很大,這就會造成嚴重的 task 資源浪費,并且加重 Driver 的負擔
  • SPARK-34899,當我們發現某些 shuffle 分區在被 AQE 的分區合并規則成功優化後,分區數居然沒有下降,一度懷疑是沒有找到正确使用 AQE 的姿勢
  • SPARK-35168,一些 Hive 轉過來的同學可能會遇到的 issue,理論上 MapReduce 中 reduce 的數量等價于 Spark 的 shuffle 分區數,是以 Spark 做了一些配置映射。但是在映射中出現了 bug 這肯定是不能容忍的。

4.3 内部優化(已開源)

除了和社群保持交流之外,網易數帆也做了許多基于 AQE 的優化,這些優化都在我們的開源項目 Kyuubi 裡。

支援複雜場景下 Join 傾斜優化

社群版本對 AQE 的優化比較謹慎,隻對标準的 Sort Merge Join 做了傾斜優化,也就是每個 Join 下的子算子必須包含 Sort 和 Shuffle,這個政策極大的限制了 Join 傾斜優化的覆寫率。舉例來說,有一個執行計劃先 Aggregate 再 Join,并且這兩個算子之間沒有出現 shuffle。我們可以猜到,在沒有 AQE 的介入下,Aggregate 和 Join 之間的 shuffle 被剪枝了,這是一種常見的優化政策,一般是由于 Aggregate 的 key 和 Join 的 key 存在重複引起的。但是由于沒有擊中規則,AQE 無法優化這個場景的 Join。有一些可以繞過去的方法,比如手動在 Aggregate 和 Join 之間插入一個 shuffle,得到的執行計劃長這樣子:

Apache Spark 3.0 自适應查詢優化在網易的深度實踐及改進

我們在這種思路下,以增加規則的方式可以在不入侵 AQE 代碼的前提下,自動增加 shuffle 來滿足 Join 傾斜優化的觸發條件。選擇這樣處理的理由有 3 個

  • 增加 shuffle 可以帶來另一個優秀的副作用,就是支援多表 Join 場景下的優化,可以說是一舉兩得
  • 不用魔改 AQE 的代碼,可以獨立于我們内部的 Spark 分支快速疊代
  • 當然這不是最終的解決方案,和社群的交流還在繼續

小檔案合并以及 stage 級别的配置隔離

Spark 的小檔案問題已經存在很多年了,解決方案也有很多。而 AQE 的出現看起來可以天然的解決小檔案問題,是以網易内部基于 AQE 的分區合并優化規則,對每個涉及寫操作的 SQL,在其執行計劃的頂端動态插入一個 shuffle 節點,從執行計劃的角度看起來是這樣的:

Apache Spark 3.0 自适應查詢優化在網易的深度實踐及改進

再結合可以控制每個分區大小的相關配置,看起來一切都是這麼美好。但問題還是來了,其中有兩個最明顯的問題:

  • 簡單添加一個 shuffle 節點無法滿足動态分區寫的場景

假設我們最終産生 1k 個分區,動态插入的分區值的數量也是 1k,那麼最終會産生的檔案數是 1k x 1k = 1m。這肯定是不能被接受的,是以我們需要對動态分區字段做重分區,讓包含相同分區值的資料落在同一個分區内,這樣 1k 個分區生成的檔案數最多也是 1k。但是這樣處理後還有有一個潛在的風險點,不同分區值的分布是不均勻的,也就是說可能出現資料傾斜問題。對于這樣情況,我們又額外增加了與業務無關的重分區字段,并通過配置的方式幫助使用者快速應對不同的業務場景。

  • 單分區處理的資料量過大導緻性能瓶頸

成也蕭何,敗也蕭何。把 spark.sql.adaptive.advisoryPartitionSizeInBytes 調大後小檔案的問題是解決了,但是過程中每個分區處理的資料量也随之增加,這導緻過程中的并發度無法達到預期的要求。是以 stage 級别的配置隔離出現了。我們直接把整個 SQL 配置劃分為兩部分,最後一個 stage 以及之前的 stage,然後把這兩個部分之間的配置做了隔離。拿上面這個配置來說,在最後一個 stage 的樣子是 spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes。在配置隔離的幫助下,我們可以完美解決小檔案和計算性能不能兼得的問題,使用者可以更加優雅地使用 AQE。

4.4 案例分享

多表 Join 傾斜

下面這兩張圖為 3 表 Join 的執行計劃,由于長度的限制我們隻截取到 Join 相關的片段,并且沒有被優化的任務由于資料傾斜問題沒有執行成功。可以明顯看到社群版本無法對這類多表 Join 做傾斜優化,而我們在動态插入 shuffle 之後,兩次 Join 都成功的被優化。在這個特性的幫助下,Join 傾斜優化的覆寫場景相對于社群有明顯提升。

Apache Spark 3.0 自适應查詢優化在網易的深度實踐及改進

社群版本

Apache Spark 3.0 自适應查詢優化在網易的深度實踐及改進

内部版本

Stage 配置隔離

在支援了 stage 級别的配置隔離後,我們單獨設定了最後一個 stage 的參數,下面兩張圖是某個線上任務前後兩天的執行情況,可以明顯看到在配置隔離後,在保證最終産出的檔案數一緻的情況下,過程中 stage 的并發度得到了提升,進而使任務性能得到提升。

Apache Spark 3.0 自适應查詢優化在網易的深度實踐及改進

配置隔離前

Apache Spark 3.0 自适應查詢優化在網易的深度實踐及改進

配置隔離後

任務性能對比

這張圖展示了我們部分遷移任務的資源成本以及性能對比,其中藍線是遷移前的資料,紅線是遷移後的資料。可以非常明顯看到,在資源成本大幅下降的同時任務性能有不同程度的提升。

Apache Spark 3.0 自适應查詢優化在網易的深度實踐及改進

5

總結與展望

首先得感謝一下 Apache Spark 社群,在引入了 AQE 之後,我們的線上任務得到了不同程度的性能提升,也使得我們在遇到問題的時候可以有更多解決問題的思路。在深度實踐的過程中,我們也發現了一些可以優化的點:

  • 在優化細節上的角度,可以增加命中 AQE 優化的 case,比如 Join 傾斜優化增強,讓使用者不用逐個檢查不能被優化的執行計劃
  • 在業務使用上的角度,可以同時支援 ETL,Add hoc 等側重點不一樣的場景,比如 stage 配置隔離這個特性,讓關注寫和讀的業務都有良好的體驗

在完成這個階段性的優化後,接下來我們會繼續深耕在 AQE 的覆寫場景上,比如支援 Union 算子的細粒度優化,增強 AQE 的代價估計算法等。除此之外,還有一些潛在的性能回歸問題也是值得我們注意的,比如在做分區合并優化後會放大某些高時間複雜度算子的性能瓶頸。

作為可能是最快線上上使用 Apache Spark 3.1.1 的使用者,網易在享受社群技術福利的同時也在反哺社群。這也是網易對技術的思考和理念:

  • 因為開放,我們擁抱開源,深入社群
  • 因為熱愛,我們快速接收新的理論,實踐新的技術