天天看點

Spark技術内幕:Stage劃分及送出源碼分析handleJobSubmitted

當觸發一個rdd的action後,以count為例,調用關系如下:

org.apache.spark.rdd.rdd#count

org.apache.spark.sparkcontext#runjob

org.apache.spark.scheduler.dagscheduler#runjob

org.apache.spark.scheduler.dagscheduler#submitjob

org.apache.spark.scheduler.dagschedulereventprocessactor#receive(jobsubmitted)

org.apache.spark.scheduler.dagscheduler#handlejobsubmitted

其中步驟五的dagschedulereventprocessactor是dagscheduler 的與外部互動的接口代理,dagscheduler在建立時會建立名字為eventprocessactor的actor。這個actor的作用看它的實作就一目了然了:

總結一下org.apache.spark.scheduler.dagschedulereventprocessactor的作用:可以把他了解成dagscheduler的對外的功能接口。它對外隐藏了自己内部實作的細節,也更易于了解其邏輯;也降低了維護成本,将dagscheduler的比較複雜功能接口化。

org.apache.spark.scheduler.dagscheduler#handlejobsubmitted首先會根據rdd建立finalstage。finalstage,顧名思義,就是最後的那個stage。然後建立job,最後送出。送出的job如果滿足一下條件,那麼它将以本地模式運作:

1)spark.localexecution.enabled設定為true  并且 2)使用者程式顯式指定可以本地運作 并且 3)finalstage的沒有父stage 并且 4)僅有一個partition

3)和 4)的話主要為了任務可以快速執行;如果有多個stage或者多個partition的話,本地運作可能會因為本機的計算資源的問題而影響任務的計算速度。

要了解什麼是stage,首先要搞明白什麼是task。task是在叢集上運作的基本機關。一個task負責處理rdd的一個partition。rdd的多個patition會分别由不同的task去處理。當然了這些task的處理邏輯完全是一緻的。這一組task就組成了一個stage。有兩種task:

 org.apache.spark.scheduler.shufflemaptask

 org.apache.spark.scheduler.resulttask

shufflemaptask根據task的partitioner将計算結果放到不同的bucket中。而resulttask将計算結果發送回driver application。一個job包含了多個stage,而stage是由一組完全相同的task組成的。最後的stage包含了一組resulttask。

在使用者觸發了一個action後,比如count,collect,sparkcontext會通過runjob的函數開始進行任務送出。最後會通過dag的event processor 傳遞到dagscheduler本身的handlejobsubmitted,它首先會劃分stage,送出stage,送出task。至此,task就開始在運作在叢集上了。

一個stage的開始就是從外部存儲或者shuffle結果中讀取資料;一個stage的結束就是由于發生shuffle或者生成結果時。

handlejobsubmitted 通過調用newstage來建立finalstage:

建立一個result stage,或者說finalstage,是通過調用org.apache.spark.scheduler.dagscheduler#newstage完成的;而建立一個shuffle stage,需要通過調用org.apache.spark.scheduler.dagscheduler#neworusedstage。 

對于result 的final stage來說,傳入的shuffledep是none。

我們知道,rdd通過org.apache.spark.rdd.rdd#getdependencies可以獲得它依賴的parent rdd。而stage也可能會有parent stage。看一個rdd論文的stage劃分吧:

Spark技術内幕:Stage劃分及送出源碼分析handleJobSubmitted

一個stage的邊界,輸入是外部的存儲或者一個stage shuffle的結果;輸入則是job的結果(result task對應的stage)或者shuffle的結果。

上圖的話stage3的輸入則是rdd a和rdd f shuffle的結果。而a和f由于到b和g需要shuffle,是以需要劃分到不同的stage。

從源碼實作的角度來看,通過觸發action也就是最後一個rdd建立final stage(上圖的stage 3),我們注意到new stage的第五個參數就是該stage的parent stage:通過rdd和job id擷取:

生成了finalstage後,就需要送出stage了。

dagscheduler将stage劃分完成後,送出實際上是通過把stage轉換為taskset,然後通過taskscheduler将計算任務最終送出到叢集。其所在的位置如下圖所示。

Spark技術内幕:Stage劃分及送出源碼分析handleJobSubmitted

接下來,将分析stage是如何轉換為taskset,并最終送出到executor去運作的。

btw,最近工作太忙了,基本上到家洗漱完都要10點多。也再沒有精力去進行源碼解析了。幸運的是周末不用加班。是以以後的博文更新都要集中在周末了。加油。

繼續閱讀