Analysis of the submitJob method
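JobClientActor submits a job by sending a SubmitJob message to the JobManager actor. When the JobManager receives the message, it builds a JobInfo object that encapsulates the job's basic information (the client actor, the listening behaviour, the start time and the session timeout). It then passes the JobGraph and the JobInfo to the submitJob method: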
case SubmitJob(jobGraph, listeningBehaviour) =>
  val client = sender()
  val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),
    jobGraph.getSessionTimeout)
  submitJob(jobGraph, jobInfo)
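We walk through the main logic of submitJob by following its key method calls. First it checks the jobGraph argument. If it is null, the method immediately replies with a JobResultFailure message: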
if (jobGraph == null) {
  jobInfo.client ! decorateMessage(JobResultFailure(
    new SerializedThrowable(
      new JobSubmissionException(null, "JobGraph must not be null.")
    )
  ))
}
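The dispatch and the null check can be modelled without Akka as a minimal sketch. JobGraph, JobInfo, SubmitJob, the reply types and the "sender" string below are hypothetical stand-ins, not Flink's classes. Real replies are sent with `!`; here they are returned as values so the flow is easy to test.

```scala
// Hypothetical stand-ins for Flink's JobGraph, JobInfo and reply messages.
final case class JobGraph(jobId: String, sessionTimeout: Long)

sealed trait ListeningBehaviour
case object Detached extends ListeningBehaviour
case object ExecutionResult extends ListeningBehaviour
case object ExecutionResultAndStateChanges extends ListeningBehaviour

final case class JobInfo(client: String, listening: ListeningBehaviour,
                         startTime: Long, sessionTimeout: Long)

sealed trait Reply
final case class JobResultFailure(reason: String) extends Reply
final case class Accepted(jobId: String) extends Reply

// Incoming message, mirroring SubmitJob(jobGraph, listeningBehaviour).
final case class SubmitJob(jobGraph: JobGraph, listening: ListeningBehaviour)

// submitJob: reject a null graph right away; otherwise continue (here: just accept).
def submitJob(jobGraph: JobGraph, jobInfo: JobInfo): Reply =
  if (jobGraph == null) JobResultFailure("JobGraph must not be null.")
  else Accepted(jobGraph.jobId)

// Handler: build a JobInfo from the sender and the message, then delegate.
def receive(msg: SubmitJob, sender: String): Reply = msg match {
  case SubmitJob(jobGraph, listening) =>
    // Read the timeout defensively so a null graph reaches the null check.
    val timeout = if (jobGraph == null) 0L else jobGraph.sessionTimeout
    val jobInfo = JobInfo(sender, listening, System.currentTimeMillis(), timeout)
    submitJob(jobGraph, jobInfo)
}
```

The handler as written above calls jobGraph.getSessionTimeout before submitJob runs. A null jobGraph arriving on that path would therefore throw a NullPointerException in the handler rather than reach the null check, which is why the sketch guards that read.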
Next, it registers the job's library files (the user JAR BLOB keys) and classpaths with the library cache manager:
libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys,
  jobGraph.getClasspaths)
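This step must run first and succeed. Only then can the uploaded libraries and JARs be reliably removed from the library cache manager if any later step throws. From this point on, all of the remaining code is wrapped in a try block. If any exception is caught, the job's JAR files are removed through libraryCacheManager's unregisterJob method: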
catch { case t: Throwable =>
  libraryCacheManager.unregisterJob(jobId)
  //...
}
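The "register first, undo on any failure" pattern can be shown in isolation as a minimal sketch. LibraryCache and submit are hypothetical names. The `steps` argument stands in for everything that follows registration inside the try block. Like the source, the sketch catches Throwable.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the library cache manager: tracks registered JARs per job.
class LibraryCache {
  val registered = mutable.Map.empty[String, Seq[String]]
  def registerJob(jobId: String, jars: Seq[String]): Unit = registered(jobId) = jars
  def unregisterJob(jobId: String): Unit = registered.remove(jobId)
}

// Register inside the try, run the remaining steps, and undo the registration
// if anything throws, so a failed submission leaves no JARs behind.
def submit(cache: LibraryCache, jobId: String, jars: Seq[String])
          (steps: => Unit): Either[Throwable, String] =
  try {
    cache.registerJob(jobId, jars)
    steps
    Right(jobId)
  } catch {
    case t: Throwable =>
      cache.unregisterJob(jobId)
      Left(t)
  }
```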
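Next, it obtains the class loader for the user code (userCodeLoader) and the restart strategy to apply on failure (restartStrategy). If the JobGraph carries no restart strategy configuration, the JobManager's defaultRestartStrategy is used: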
val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
val restartStrategy = Option(jobGraph.getRestartStrategyConfiguration())
  .map(RestartStrategyFactory.createRestartStrategy(_)) match {
    case Some(strategy) => strategy
    case None => defaultRestartStrategy
  }
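The fallback relies on `Option(x)` turning a null reference into `None`. A minimal sketch follows. RestartConfig, RestartStrategy, createRestartStrategy and the "no-restart" default are hypothetical and do not reflect Flink's actual defaults. `.map(...).getOrElse(default)` is equivalent to the `match` in the snippet above.

```scala
// Hypothetical types: a per-job restart configuration and the strategy built from it.
final case class RestartConfig(attempts: Int, delayMs: Long)
final case class RestartStrategy(description: String)

def createRestartStrategy(conf: RestartConfig): RestartStrategy =
  RestartStrategy(s"fixed-delay(${conf.attempts}, ${conf.delayMs} ms)")

// Hypothetical default used when the job carries no configuration.
val defaultRestartStrategy = RestartStrategy("no-restart")

// Option(null) is None, so a missing configuration falls back to the default.
def resolveRestartStrategy(conf: RestartConfig): RestartStrategy =
  Option(conf).map(createRestartStrategy).getOrElse(defaultRestartStrategy)
```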
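Next, it obtains the ExecutionGraph instance. It first looks the job up in the currentJobs cache. If an entry exists, it refreshes the JobInfo's last-active time and returns the cached graph. Otherwise it creates a new graph and adds it to the cache: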
executionGraph = currentJobs.get(jobGraph.getJobID) match {
  case Some((graph, currentJobInfo)) =>
    currentJobInfo.setLastActive()
    graph
  case None =>
    val graph = new ExecutionGraph(
      executionContext,
      jobGraph.getJobID,
      jobGraph.getName,
      jobGraph.getJobConfiguration,
      timeout,
      restartStrategy,
      jobGraph.getUserJarBlobKeys,
      jobGraph.getClasspaths,
      userCodeLoader)
    currentJobs.put(jobGraph.getJobID, (graph, jobInfo))
    graph
}
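The lookup-or-create logic can be reduced to a mutable map keyed by job ID, as in this minimal sketch. Graph, TrackedJob and graphFor are hypothetical. Unlike the real setLastActive, which takes no argument, the sketch passes the timestamp explicitly so the behaviour is testable.

```scala
import scala.collection.mutable

// Hypothetical stand-ins: a graph object and per-job bookkeeping with a last-active time.
final class Graph(val jobId: String)
final class TrackedJob(var lastActive: Long) {
  def setLastActive(now: Long): Unit = lastActive = now
}

val currentJobs = mutable.Map.empty[String, (Graph, TrackedJob)]

// Reuse the cached graph (refreshing its last-active time) or create and cache a new one.
def graphFor(jobId: String, info: TrackedJob, now: Long): Graph =
  currentJobs.get(jobId) match {
    case Some((graph, existing)) =>
      existing.setLastActive(now)
      graph
    case None =>
      val graph = new Graph(jobId)
      currentJobs.put(jobId, (graph, info))
      graph
  }
```

On a cache hit, the JobInfo passed in by the new submission is not stored. The cached one is kept and only its timestamp is refreshed.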
Once it has the executionGraph, it sets several of its properties: the schedule mode, whether queued scheduling is allowed, and the JSON representation of the plan:
executionGraph.setScheduleMode(jobGraph.getScheduleMode())
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling())
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph))
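Next, it initializes some properties of each JobVertex. Any vertex whose parallelism is set to ExecutionConfig.PARALLELISM_AUTO_MAX gets the scheduler's total number of slots as its parallelism. Then initializeOnMaster is called on every vertex with the user code class loader: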
val numSlots = scheduler.getTotalNumberOfSlots()
for (vertex <- jobGraph.getVertices.asScala) {
  val executableClass = vertex.getInvokableClassName
  if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {
    vertex.setParallelism(numSlots)
  }
  vertex.initializeOnMaster(userCodeLoader)
}
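The parallelism rule can be isolated in a minimal sketch. The sketch assumes the "auto max" marker is a sentinel integer and uses Int.MaxValue as that value. Vertex and resolveParallelism are hypothetical names.

```scala
// Assumed sentinel meaning "use all available slots" (Int.MaxValue in this sketch).
val ParallelismAutoMax: Int = Int.MaxValue

final class Vertex(val name: String, var parallelism: Int)

// Replace the sentinel with the scheduler's total slot count; leave explicit values alone.
def resolveParallelism(vertices: Seq[Vertex], totalSlots: Int): Unit =
  for (v <- vertices if v.parallelism == ParallelismAutoMax)
    v.parallelism = totalSlots
```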
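It then obtains the JobGraph's vertices sorted in topological order starting from the sources, and attaches this list to the ExecutionGraph. The attach step does a great deal of work, which we analyze later: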
val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources()
executionGraph.attachJobGraph(sortedTopology)
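A topological order starting from the sources can be computed with Kahn's algorithm, shown here as a minimal sketch. This is a standard technique, not necessarily how getVerticesSortedTopologicallyFromSources is implemented. Vertices are plain strings, and `edges` maps each vertex to its downstream consumers. It is assumed that edges only reference listed vertices.

```scala
import scala.collection.mutable

// Kahn's algorithm: repeatedly emit a vertex once all of its inputs have been emitted.
def sortTopologicallyFromSources(vertices: Seq[String],
                                 edges: Map[String, Seq[String]]): Seq[String] = {
  // In-degree = number of upstream producers feeding each vertex.
  val inDegree = mutable.Map.from(vertices.map(v => v -> 0))
  for (targets <- edges.values; t <- targets) inDegree(t) += 1

  // Sources (in-degree 0) are emitted first, in their original order.
  val ready = mutable.Queue.from(vertices.filter(v => inDegree(v) == 0))
  val result = mutable.ArrayBuffer.empty[String]
  while (ready.nonEmpty) {
    val v = ready.dequeue()
    result += v
    for (t <- edges.getOrElse(v, Seq.empty)) {
      inDegree(t) -= 1
      if (inDegree(t) == 0) ready.enqueue(t)
    }
  }
  // Vertices left over sit on a cycle and can never become ready.
  if (result.size != vertices.size)
    throw new IllegalArgumentException("the graph contains a cycle")
  result.toList
}
```

For a diamond source → {a, b} → sink passed in as `Seq("sink", "b", "a", "source")`, the result is `List(source, a, b, sink)`.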
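Next, the snapshot and checkpoint configuration is written into the ExecutionGraph. If the JobGraph carries snapshot settings, the IDs of the vertices to trigger, acknowledge and confirm are mapped to their ExecutionJobVertex instances. Checkpointing is then enabled with the configured interval, timeout, minimum pause and maximum number of concurrent checkpoints: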
val snapshotSettings = jobGraph.getSnapshotSettings
if (snapshotSettings != null) {
  val jobId = jobGraph.getJobID()
  val idToVertex: JobVertexID => ExecutionJobVertex = id => {
    val vertex = executionGraph.getJobVertex(id)
    vertex
  }
  val triggerVertices: java.util.List[ExecutionJobVertex] =
    snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava
  val ackVertices: java.util.List[ExecutionJobVertex] =
    snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava
  val confirmVertices: java.util.List[ExecutionJobVertex] =
    snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava
  val completedCheckpoints = checkpointRecoveryFactory
    .createCompletedCheckpoints(jobId, userCodeLoader)
  val checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(jobId)
  executionGraph.enableSnapshotCheckpointing(
    snapshotSettings.getCheckpointInterval,
    snapshotSettings.getCheckpointTimeout,
    snapshotSettings.getMinPauseBetweenCheckpoints,
    snapshotSettings.getMaxConcurrentCheckpoints,
    triggerVertices,
    ackVertices,
    confirmVertices,
    context.system,
    leaderSessionID.orNull,
    checkpointIdCounter,
    completedCheckpoints,
    recoveryMode,
    savepointStore)
}
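Resolving the three ID lists is a lookup against the graph's vertices, shown here as a minimal sketch. ExecVertex, SnapshotSettings, resolve and checkpointPlan are hypothetical. Failing fast on an unknown ID is this sketch's own choice, not a claim about the snippet above. `Option(settings)` mirrors the `!= null` guard.

```scala
// Hypothetical stand-ins for ExecutionJobVertex and the job's snapshot settings.
final case class ExecVertex(id: String, parallelism: Int)
final case class SnapshotSettings(trigger: Seq[String], ack: Seq[String], confirm: Seq[String])

// Map each ID to its vertex; an unknown ID is reported immediately.
def resolve(ids: Seq[String], graph: Map[String, ExecVertex]): Seq[ExecVertex] =
  ids.map(id => graph.getOrElse(id, throw new IllegalArgumentException(s"unknown vertex $id")))

// No settings means checkpointing is not enabled at all.
def checkpointPlan(settings: SnapshotSettings, graph: Map[String, ExecVertex])
    : Option[(Seq[ExecVertex], Seq[ExecVertex], Seq[ExecVertex])] =
  Option(settings).map { s =>
    (resolve(s.trigger, graph), resolve(s.ack, graph), resolve(s.confirm, graph))
  }
```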
The JobManager registers itself as a listener for job status changes:
executionGraph.registerJobStatusListener(new AkkaActorGateway(self, leaderSessionID.orNull))
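If the client also needs to be notified of execution results and job status changes (ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES), listeners are registered for the client as well: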
if (jobInfo.listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
  val gateway = new AkkaActorGateway(jobInfo.client, leaderSessionID.orNull)
  executionGraph.registerExecutionListener(gateway)
  executionGraph.registerJobStatusListener(gateway)
}
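This is a plain observer pattern: the JobManager always subscribes, and the client subscribes only if it asked for state changes. A minimal sketch follows. StatusGraph, register and the inbox buffers are hypothetical; listeners are plain functions instead of actor gateways.

```scala
import scala.collection.mutable

sealed trait ListeningBehaviour
case object Detached extends ListeningBehaviour
case object ExecutionResult extends ListeningBehaviour
case object ExecutionResultAndStateChanges extends ListeningBehaviour

// Hypothetical graph that fans job status changes out to registered listeners.
final class StatusGraph {
  private val statusListeners = mutable.ArrayBuffer.empty[String => Unit]
  def registerJobStatusListener(l: String => Unit): Unit = statusListeners += l
  def transitionTo(status: String): Unit = statusListeners.foreach(l => l(status))
}

def register(graph: StatusGraph, behaviour: ListeningBehaviour,
             jobManagerInbox: mutable.Buffer[String],
             clientInbox: mutable.Buffer[String]): Unit = {
  // The JobManager always listens.
  graph.registerJobStatusListener(s => jobManagerInbox += s)
  // The client listens only if it asked for state changes as well.
  if (behaviour == ExecutionResultAndStateChanges)
    graph.registerJobStatusListener(s => clientInbox += s)
}
```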
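All of the code above, starting from registering the job's JARs with the library cache manager, is wrapped in a try block. If an exception occurs, control passes to the catch block: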
catch {
  case t: Throwable =>
    log.error(s"Failed to submit job $jobId ($jobName)", t)
    libraryCacheManager.unregisterJob(jobId)
    currentJobs.remove(jobId)
    if (executionGraph != null) {
      executionGraph.fail(t)
    }
    val rt: Throwable = if (t.isInstanceOf[JobExecutionException]) {
      t
    } else {
      new JobExecutionException(jobId, s"Failed to submit job $jobId ($jobName)", t)
    }
    jobInfo.client ! decorateMessage(JobResultFailure(new SerializedThrowable(rt)))
    return
}
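After logging the error, the handler removes the current job's libraries from the library cache by jobId. It then removes the job's (ExecutionGraph, JobInfo) tuple from the currentJobs map. If a graph was already created, it calls the ExecutionGraph's fail method to move it into a failed state. Finally, it reports the exception to the client in a JobResultFailure message and returns; the exception is wrapped in a JobExecutionException unless it already is one.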
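The "wrap unless already wrapped" step is a type test. In this minimal sketch, JobExecutionException is a hypothetical stand-in class and toClientFailure a hypothetical name. The sketch uses a pattern match, which behaves the same as the isInstanceOf check above.

```scala
// Hypothetical stand-in for Flink's JobExecutionException.
class JobExecutionException(val jobId: String, msg: String, cause: Throwable)
  extends Exception(msg, cause)

// Keep a JobExecutionException as-is; wrap anything else so the client always gets
// the same exception type, with the original failure preserved as the cause.
def toClientFailure(jobId: String, jobName: String, t: Throwable): Throwable = t match {
  case e: JobExecutionException => e
  case other => new JobExecutionException(jobId, s"Failed to submit job $jobId ($jobName)", other)
}
```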
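Everything from here to the end may block, so it is wrapped in a future block and executed asynchronously. It first checks whether the job is being recovered. If so, it restores the state from the latest checkpoint: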
if (isRecovery) {
  executionGraph.restoreLatestCheckpointedState()
}
If the job is not being recovered but the snapshot settings contain a savepoint path, the state is reset from that savepoint:
executionGraph.restoreSavepoint(savepointPath)
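The choice of restore source can be expressed as a small decision function, shown as a minimal sketch. RestoreAction, its cases and restoreAction are hypothetical. The savepoint path is modelled as an Option.

```scala
sealed trait RestoreAction
case object RestoreLatestCheckpoint extends RestoreAction
final case class RestoreSavepoint(path: String) extends RestoreAction
case object StartFresh extends RestoreAction

// Recovery takes precedence; otherwise use a configured savepoint; otherwise no restore.
def restoreAction(isRecovery: Boolean, savepointPath: Option[String]): RestoreAction =
  if (isRecovery) RestoreLatestCheckpoint
  else savepointPath match {
    case Some(path) => RestoreSavepoint(path)
    case None => StartFresh
  }
```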
It then writes the current JobGraph, together with its JobInfo, into the SubmittedJobGraphStore (submittedJobGraphs), which exists mainly for recovery:
submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))
At this point it can reply to the client with a JobSubmitSuccess message:
jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))
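Running the blocking tail asynchronously while keeping its order can be sketched with scala.concurrent.Future. Steps and finishAsync are hypothetical; each step only records its name. The sketch uses the global execution context, which is an assumption and not the JobManager's own executor. The client hears "success" only after the graph has been persisted for recovery.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical, possibly blocking steps; each just records that it ran.
final class Steps {
  val log = new ConcurrentLinkedQueue[String]()
  def restore(): Unit = log.add("restore")
  def persistJobGraph(): Unit = log.add("persist")
  def replySuccess(): Unit = log.add("reply")
  def schedule(): Unit = log.add("schedule")
}

// Run the tail of the submission off the caller's thread, in the order described above.
def finishAsync(steps: Steps): Future[Unit] = Future {
  steps.restore()
  steps.persistJobGraph()
  steps.replySuccess()
  steps.schedule()
}
```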
Next, it triggers scheduling of the job based on the ExecutionGraph. This is the prerequisite for any task to run:
if (leaderElectionService.hasLeadership) {
  executionGraph.scheduleForExecution(scheduler)
} else {
  self ! decorateMessage(RemoveJob(jobId, removeJobFromStateBackend = false))
}
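To prevent several JobManagers from scheduling the same job at the same time, the code first checks whether the current node is the leader. Only the leader schedules the job. Any other node sends itself a RemoveJob message with removeJobFromStateBackend = false, which hands the job off to a different handling path.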
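The leader gate is a simple branch, shown here as a minimal sketch with hypothetical Outcome, Scheduled and RemoveJob case classes in place of real messages. Passing removeJobFromStateBackend = false presumably keeps the job graph persisted earlier in the SubmittedJobGraphStore, so the actual leader can still recover it. That reading is an inference, not confirmed by the source.

```scala
sealed trait Outcome
final case class Scheduled(jobId: String) extends Outcome
final case class RemoveJob(jobId: String, removeJobFromStateBackend: Boolean) extends Outcome

// Only the current leader schedules; a non-leader asks itself to drop the job locally
// without deleting it from the state backend.
def scheduleOrRemove(hasLeadership: Boolean, jobId: String): Outcome =
  if (hasLeadership) Scheduled(jobId)
  else RemoveJob(jobId, removeJobFromStateBackend = false)
```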