天天看點

Spark源碼分析之四:Stage送出

        Stage送出階段的主要目的就一個,就是将每個Stage生成一組Task,即TaskSet,其處理流程如下圖所示:

Spark源碼分析之四:Stage送出

        與Stage劃分階段一樣,我們還是從handleJobSubmitted()方法入手,在Stage劃分階段,包括最好的ResultStage和前面的若幹ShuffleMapStage均已生成,那麼順理成章的下一步便是Stage的送出。在handleJobSubmitted()方法的最後兩行代碼,便是Stage送出的處理。代碼如下:

        從代碼我們可以看出,Stage送出的邏輯順序,是由後往前,即先送出最後一個finalStage,即ResultStage,然後再送出其parent stages,但是實際實體順序是否如此呢?我們首先看下finalStage的送出,方法submitStage()代碼如下:

        代碼邏輯比較簡單。根據stage擷取到jobId,如果jobId未定義,說明該stage不屬于明确的Job,則調用abortStage()方法放棄該stage。如果jobId已定義的話,則需要判斷該stage屬于waitingStages、runningStages、failedStages中任意一個,則該stage忽略,不被處理。顧名思義,waitingStages為等待處理的stages,spark采取由後往前的順序處理stage送出,即先處理child stage,然後再處理parent

stage,是以位于waitingStages中的stage,由于其child stage尚未處理,是以必須等待,runningStages為正在運作的stages,正在運作意味着已經送出了,是以無需再送出,而最後的failedStages就是失敗的stages,既然已經失敗了,再送出也還是會失敗,徒勞無益啊~

        此時,如果stage不位于上述三個資料結構中,則可以繼續執行送出流程。接下來該怎麼做呢?

        首先調用getMissingParentStages()方法,擷取stage還沒有送出的parent,即missing;如果missing為空,說明該stage要麼沒有parent stage,要麼其parent stages都已被送出,此時該stage就可以被送出,用于送出的方法submitMissingTasks()我們稍後分析。

        如果missing不為空,則說明該stage還存在尚未被送出的parent stages,那麼,我們就需要周遊missing,循環送出每個stage,并将該stage添加到waitingStages中,等待其parent stages都被送出後再被送出。

        我們先看下這個missing是如何擷取的。進入getMissingParentStages()方法,代碼如下:

        1、missing:HashSet[Stage]類型,存儲尚未送出的parent stages,用于最後結果的傳回;

        2、visited:HashSet[RDD[_]]類型,已被處理的RDD集合,位于其中的RDD不會被重複處理;

        3、waitingForVisit:Stack[RDD[_]]類型,等待被處理的RDD棧,後入先出。

        visit()方法的處理邏輯也比較簡單,大緻如下:

        通過RDD是否在visited中判斷RDD是否已處理,若未被處理,添加到visited中,然後循環rdd的dependencies,如果是寬依賴ShuffleDependency,調用getShuffleMapStage(),擷取ShuffleMapStage(此次調用則是直接取出已生成的stage,因為劃分階段已将stage全部生成,拿來主義即可),判斷該stage的isAvailable标志位,若為false,則說明該stage未被送出過,加入到missing集合,如果是窄依賴NarrowDependency,直接将RDD壓入waitingForVisit棧,等待後續處理,因為窄依賴的RDD同屬于同一個stage,加入waitingForVisit隻是為了後續繼續沿着DAG圖繼續往上處理。

        那麼,整個missing的擷取就一目了然,将final stage即ResultStage的RDD壓入到waitingForVisit頂部,循環處理即可得到missing。

        至此,各位可能有個疑問,這個ShuffleMapStage的isAvailable為什麼能決定該stage是否已被送出呢?賣個關子,後續再分析。

        submitStage()方法已分析完畢,go on,我們再回歸到handleJobSubmitted()方法,在調用submitStage()方法送出finalStage之後,實際上隻是将最原始的parent stage送出,其它child stage均存儲在了waitingStages中,那麼,接下來,我們就要調用submitWaitingStages()方法送出其中的stage。代碼如下:

        很簡單,既然stages的順序已經梳理正确,将waitingStages轉換為數組waitingStagesCopy,針對每個stage挨個調用submitStage()方法進行送出即可。

        還記得我賣的那個關子嗎?ShuffleMapStage的isAvailable為什麼能決定該stage是否已被送出呢?現在來解開這個謎團。首先,看下ShuffleMapStage的isAvailable是如何定義的,在ShuffleMapStage中,代碼如下:

        它是通過判斷_numAvailableOutputs和numPartitions是否相等來确定stage是否已被送出(或者說準備就緒可以送出is ready)的,而numPartitions很好了解,就是stage中的全部分區數目,那麼_numAvailableOutputs是什麼呢?

        可以看出,_numAvailableOutputs就是擁有shuffle outputs的分區數量,當這個numAvailableOutputs達到numPartitions時,這個map stage也就準備好了。

        那麼這個_numAvailableOutputs開始時預設為0,它是在何時被指派的呢?通篇看完ShuffleMapStage的源碼,隻有兩個方法對_numAvailableOutputs的值做修改,代碼如下:

        什麼時候調用的這個addOutputLoc()方法呢?答案就在DAGScheduler的newOrUsedShuffleStage()方法中。方法主要邏輯如下:

        這個方法在stage劃分過程中,第一輪被調用,此時mapOutputTracker中并沒有注冊shuffle相關資訊,是以走的是else分支,調用mapOutputTracker的registerShuffle()方法注冊shuffle,而在stage送出過程中,第二輪被調用,此時shuffle已在mapOutputTracker中注冊,則會根據shuffleId從mapOutputTracker中擷取序列化的多個MapOutputStatus對象,反序列化并循環調用stage的addOutputLoc()方法,更新stage的outputLocs,并累加_numAvailableOutputs,至此,關子賣完,再有疑問,後續再慢慢分析吧。

        到了這裡,就不得不分析下真正送出stage的方法submitMissingTasks()了。莫慌,慢慢看,代碼如下:

        submitMissingTasks()方法,最主要的就是針對每個stage生成一組Tasks,即TaskSet,并調用TaskScheduler的submitTasks()方法送出tasks。它主要做了以下幾件事情:

        1、清空stage的pendingPartitions;

        2、首先确定該stage需要計算的分區ID索引,儲存至partitionsToCompute;

        3、将stage加入到runningStages中,标記stage正在運作,與上面的闡述對應;

        4、開啟一個stage時,需要調用outputCommitCoordinator的stageStart()方法;

        5、建立一個Map:taskIdToLocations,存儲的是id->Seq[TaskLocation]的映射關系,并對stage中指定RDD的每個分區擷取位置資訊,映射成id->Seq[TaskLocation]的關系;

        6、标記新的stage attempt,并發送一個SparkListenerStageSubmitted事件;

        7、對stage進行序列化并廣播,如果是ShuffleMapStage,序列化rdd和shuffleDep,如果是ResultStage,序列化rdd和func;

        8、最重要的,針對stage的每個分區構造task,形成tasks:ShuffleMapStage生成ShuffleMapTasks,ResultStage生成ResultTasks;

        9、如果存在tasks,則利用taskScheduler.submitTasks()送出task,否則标記stage已完成。

        至此,stage送出的主體流程已全部分析完畢,後續的Task排程與執行留待以後分析,而stage送出部分細節或者遺漏之處,特别是task生成時的部分細節,也留待以後再細細琢磨吧~

        晚安!

繼續閱讀