spark作為一套高效的分布式運算架構,但是想要更深入的學習它,就要通過分析spark的源碼,不但可以更好的幫助了解spark的工作過程,還可以提高對叢集的排錯能力,本文主要關注的是spark的master的啟動流程與worker啟動流程。
我們啟動一個master是通過shell指令啟動了一個腳本<code>start-master.sh</code>開始的,這個腳本的啟動流程如下
我們可以看到腳本首先啟動了一個<code>org.apache.spark.deploy.master.master</code>類,啟動時會傳入一些參數,比如cpu的執行核數,記憶體大小,app的main方法等
檢視master類的main方法
這裡主要看<code>startsystemandactor</code>方法
spark底層通信使用的是akka
通過actorsystem建立actor -> actorsystem.actorof, 就會執行master的構造方法->然後執行actor生命周期方法
執行master的構造方法初始化一些變量
主構造器執行完就會執行prestart --》執行完receive方法
prestart方法裡建立了一個定時器,定時檢查woker的逾時時間 <code>val worker_timeout = conf.getlong("spark.worker.timeout", 60) * 1000</code> 預設為60秒
到此master的初始化的主要過程到我們已經看到了,主要就是構造一個master的actor進行等待消息,并初始化了一堆集合來儲存worker資訊,和一個定時器來檢查worker的逾時
master啟動時序圖
通過shell腳本執行<code>salves.sh</code> -> 通過讀取slaves 通過ssh的方式啟動遠端的worker
<code>spark-daemon.sh start org.apache.spark.deploy.worker.worker</code>
腳本會啟動<code>org.apache.spark.deploy.worker.worker</code>類
看worker源碼
這裡最重要的是woker的startsystemandactor
這裡worker同樣的構造了一個屬于worker的actor對象,到此worker的啟動初始化完成
根據actor生命周期接着worker的prestart方法被調用
這裡調用了一個registerwithmaster方法,開始向master注冊
registerwithmaster裡通過比對調用了tryregisterallmasters方法
,接下來看
通過<code>masterakkaurl</code>和master建立連接配接後
<code>actor ! registerworker(workerid, host, port, cores, memory, webui.boundport, publicaddress)</code>worker向master發送了一個消息,帶去一些參數,id,主機,端口,cpu核數,記憶體等待
這裡是最主要的内容;
receivewithlogging裡會輪詢到worker發送的消息,
master收到消息後将參數封裝成workinfo對象添加到集合中,并加入到持久化引擎中
<code>sender ! registeredworker(masterurl, masterwebuiurl)</code>向worker發送一個消息回報
接下來看worker的receivewithlogging
worker接受來自master的注冊成功的回報資訊,啟動定時器,定時發送心跳heartbeat
master端的receivewithlogging收到心跳消息
記錄并更新<code>workerinfo.lastheartbeat = system.currenttimemillis()</code>最後一次心跳時間
master的定時任務會不斷的發送一個<code>checkforworkertimeout</code>内部消息不斷的輪詢集合裡的worker資訊,如果超過60秒就将worker資訊移除
timeoutdeadworkers方法
如果 (最後一次心跳時間<目前時間-逾時時間)則判斷為worker逾時,
将集合裡的資訊移除。
當下一次收到心跳資訊時,如果是已注冊過的,workerid不為空,但是workerinfo已被移除的條件,就會<code>sender ! reconnectworker(masterurl)</code>發送一個重新注冊的消息
worker與master時序圖
master與worker啟動以後的大緻的通信流程到此,接下來就是如何啟動叢集上的executor 程序計算任務了。