天天看點

spark源碼分析之任務排程篇

dag(directed acyclic graph)叫做有向無環圖,原始的rdd通過一系列的轉換就就形成了dag,根據rdd之間的依賴關系的不同将dag劃分成不同的stage,對于窄依賴,partition的轉換處理在stage中完成計算。對于寬依賴,由于有shuffle的存在,隻能在parent rdd處理完成後,才能開始接下來的計算,是以寬依賴是劃分stage的依據。

窄依賴 指的是每一個父rdd的partition最多被子rdd的一個partition使用

寬依賴 指的是多個子rdd的partition會依賴同一個父rdd的partition

當我們看完executor的建立與啟動流程後,我們繼續在sparkcontext的構造方法中繼續檢視

在構造方法中還建立了一個dagscheduler對象,這個類的任務就是用來劃分stage任務的,構造方法中初始化了 <code>private[scheduler] val eventprocessloop = new dagschedulereventprocessloop(this)</code>

dagschedulereventprocessloop是一個事件總線對象,用來負責任務的分發,在構造方法<code>eventprocessloop.start()</code>被調用,該方法是父類eventloop的start

調用了eventthread的start方法,開啟了一個線程

run方法中不斷的從linkedblockingdeque阻塞隊列中取消息,然後調用<code>onreceive(event)</code>方法,該方法是由子類dagschedulereventprocessloop實作的

onreceive中會比對到傳入的任務類型,執行相應的邏輯。到此dagscheduler的排程隊列會一直挂起,不斷輪詢隊列中的任務。

當rdd經過一系列的轉換transformation方法後,最終要執行action動作方法,這裡比如wordcount程式中最後調用<code>collect()</code>方法時會将資料送出到master上運作,任務真正的被執行,這裡的方法執行過程如下

<code>sc</code> 是sparkcontext對象,這裡調用 一個<code>runjob</code>該方法調用多次重載的方法後,該方法最終會調用 <code>dagscheduler.runjob</code>

dagscheduler的<code>runjob</code>是我們比較關心的

這裡面的我們主要看的是<code>submitjob(rdd, func, partitions, callsite, allowlocal, resulthandler, properties)</code>送出任務

這裡比較關鍵的地方是<code>eventprocessloop.post</code>往任務隊列中加入一個jobsubmitted類型的任務,eventprocessloop是在構造方法中就初始化好的事件總線對象,内部有一個線程不斷的輪詢隊列裡的任務

輪詢到任務後調用<code>onreceive</code>方法比對任務類型,在這裡我們送出的任務是jobsubmitted類型

調用了<code>handlejobsubmitted</code>方法,接下來檢視該方法

上面的代碼中,調用了<code>newstage</code>進行任務的劃分,該方法是劃分任務的核心方法,劃分任務的根據最後一個依賴關系作為開始,通過遞歸,将每個寬依賴做為切分stage的依據,切分stage的過程是流程中的一環,但在這裡不詳細闡述,當任務切分完畢後,代碼繼續執行來到<code>submitstage(finalstage)</code>這裡開始進行任務送出

這裡以遞歸的方式進行任務的送出

調用<code>submitmissingtasks(stage, jobid.get)</code>送出任務,将每一個stage和jobid傳入

這裡的代碼我們需要關注的是`taskscheduler.submittasks(

建立了一個taskset對象,将所有任務的資訊封裝,包括task任務清單,stageid,任務id,分區數參數等

該方法比較重要,主要将任務加入排程池,最後調用了<code>backend.reviveoffers()</code>這裡的backend是coarsegrainedschedulerbackend一個executor任務排程對象

這裡用了内部的driveractor對象發送了一個内部消息給自己,接下來檢視receiver方法接受的消息

收到消息後調用了<code>makeoffers()</code>方法

makeoffers方法中,将executor的資訊集合與排程池中的tasks封裝成wokeroffers清單傳給了

<code>launchtasks</code>

launchtasks方法将周遊tasks集合,每個task任務序列化,發送啟動task執行消息的給executor

executor的onreceive方法

executor收到driveractor發送的啟動task的消息,這裡才開始真正執行任務了,将收到的task序列化資訊反序列化,調用<code>executor</code>的<code>launchtask</code>方法執行任務

launchtask内将task送出到線程池去運作,taskrunner是runnable對象,裡面的run方法執行了我們app生成的每一個rdd的鍊上的邏輯。