作者:玄弟 七鋒 PolarDB-X 面向 HTAP 的混合執行器 一文詳細說明了PolarDB-X執行器設計的初衷,其初衷一直是緻力于為PolarDB-X注入并行計算的能力,兼顧TP和AP場景,逐漸為其打造一款具備TB級資料處理能力的資料庫。為了做到這一點,借鑒了各種資料庫和大資料庫産品,包括分析型資料庫,實時數倉等,吸取了各方面的優勢來打造出一個全新的并行執行引擎。這裡将對整個分布式并行執行架構做詳細的介紹,希望閱讀之後對我們的執行器有一個全面的認識。
▶ 整體設計
PolarDB-X 是一個 Share Nothing 的資料庫,采樣了計算和存儲分離的架構。其中資料以分片的形式存儲于每個DN節點,計算節點叫CN。在計算過程中,DN和CN間、DN和DN、CN和CN間是通過千兆、萬兆網絡通信。每個CN節點上一般都會有排程元件、資源管理元件、RPC元件等。一個複雜的查詢,會被排程到多個CN節點上執行,考慮到資料一般都會根據均勻政策分到各個DN上,是以每個CN節點同時會通路多個DN。
當使用者送出一條複雜的SQL語句,往往需要通路多個DN節點,這個時候就會啟動并行排程政策,整個執行步驟可以簡單了解:
- 使用者所連接配接的這個 CN 将承擔查詢協調者(Query Coordinator)的角色;
- Query先發送給Query Coordinator,會首先經過優化器生成最新的Plan,然後會拆分到多個子計劃(Fragment), 每個Fragment可能會包含多個執行算子。如果有的Framgnt負責掃描DN的話,它裡頭必定包含Scan算子,從DN拉取資料;Fragment也可以包含Agg或者Join等其他算子;
- Query Coordinator裡頭的排程器(Task Scheduler)會按照定義的排程邏輯将各個Framgnts封裝成Task,排程到合适的CN上執行,這裡頭可能會涉及到一些資源上的計算;
- 各個CN收到Task後,會申請執行資源,構造執行的上下文,開始啟動Task,并定期會向Query Coordinator彙報狀态;
- 各個Task間會通過資料傳輸通道(DTL)交換資料,當所有的Task都執行完畢後,會将資料結果傳回給Query Coordinator,由它負責将結果傳回給使用者;
- 成功傳回給使用者後,Query Coordinator和被排程到的各個CN節點的Task會做清理動作,釋放計算資源。
整個流程大緻就這樣,有細心的同學會發現我們的架構在DN上有一層叫Split概念。我們的資料是按分片存儲在各個DN上的,Split指的是資料分片partition的位址。對于包含掃描算子Scan的 Task,會計算出需要通路哪些 partition,這些 partition 分布在哪些 DN 上,然後封裝成splits按比例劃分給這些掃描Task。但是實際運作過程中每個掃描Task并不是預配置設定好splits的,而是預配置設定部分splits給掃描Task,看哪一個Task掃描的更快就會從Query Coordinator繼續擷取餘下splits,這樣可以盡可能避免由于各個掃描Task資源不均衡導緻的消費長尾現象。但是如果一個表隻被分成了2個分片,是不是意味着掃描任務至多隻能是2,這可能起不到明顯的并行加速效果。是以我們也支援在分片上繼續按照分段做拆分,那麼這個時候的Split除了會記錄分片的位址,也會記錄在分片上分段的位移。按照分段做拆分後,即便資料的分片數量有限,執行過程我們依然可以啟動更多的掃描Task,并行去加速掃描。
▶ 執行計劃
執行引擎執行的是由優化器生成的分布式執行計劃。執行計劃由算子組成。因為PolarDB-X的資料按照分片存儲到各個的DN節點上去,執行計劃執行也會盡可能的滿足資料分布的locality,能下推的計劃會被放到DN上執行,不能下推的計劃會會被切分成一個個子計劃(Fragment),會被放到各個CN節點上執行。是以這裡我們需要關心如何将一個從優化器出來的計劃,拆分成分布式計劃,放到各個CN上執行?
為了更好地了解這個過程,我們這裡以一條簡單SQL:
select * from (select useid, count(*) as b from user_data group by userid) as T where T.b > 10
為例,經過優化器生成這樣的相對最優計劃:
針對并行執行計劃,為了更高效地執行盡量減少資料傳輸,可以把執行計劃按照計算過程是否需要資料重分布(ReDistribution)分為不同片段(fragment)分布到相應節點執行,并且把一些操作下推來減少掃描輸出的資料,上面的計劃可能就變成這樣的執行計劃,由多個子片段構成。
不同片段之間通過 NetWork Write/Read 算子進行資料交換。更複雜的比如多表關聯(join)查詢,會有更多的片段和更複雜的資料交換模式。每個片段的并發度可以不同, 并發度是基于代價推導出來的。多機排程的基本機關是Stage,Stage記錄了上下遊片段的位置資訊,以便上下遊之間建立網絡通道(DTL)。每個片段排程到計算CN節點後,會被封裝成邏輯執行Task,比如fragment-1并發度是2的話,那麼會将Task-1.0和Task-1.1 兩個Task分别排程到兩個CN節點。
Task仍然是CN節點計算的邏輯單元,PolarDB-X執行器不僅僅可以支援單機并行能力(Parallel Query),也可以做多機并行(MPP)。是以在CN節點還引入了二層排程邏輯。當然二層排程的好處不僅僅于此,後面我們還會提到。這裡會繼續在Task内部根據算子間資料交換的特性,繼續做切分,切分成不同Pipeline。
不同的Pipeline并發度也可以不同,每個Pipeline會根據處理的資料規模大小會計算出不同的并發度,生成具體的執行單元Driver,Driver間會根據二層排程确定上下遊的本地通道(Local Channel)。
至此你應該可以了解從執行邏輯計劃轉化為分布式實體執行的整個過程。引入了一些新的名稱,這裡統一做下梳理:
- Fragment:指的是邏輯執行計劃按照計算過程中資料是否需要重分布,切割成的子計劃。
- Stage:是由Fragment封裝而成的排程邏輯機關,Stage除了封裝Fragment外,還會記錄上下遊Stage間的排程位置資訊。
- Task:Stage并不會在CN上直接運作,他們是通過并發度分解成一系列可排程到CN上的Task, Task依然是邏輯執行單元。
- Pipeline:對CN上的Task根據二層并發度做進一步切分,切分成不同的Pipeline。
- Driver:一個Pipeline包含多個Driver,Driver是具體的執行單元,是一系列可運作算子的集合。
一般來說針對一個複雜查詢,一個query包含多個Fragment,每個Fragment和Stage一一對應,每個Stage包含多個Tasks,每個Task會切分成不同的Pipeline,一個Pipeline包含了多個Driver。隻有了解上面說的Fragment/Stage/Task/Pipeline/Driver這些概念,你才能更清楚了解我們接下來的設計。
▶ 排程政策
并行計算在運作之初,需要解決任務排程問題。排程的直白了解,就是将切分好的Task排程到各個CN節點去執行,充分利用各個CN的計算資源。這裡頭大家很容易有這些疑問:
1. 執行過程中各個CN節點的計算資源是不均衡了,那麼在多機排程中是如何将各個Task打散到不同CN節點去執行? 2. 和各個DN互動的Task是如何并行的拉資料的?比如某個邏輯表分成了16個實體表,分布在4個DN節點上,有4個Driver去并行拉資料,每個Driver并不是均勻拉取4個實體表,而是根據自身的消費能力來确定拉取的實體表數量;多個Driver下發掃描任務會不會同時恰好落地一個DN節點上,導緻某個DN成為瓶頸? 3. 我們完全可以在一個CN節點,同時排程多個Task執行,已經可以做到單機并行,為什麼還要二層排程?
一層排程(多節點間)
為了解決(1) 和 (2) 的問題,我們在CN節點内部引入了排程子產品(Task Scheduler),主要負責Task在不同CN節點上的排程,這一層排程我們這裡稱之為一層排程,在這層排程中,同屬于一個Stage的多個Task一定會被排程到不同CN節點上去,確定一個CN節點隻能有相同tage的一個Task。排程過程中通過心跳不斷維護Task狀态機,也維護着叢集各個CN節點Load資訊,整個排程是基于CN Load做排程的。多機排程流程如下所示:
Resource Manager(RM)是CN節點上個一個資源管理子產品,RM會借助Task心跳機制實時維護叢集各個CN節點的負載,Task Scheduler元件會基于負載選擇合适的CN節點下發執行任務,比如CN-1 負載相對叢集其他CN節點來說高很多,那麼目前查詢的Task會分發給其他CN節點,避免排程到CN-1節點去。執行器在執行Task任務時,Task并不是建立好的時候就确定了其消費DN splits的映射關系。各個Task按批次動态拉取splits進行消費, 直白了解就是誰的消費能力越強誰就有可能消費更多的splits。同樣為了解決同一個時刻多個任務同時消費同一個DN上的splits問題,我們在排程之初會将splits根據位址資訊按照Zig-zag方式,把各個DN上的splits打散到整個splits queue上去,消費的時候可以盡可能分攤各個DN壓力,這樣計算過程中也會充分利用各個DN的資源。
有了一層排程後,我們也可以将同屬于一個Stage的多個Task排程到同一個CN,這樣其實也可以做到單機并行。如果這樣設計的話,我們容易忽略兩個問題:
- 一層排程的邏輯比較複雜,需要多次互動,一個CN内部需要同時維護各個Task的狀态,代價會比較大,這在TP場景是無法容忍的;
- 一層排程中,并發度越高,生成Task就越多,這些Task間需要建立更多的網絡傳輸通道。
二層排程(節點内部)
為了解決上述一層排程的不足,為此我們在參考Hyper的論文[1],引入了二層排程,既在CN節點内部單獨做單機并行排程,簡單來說我們會在Task内部借助CN的本地排程元件(Local Scheduler),對Task做進一步的并行排程,讓Task在CN上執行,也可以做到并行運作。下圖中,Stage-1和Stage-2是上下遊關系,各自并發度都是9,排程到3個CN節點執行。如果隻有一層并發度的話,每個CN節點還會排程運作3個Task,那麼上下遊之間總共會建立81個Channel,CN節點内部Task是互相獨立的,這樣缺點還是很明顯:
- 多個Channel,放大了網絡開銷,同一份buffer會被發送多次,發送和接收對CPU和Memory都有代價;
- 資料發送的對象是Task,資料本身有傾斜,會導緻同節點内Task之間的負載不均衡(hash skew),存在長尾問題。
而一層排程和二層排程相結合的話,Stage-1和Stage-2的一層并發度是3,這樣每個CN節點隻會有1個Task,Task内部并發度3。由于shuffle的對象是Task,是以Stage-1和Stage-2間隻會建立9個Channel,大大減少了網絡開銷,同時Task内部的3個Driver内資料是共享的,Task内部的所有的Driver可以共同消費接受到的資料,并行執行,避免長尾問題。針對于HashJoin,假設Ta為大表,Tb為小表,這兩個表做HashJoin,可以讓Ta和Tb同時shuffle到同一個節點做計算;也可以讓小表Tb廣播到Ta所在節點做計算,前者的網絡代價是Ta+Tb,而後者的代價是N*Tb(N代表廣播的份數)。是以如果隻有一層排程的話,N可能比較大,執行過程中我們可能會選擇兩端做shuffle的執行計劃;而一層和二層相結合的排程政策,可以讓執行過程中選擇BroadcastHashJoin,這樣可以避免大表做shuffle,提高執行效率。
此外在二層排程政策中,task内部的多線程很容易做到資料共享,有利于更高效的算法。如下圖,同樣是HashJoin過程中,build端的Task内部多個線程(driver)協同計算:build端收到shuffle的資料後,多線程協同建立一個共享的hash表。這樣一個task隻有一個build table,probe端收到shuffle資料後,也不用做ReDistribution了,直接讀取接受到資料,進行并行的probe。
▶ 并行執行
聊完排程,接下來應該是關心任務是如何在CN上運作,運作過程中遇到異常我們系統是如何處理的呢?
線程模型
說到執行,有經驗的同學可能會發現我們的排程并沒有解決排程死鎖問題,比如對于下面這樣一個執行計劃,兩表Join。一般會遇到兩種問題:
1. 如果先排程f3和f2的話,這個時候假設叢集沒有排程資源,則f1不能遲遲排程起來。而HashJoin的邏輯就是需要先建構buildTable,這裡f1剛好是build table部分。最終會導緻執行死鎖:f1在等待f3和f2的計算資源釋放,而f2和f3又在等待f1建構完buildTable;
2. 如果f1先排程起來了,假設這個時候f2和f3沒有排程資源,這個時候f1從DN拉出來的資料,其實是無法發送給f3的,因為f3還沒有被排程起來。
解決問題1,業界有很多方式,比較常見是在排程之初建構排程依賴關系(Scheduler Depedency):f1->f3-f2。而解決問題2,往往是将f1把DN拉出來的資料先放到記憶體中,實在放不下就落盤處理。可見處理上述兩個問題,執行架構不僅僅需要在多機排程上做複雜的排程依賴關系,同時還需要考慮對落盤的支援。而其實我們在排程的時候,并沒有去考慮排程依賴這個事情,我們是一次性把f1/f2/f3全部排程起來了,這個是為何呢?這就要說下我們執行中的邏輯線程模型概念。在大多數計算引擎中,一個查詢首先會通過資源排程節點,在各個CN上申請執行線程和記憶體,申請成功後,這些執行資源會被排程元件占用,用來配置設定目前查詢的Task,不可以再被其他查詢所利用,這種是真實的執行資源,和排程資源互相綁定,當CN上可利用的執行資源不夠的時候,才會出現排程死鎖問題。而在PolarDB-X中,我們并沒有在排程的時候申請真實的線程資源,排程隻需要考慮各個CN的負載,不需要考慮各個CN到底還剩多少可利用的真實資源。我們的線程模型也并沒有和排程資源綁死,每個Driver其實不獨占一個真實的線程,換句話說,真實的線程也并沒有和排程資源一一對應。雖然說Driver是執行的基本單元,但是在排程上來看,它又是邏輯的線程模型而已。那是不是意味着隻要有排程任務,都可以被成功排程到CN上去,答案是肯定的。一次性排程所有的執行單元到CN上去執行,對記憶體和CPU也是一種開銷。比如f2被執行起來後,但是f1并沒有執行完畢,那麼f2也會不斷執行,其資料其實也會被緩存起來,但是也不能無限緩存資料呀?為了解決這個問題,接下來就需要借助我們的時間片執行了。
時間片執行
我們在每個CN節點内部會有一組執行線程池來運作這些Driver,每個Driver會排隊進入線程池參與計算,如果Driver被阻塞就會退出到Blocking隊列中,等待被喚醒。比如f2 driver 啟動後,從DN拉了資料放到有限空間buffer裡頭去,這個時候假設f1 driver都沒有結束,那麼f2 driver 對應的buffer就會滿,滿了後就會阻塞住,一旦阻塞我們的執行架構就會讓f2 driver從執行器退出來,加入到Blocking隊列中,簡單的說就是将計算資源騰讓出來,等待被喚醒。直到f1 driver都執行完畢後, f2 driver會被喚醒,執行架構就會将他移動到Pending隊列中,等待被排程到執行線程池中繼續運作。這裡頭還是會浪費點記憶體,但相對于CPU資源來說,記憶體資源還是比較充裕的。
時間片執行的核心就是需要判斷Driver何時會被Block的,總結起來被阻塞的原因一般分為三種情況:
- 根據算子依賴模型來确定,比如圖中f1 driver未執行完畢,那麼f2 driver其實也會被阻塞(這個是一個可配置的選項);
- 計算資源不足(主要指記憶體),對應的driver會被挂起,等待資源釋放;
- 等待DN響應,實體SQL下發給DN後,Driver會被挂起,等待實體SQL執行完畢。
除此之外我們在借鑒Linux 時間片排程機制,會在軟體層面上統計Driver的運作時長,超過門檻值(500ms),也會被強制退出執行線程,加入到Pending隊列,等待下一輪的執行排程。這種軟體層面上的時間片排程模型,可以解決複雜查詢長時間占用計算資源問題。其實實作起來也挺簡單的,就是每計算完一個批資料後,我們會對driver的運作時長進行統計,超過門檻值,就退出線程池。下面貼出了Driver處理邏輯的部分僞代碼,Driver在執行采用的是經典的Producer-Consumer模型,每消費一個Chunk我們就會累計時間,當超過既定門檻值,就會退出來。
任務狀态機
高并發系統,頻繁地等待或者任務切換是常見的系統瓶頸。異步處理是一種已經被證明行之有效地避免這些瓶頸,把高并發系統性能推到極緻的方法。是以PolarDB-X執行器的整個後端,統一使用全異步的執行架構;同時MPP執行過程涉及到多機的協調,是以這就要求我們在系統内部維護這些異步狀态。異步狀态的維護特别重要,比如某個查詢下的Task執行失敗,需要立即通知到整個叢集中該查詢正在運作的Task任務,以便立即中止,以防出現Suspend Task,造成資源不釋放問題。
是以在執行器内部,我們從三個次元(Task Stage Query)去維護狀态, 這三種State是互相依賴耦合的,比如Query 被Cancel,會立即通知其所有的Stage,各個Stage監聽到狀态變化,會及時通知給其所有的Task,隻有等待Task都被Cancel後,Stage 最後的狀态才變更為Cancel,最終Query的狀态才被标記為Cancel。在這個過程中我們會引入對狀态機異步監聽機制,一旦狀态發送變更就會異步回調相關處理邏輯。通過維護這些狀态,我們也可以及時通過查詢或者監控診斷到任務是否異常,異常發生在哪個環節,也便于我們後期排查問題。
▶ 資源隔離
如果并發請求過多的時候,資源緊張會讓請求線程參與排隊。但是正在運作的線程,需要耗費比較多的計算資源(CPU和Memory)的時候,會嚴重影響到其他正常正在運作的Driver。這對我們這種面向HTAP場景的執行器是決定不被允許的。是以在資源隔離這一塊,我們會針對不同WorkLoad做計算資源隔離,但這種隔離是搶占式的。
CPU
在CPU層面上我們是基于CGroup做資源隔離的,根據WorkLoad不同我們把CPU資源分為AP Group和TP Group兩組,其中對TP Group的CPU資源不限制;而對AP Group是基于CGroup做硬隔離,其CPU使用水位的最小門檻值(cpu.min.cfs_quota)和最大門檻值(cpu.max.cfs_quota)來做限制。執行線程分為三組: TP Core Pool 、AP Core Pool、SlowQuery AP Core Pool,其中後兩者會被劃分到AP Croup一組,做嚴格的CPU限制。Driver會根據WorkLoad劃分到不同的Pool執行。看似很美的實作,這裡頭依然存在兩個問題:
1. 基于COST識别的WorkLoad不準怎麼辦?
2. AP查詢比較耗資源,在同一個Group下的多個慢查詢互相影響怎麼辦?
出現問題(1)主要的場景是我們把AP類型的查詢識别成了TP,結果會導緻AP影響到TP,這是不可以接受的。是以我們在執行過程中會監視TP Driver的執行時長,超過一定門檻值後仍沒有結束的查詢,會主動退出時間片,然後将其它排程到AP Core Pool執行。而為了解決問題(2),我們會将AP Core Pool中長時間運作都未結束的Driver,進一步做優雅降級,排程到SlowQuery AP Core Pool執行。其中SlowQuery AP Core Pool會設定執行權重,盡可能降低其執行Driver的頻率。
MEMORY
在記憶體層面上,會将CN節點堆内記憶體區域大緻可以分為四大塊:
- TP Memory:用于存放TP計算過程中的臨時資料
- AP Memory:用于存放AP計算過程中的臨時資料
- Other:存放資料結構、臨時對象和中繼資料等
- System Reserverd:系統保留記憶體
TP和AP Memory分别會有最大門檻值和最小門檻值限制,兩者記憶體使用過程中可以互相搶占,但是基本原則是:TP Memory可以搶占AP Memory,直到查詢結束才釋放;而AP Memory可以搶占記憶體TP,但是一旦TP需要記憶體的時候,AP Memory需要立即釋放記憶體,釋放方式可以是自殺或者落盤。
▶ 資料傳輸層(DTL)
并行計算是充分利用各個CN資源參與計算,那麼DN與DN之間必然會存在資料互動。各個DN上的上下遊的Task資料需要傳輸,比如上遊的Task數量N,下遊的Task數量是M,那麼他們之間的資料傳輸通道需要用到M*N個通道(Channel),同樣的我們将這些通道(Channel)的概念抽象成資料傳輸層。這個傳輸層的設計往往也會面臨兩個問題:
1. 通道分為發送端和接受端,當發送端源源不斷發送資料,而接受端無法處理的話就會造成記憶體雪崩;
2. 資料在傳輸過程中丢失。
在業界實作資料傳輸主要有兩種傳輸方式:Push和Pull。Push就是發送端往接受端推送資料,這裡頭為了避免接收端處理不過來,需要引入流控邏輯,一般的做法都是在接收端預留了槽位,當槽位被資料占滿時會通知發送端暫停發送資料,當有接收端資料被消費空閑槽位出現時通知發送端繼續發送,這裡頭會涉及到發送端和接收端的多次互動,流控機制相對比較複雜。Pull就是發送端将資料先發送到buffer裡頭去,接收端按需從發送端的的buffer拉資料,而當發送端發送的資料到buffer,接收端假設長時間不來拉資料,最後發送端buffer滿了,也會觸發上遊反壓,為了避免頻繁反壓,往往發送端的buffer不應該設定太小。綜合起來我們選擇了pull方式來做。采樣pull方式也會遇到兩個問題:
1. 每個receiver一般會和上遊多個sender建立連接配接,那麼每次都是通過廣播的方式從上遊所有的sender拉資料嗎?
2. 一次從sender端到底請求多少的資料呢,即averageBytesPerRequest?
我們先回答問題(2),我們這裡會記錄上一次請求的資料量lastAverageBytesPerRequest、目前建連通道個數n以及上一次總共傳回的資料量responseBytes,來計算出目前averageBytesPerRequest,具體的公式下面也給出了。至于問題(1),有了目前的averageBytesPerRequest後,結合目前receiver上buffer剩餘空間,可以估算出這一次需要向上遊幾個sender發送請求。
在異步通信過程中為了保證傳輸可靠性,我們采用了類似tcp ack的方式,當receiver端帶着token去上遊拉資料的時候,則表示目前token之前的資料均已經被receiver端消費完畢,sender可以釋放這些資料,然後将下一批資料以及nextToken傳回給receiver端。
▶ 效果展示
前後說了很多幹貨,下面咱們來點簡單實際的東西。這裡以TPCH Q13為例來示範下執行器在不同場景下的加速效果,為了友善截圖在Q13後面都加了limit。該測試環環境下,CN和DN規格都是2*16C64G。
單機單線程下運作,耗時3min31s
使用Parallel Query加速,既單機多線程執行,耗時23.3s
使用MPP加速,既同時利用兩個CN節點的資源計算,耗時11.4s
▶ 總結
不管是簡單查詢,還是 Parallel Query和MPP場景下的複雜查詢,共用的都是一套執行架構。不同場景下對執行器的要求,更多的是并發度設定和排程政策的差異。相對于業界其他産品來說,PolarDB-X執行器主要特點:
- 在資源模式上使用的是輕量化的資源管理,不像大資料計算引擎,需要額外引入的資源管理的節點,做嚴格的資源預配置設定,主要考慮到我們的場景是針對于小叢集的線上計算;
- 在排程模型上執行器支援DAG排程,相對于MPP排程可以做到更加靈活的并發控制模型,各個Stage間、Pipeline間的并發可以不一樣;
- 差別與其他産品,AP加速引用的是外挂并行計算引擎,PolarDB-X并行執行器是内置的,不同查詢間共用一套執行模型,確定TP和AP享有一緻的SQL相容性。
PolarDB-X并行計算線上上已經平穩運作了近兩年,這兩年來我們不僅僅在執行架構上做了很多穩定性工作,在算子層的優化我們也沉澱了不少的技術。但這些還不夠,目前比較熱的是自适應執行,結合Pipeline模式的自适應執行挑戰比較大,我們近期也在研究,歡迎感興趣的朋友來拍拍磚,一起進步!
▶ Reference
[1] V. Leis, et al., Morsel-Driven Parallelism: A NUMA-Aware Query Evaluation Framework for the Many-Core Age, in SIGMOD, 2014.
[2] Presto: SQL on Everything.
[3] A Deep Dive into Query Execution Engine of Spark SQL.
[4] Impala: A Modern, Open-Source SQL Engine for Hadoop
[5] FusionInsight LibrA: Huawei's Enterprise Cloud Data Analytics Platform. Proc. VLDB Endow. 11(12): 1822-1834 (2018)