天天看點

《循序漸進學Spark》一3.2 Spark排程機制

本節書摘來自華章出版社《循序漸進學spark》一書中的第3章,第3.2節,作者 小象學院 楊 磊,更多章節内容可以通路雲栖社群“華章計算機”公衆号檢視。

3.2 spark排程機制

spark排程機制是保證spark應用高效執行的關鍵。本節從application、job、stage和task的次元,從上層到底層來一步一步揭示spark的排程政策。

3.2.1 application的排程

spark中,每個application對應一個sparkcontext。sparkcontext之間的排程關系取決于spark的運作模式。對standalone模式而言,spark master節點先計算叢集内的計算資源能否滿足等待隊列中的應用對記憶體和cpu資源的需求,如果可以,則master建立spark driver,啟動應用的執行。宏觀上來講,這種對應用的排程類似于fifo政策。在mesos和yarn模式下,底層的資源排程系統的排程政策都是由mesos和yarn決定的。具體分類描述如下:

1. standalone模式

預設以使用者送出application的順序來排程,即fifo政策。每個應用執行時獨占所有資源。如果有多個使用者要共享叢集資源,則可以使用參數spark.cores.max來配置應用在叢集中可以使用的最大cpu核數。如果不配置,則采用預設參數spark.deploy.defaultcore的值來确定。

2. mesos模式

如果在mesos上運作spark,使用者想要靜态配置資源的話,可以設定spark.mesos.coarse為true,這樣mesos變為粗粒度排程模式,然後可以設定spark.cores.max指定叢集中可以使用的最大核數,與上面的standalone模式類似。同時,在mesos模式下,使用者還可以設定參數spark.executor.memory來配置每個executor的記憶體使用量。如果想使mesos在細粒度模式下運作,可以通過mesos://<url-info>設定動态共享cpu core的執行模式。在這種模式下,應用不執行時的空閑cpu資源得以被其他使用者使用,提升了cpu使用率。

3. yarn模式

如果在yarn上運作spark,使用者可以在yarn的用戶端上設定--num-executors 來控制為應用配置設定的executor數量,然後設定--executor-memory指定每個executor的記憶體大小,設定--executor-cores指定executor占用的cpu核數。

3.2.2 job的排程

前面章節提到過,spark應用程式實際上是一系列對rdd的操作,這些操作直至遇見action算子,才觸發job的送出。事實上,在底層實作中,action算子最後調用了runjob函數送出job給spark。其他的操作隻是生成對應的rdd關系鍊。如在rdd.scala程式檔案中,count函數源碼所示。

def count():

long = sc.runjob(this, utils.getiteratorsize _).sum

其中sc為sparkcontext的對象。可見在spark中,對job的送出都是在action算子中隐式完成的,并不需要使用者顯式地送出作業。在sparkcontext中job送出的實作中,最後會調用dagscheduler中的job送出接口。dagscheduler最重要的任務之一就是計算job與task的依賴關系,制定排程邏輯。

job排程的基本工作流程如圖3-4所示,每個job從送出到完成,都要經曆一系列步驟,拆分成以tsk為最小機關,按照一定邏輯依賴關系的執行序列。

《循序漸進學Spark》一3.2 Spark排程機制

圖3-4 job的排程流程

圖3-5則從job排程流程中的細節子產品出發,揭示了工作流程與對應子產品之間的關系。從整體上描述了各個類在job排程流程中的互動關系。

《循序漸進學Spark》一3.2 Spark排程機制

圖3-5 job排程流程細節

在spark1.5.0的排程目錄下的schedulingalgorithm.scala檔案中,描述了spark對job的排程模式。

1. fifo模式

預設情況下,spark對job以fifo(先進先出)的模式進行排程。在schedulingalgorithm.scala檔案中聲明了fifo算法實作。

3. 配置排程池

dagscheduler建構了具有依賴關系的任務集。taskscheduler負責提供任務給task-setmanager作為排程的先決條件。tasksetmanager負責具體任務集内部的排程任務。排程池(pool)則用于排程每個sparkcontext運作時并存的多個互相獨立無依賴關系的任務集。排程池負責管理下一級的排程池和tasksetmanager對象。

使用者可以通過配置檔案定義排程池的屬性。一般排程池支援如下3個參數:

1)排程模式scheduling mode:使用者可以設定fifo或者fair排程方式。

2)weight:排程池的權重,在擷取叢集資源上權重高的可以擷取多個資源。

3)minishare:代表計算資源中的cpu核數。

使用者可以通過conf/fairscheduler.xml配置排程池的屬性,同時要在sparkconf對象中配置屬性。

3.2.3 stage(排程階段)和tasksetmanager的排程

1. stage劃分

當一個job被送出後,dagscheduler會從rdd依賴鍊的末端觸發,周遊整個rdd依賴鍊,劃分stage(排程階段)。劃分依據主要基于shuffledependency依賴關系。換句話說,當某rdd在計算中需要将資料進行shuffle操作時,這個包含shuffle操作的rdd将會被用來作為輸入資訊,構成一個新的stage。以這個基準作為劃分stage,可以保證存在依賴關系的資料按照正确資料得到處理和運算。在spark1.5.0的源代碼中,dagscheduler.scala中的getparentstages函數的實作從一定角度揭示了stage的劃分邏輯。

2. stage排程

在第一步的stage劃分過程中,會産生一個或者多個互相關聯的stage。其中,真正執行action算子的rdd所在的stage被稱為final stage。dagscheduler會從這個final stage生成作業執行個體。

在stage送出時,dagscheduler首先會判斷該stage的父stage的執行結果是否可用。如果所有父stage的執行結果都可用,則送出該stage。如果有任意一個父stage的結果不可用,則嘗試疊代送出該父stage。所有結果不可用的stage都将會被加入waiting隊列,等待執行,如圖3-6所示。

《循序漸進學Spark》一3.2 Spark排程機制

圖3-6 stage依賴

在圖3-6中,虛箭頭表示依賴關系。stage序号越小,表示stage越靠近上遊。

圖3-6中的stage排程運作順序如圖3-7所示。

《循序漸進學Spark》一3.2 Spark排程機制

圖3-7 stage執行順序

從圖3-7可以看出,上遊父stage先得到執行,waiting queue中的stage随後得到執行。

3.

tasksetmanager

每個stage的送出會被轉化為一組task的送出。dagscheduler最終通過調用taskscheduler的接口來送出這組任務。在taskscheduler内部實作中建立了tasksetmanager執行個體來管理任務集taskset的生命周期。事實上可以說每個stage對應一個tasksetmanager。

至此,dagscheduler的工作基本完畢。taskscheduler在得到叢集計算資源時,taskset-manager會配置設定task到具體worker節點上執行。在spark1.5.0的taskschedulerimpl.scala檔案中,送出task的函數實作如下:

當tasksetmanager進入到排程池中時,會依據job id對tasksetmanager排序,總體上先進入的tasksetmanager先得到排程。對于同一job内的tasksetmanager而言,job id較小的先得到排程。如果有的tasksetmanager父stage還未執行完,則該taskset-manager不會被放到排程池。

3.2.4 task的排程

在dagscheduler.scala中,定義了函數submitmissingtasks,讀者閱讀完整實作,從中可以看到task的排程方式。限于篇幅,以下截取部分代碼。

繼續閱讀