Flink Runtime: The Client Submits the Job Graph (Part 2)

Analysis of the submitJob Method

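JobClientActor submits a job by sending a SubmitJob message to the JobManager actor. When the JobManager receives the message, it builds a JobInfo object that encapsulates the job's basic information, then passes both the JobGraph and the JobInfo to the submitJob method: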

case SubmitJob(jobGraph, listeningBehaviour) =>  
    val client = sender()  
    val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),    
        jobGraph.getSessionTimeout)  
    submitJob(jobGraph, jobInfo)      

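We walk through the main logic of submitJob by following its key method calls. It first checks the jobGraph parameter; if it is null, it replies immediately with a JobResultFailure message: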

if (jobGraph == null) {  
    jobInfo.client ! decorateMessage(JobResultFailure(    
        new SerializedThrowable(      
            new JobSubmissionException(null, "JobGraph must not be null.")    
        )  
    ))
}      

Next, it registers the job's library files and classpaths with the library cache manager:

libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys,  
                                jobGraph.getClasspaths)      

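This step must run first. Only then is it guaranteed that, if any later step throws, the uploaded libraries and JARs can be removed from the library cache manager again. From this point on, the whole code section is wrapped in a try block. If any exception is caught, the unregisterJob method of libraryCacheManager removes the job's JAR files: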

catch {
    case t: Throwable =>
        libraryCacheManager.unregisterJob(jobId)
        //...
}
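The register-first, clean-up-on-failure pattern described above can be sketched as follows. This is a minimal illustration, not Flink's code: FakeLibraryCache and submit are hypothetical names, and job IDs are plain strings.

```scala
import scala.collection.mutable

object RegisterThenCleanupSketch {
  // Hypothetical in-memory stand-in for the library cache manager.
  final class FakeLibraryCache {
    private val registered = mutable.Set.empty[String]
    def registerJob(jobId: String): Unit = registered.add(jobId)
    def unregisterJob(jobId: String): Unit = registered.remove(jobId)
    def isRegistered(jobId: String): Boolean = registered.contains(jobId)
  }

  // Registration is the first action inside the try block, so any failure
  // in `setup` rolls the registration back in the catch block.
  def submit(cache: FakeLibraryCache, jobId: String)(setup: => Unit): Either[Throwable, String] =
    try {
      cache.registerJob(jobId)
      setup
      Right(jobId)
    } catch {
      case t: Throwable =>
        cache.unregisterJob(jobId)
        Left(t)
    }
}
```

A failing setup step leaves no entry behind in the cache, which is the property the ordering is meant to guarantee.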

Next, it obtains the class loader for the user code (userCodeLoader) and the restart strategy to apply on failure (restartStrategy):

val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
val restartStrategy = Option(jobGraph.getRestartStrategyConfiguration())  
    .map(RestartStrategyFactory.createRestartStrategy(_)) match {    
        case Some(strategy) => strategy    
        case None => defaultRestartStrategy  
}      
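The null-safe fallback above (wrap a possibly null configuration in Option, map it to a strategy, otherwise use the default) can be tried in isolation. In this sketch RestartConfig, FixedDelay, NoRestart, create, and resolve are all hypothetical stand-ins rather than Flink types, and getOrElse replaces the explicit match with the same result.

```scala
object RestartStrategySketch {
  sealed trait RestartStrategy
  case object NoRestart extends RestartStrategy
  final case class FixedDelay(attempts: Int) extends RestartStrategy

  // Hypothetical configuration type; a null value models "not set by the job".
  final case class RestartConfig(attempts: Int)

  // Hypothetical factory turning a configuration into a strategy.
  def create(config: RestartConfig): RestartStrategy =
    if (config.attempts <= 0) NoRestart else FixedDelay(config.attempts)

  // Option(null) is None, so a missing configuration falls back to the default.
  def resolve(jobConfig: RestartConfig, default: RestartStrategy): RestartStrategy =
    Option(jobConfig).map(create).getOrElse(default)
}
```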

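Next, it obtains the ExecutionGraph instance. It first looks the job up in the cache (currentJobs). If an entry exists, that graph is returned directly; otherwise a new graph is created and added to the cache: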

executionGraph = currentJobs.get(jobGraph.getJobID) match {  
    case Some((graph, currentJobInfo)) =>    
        currentJobInfo.setLastActive()    
        graph  
    case None =>    
        val graph = new ExecutionGraph(      
            executionContext,      
            jobGraph.getJobID,      
            jobGraph.getName,      
            jobGraph.getJobConfiguration,      
            timeout,      
            restartStrategy,      
            jobGraph.getUserJarBlobKeys,      
            jobGraph.getClasspaths,      
            userCodeLoader)    
        currentJobs.put(jobGraph.getJobID, (graph, jobInfo))    
        graph
}      
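The lookup-or-create step can be reduced to a small self-contained sketch. Graph, Info, and Jobs here are simplified, hypothetical stand-ins for ExecutionGraph, JobInfo, and the currentJobs map.

```scala
import scala.collection.mutable

object ExecutionGraphCacheSketch {
  final class Graph(val jobId: String)

  final class Info {
    var lastActive: Long = 0L
    def setLastActive(): Unit = lastActive = System.currentTimeMillis()
  }

  final class Jobs {
    private val current = mutable.Map.empty[String, (Graph, Info)]

    // Returns the cached graph if the job is known, otherwise creates and caches one.
    def getOrCreate(jobId: String, info: Info): Graph =
      current.get(jobId) match {
        case Some((graph, existingInfo)) =>
          existingInfo.setLastActive() // refresh the info stored with the cached entry
          graph
        case None =>
          val graph = new Graph(jobId)
          current.put(jobId, (graph, info))
          graph
      }

    def size: Int = current.size
  }
}
```

Note that on a cache hit the stored info is refreshed and the newly passed one is ignored, mirroring the currentJobInfo.setLastActive() call above.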

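Once the executionGraph has been obtained, several of its properties are set: the schedule mode, whether queued scheduling is allowed, and the JSON representation of the plan.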

executionGraph.setScheduleMode(jobGraph.getScheduleMode())
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling())
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph))      

Next, some properties of each JobVertex are initialized:

val numSlots = scheduler.getTotalNumberOfSlots()
for (vertex <- jobGraph.getVertices.asScala) {  
    val executableClass = vertex.getInvokableClassName 
    if (vertex.getParallelism() == ExecutionConfig.PARALLELISM_AUTO_MAX) {    
        vertex.setParallelism(numSlots)  
    }  
    vertex.initializeOnMaster(userCodeLoader)
}      
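The parallelism handling in the loop above replaces a sentinel value with the total number of slots. A minimal sketch of that idea follows; Vertex and resolve are hypothetical, and the sentinel value chosen for AutoMax is an assumption of this sketch rather than a statement about Flink's constant.

```scala
object ParallelismSketch {
  // Stand-in sentinel meaning "use every available slot" (value assumed for this sketch).
  val AutoMax: Int = Int.MaxValue

  final case class Vertex(name: String, parallelism: Int)

  // Vertices that ask for the maximum get the total slot count; others are unchanged.
  def resolve(vertices: Seq[Vertex], totalSlots: Int): Seq[Vertex] =
    vertices.map { v =>
      if (v.parallelism == AutoMax) v.copy(parallelism = totalSlots) else v
    }
}
```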

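It then obtains the JobGraph's vertices sorted in topological order starting from the sources, and attaches that collection to the ExecutionGraph. The attach step does a great deal of work, which we analyze later: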

val sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources()
executionGraph.attachJobGraph(sortedTopology)      
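For readers unfamiliar with topological ordering from the sources, here is one common way to compute it (Kahn's algorithm). This is an illustrative sketch and not necessarily how getVerticesSortedTopologicallyFromSources is implemented; vertices are plain strings, and every edge target is assumed to appear in the vertex list.

```scala
import scala.collection.mutable

object TopologicalSortSketch {
  // edges maps a vertex to its downstream vertices.
  def sortFromSources(vertices: Seq[String], edges: Map[String, Seq[String]]): List[String] = {
    // Count incoming edges per vertex.
    val inDegree = mutable.Map.empty[String, Int]
    vertices.foreach(v => inDegree(v) = 0)
    for (targets <- edges.values; t <- targets) inDegree(t) += 1

    // Sources are the vertices without inputs; they are emitted first.
    val ready = mutable.Queue.empty[String]
    ready ++= vertices.filter(v => inDegree(v) == 0)

    val sorted = mutable.ArrayBuffer.empty[String]
    while (ready.nonEmpty) {
      val v = ready.dequeue()
      sorted += v
      // A downstream vertex becomes ready once all of its inputs are emitted.
      for (t <- edges.getOrElse(v, Seq.empty)) {
        inDegree(t) -= 1
        if (inDegree(t) == 0) ready.enqueue(t)
      }
    }
    require(sorted.size == vertices.size, "job graph contains a cycle")
    sorted.toList
  }
}
```

With such an ordering, every vertex appears after all of its inputs, so each vertex can be attached once its upstream vertices are already in place.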

Next, the snapshot settings and checkpoint configuration are written into the ExecutionGraph:

val snapshotSettings = jobGraph.getSnapshotSettings
if (snapshotSettings != null) {  
    val jobId = jobGraph.getJobID()  
    val idToVertex: JobVertexID => ExecutionJobVertex = id => {    
        val vertex = executionGraph.getJobVertex(id)      
        vertex  
    }  
    val triggerVertices: java.util.List[ExecutionJobVertex] =    
        snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava  
    val ackVertices: java.util.List[ExecutionJobVertex] =    
        snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava  
    val confirmVertices: java.util.List[ExecutionJobVertex] =    
        snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava  
    val completedCheckpoints = checkpointRecoveryFactory    
        .createCompletedCheckpoints(jobId, userCodeLoader)  
    val checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(jobId)  
    executionGraph.enableSnapshotCheckpointing(    
        snapshotSettings.getCheckpointInterval,    
        snapshotSettings.getCheckpointTimeout,    
        snapshotSettings.getMinPauseBetweenCheckpoints,    
        snapshotSettings.getMaxConcurrentCheckpoints,    
        triggerVertices,    
        ackVertices,    
        confirmVertices,    
        context.system,    
        leaderSessionID.orNull,    
        checkpointIdCounter,    
        completedCheckpoints,    
        recoveryMode,    
        savepointStore)
}      

The JobManager registers itself as a listener for job status changes:

executionGraph.registerJobStatusListener(new AkkaActorGateway(self, leaderSessionID.orNull))      

If the client also needs to be notified of the execution result and of job status changes, listeners are registered for the client as well:

if (jobInfo.listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {    
    val gateway = new AkkaActorGateway(jobInfo.client, leaderSessionID.orNull)  
    executionGraph.registerExecutionListener(gateway)  
    executionGraph.registerJobStatusListener(gateway)
}      

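All of the code above, starting from registering the job's JARs with the library cache manager, is wrapped in a try block. If an exception is thrown, control enters the catch block for error handling: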

catch {  
    case t: Throwable =>    
        log.error(s"Failed to submit job $jobId ($jobName)", t)    
        libraryCacheManager.unregisterJob(jobId)    
        currentJobs.remove(jobId)    
        if (executionGraph != null) {      
            executionGraph.fail(t)    
        }    
        val rt: Throwable = if (t.isInstanceOf[JobExecutionException]) {      
            t    
        } else {      
            new JobExecutionException(jobId, s"Failed to submit job $jobId ($jobName)", t)    
        }    
        jobInfo.client ! decorateMessage(JobResultFailure(new SerializedThrowable(rt)))    
        return
}      
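The wrap-unless-already-typed logic in the catch block can also be written with a pattern match instead of isInstanceOf. The sketch below uses a hypothetical JobFailure class in place of JobExecutionException and string job IDs.

```scala
object FailureWrappingSketch {
  // Hypothetical stand-in for a job-level exception type.
  final class JobFailure(val jobId: String, message: String, cause: Throwable)
      extends Exception(message, cause)

  // Exceptions of the job-level type pass through unchanged; anything else is wrapped
  // so the client always receives the same exception type.
  def toJobFailure(jobId: String, t: Throwable): JobFailure = t match {
    case known: JobFailure => known
    case other => new JobFailure(jobId, s"Failed to submit job $jobId", other)
  }
}
```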

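Exception handling first uses the job ID to remove the current job's libraries from the library cache, then removes the job's (ExecutionGraph, JobInfo) tuple from the currentJobs map. If an ExecutionGraph has already been created, its fail method is called to fail it. Finally, the exception is reported to the client in a JobResultFailure message and the method returns.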

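The code from here to the end may block, so it is wrapped in a future block and runs asynchronously. It first checks whether the job is being submitted in recovery mode; if so, state is restored from the latest checkpoint: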

if (isRecovery) {  
    executionGraph.restoreLatestCheckpointedState()
}      

If it is not in recovery mode but the snapshot settings contain a savepoint path, the state is reset from that savepoint instead:

executionGraph.restoreSavepoint(savepointPath)        
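The two restore paths described above amount to a simple decision: recovery takes precedence, then a configured savepoint, otherwise nothing is restored. A minimal sketch of that decision follows; the Restore type and decide function are hypothetical, and the savepoint path is modeled as an Option.

```scala
object RestoreDecisionSketch {
  sealed trait Restore
  case object FromLatestCheckpoint extends Restore
  final case class FromSavepoint(path: String) extends Restore
  case object FreshStart extends Restore

  // Recovery mode wins; a savepoint path is only consulted when not recovering.
  def decide(isRecovery: Boolean, savepointPath: Option[String]): Restore =
    if (isRecovery) FromLatestCheckpoint
    else savepointPath.map(p => FromSavepoint(p)).getOrElse(FreshStart)
}
```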

The current JobGraph is then written to the SubmittedJobGraphStore, which exists mainly for recovery purposes:

submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))      

At this point, a JobSubmitSuccess message can be sent back to the client:

jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))      

Next, scheduling of the job is triggered on the ExecutionGraph, which is the prerequisite for any Task to run:

if (leaderElectionService.hasLeadership) {  
    executionGraph.scheduleForExecution(scheduler)
} else {  
    self ! decorateMessage(RemoveJob(jobId, removeJobFromStateBackend = false))  
}      

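To prevent several JobManagers from scheduling the same job at the same time, the code first checks whether the current node is the leader. Only the leader schedules the job. Otherwise, the JobManager sends itself a RemoveJob message, which leads into a separate handling path.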
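The leadership gate can be pictured as a pure function that decides which action to take next. In this sketch Action, Schedule, and nextAction are hypothetical; RemoveJob mirrors only the shape of the message shown above.

```scala
object LeaderGateSketch {
  sealed trait Action
  final case class Schedule(jobId: String) extends Action
  final case class RemoveJob(jobId: String, removeJobFromStateBackend: Boolean) extends Action

  // Only the leader schedules; a non-leader drops the job locally
  // without removing it from the state backend.
  def nextAction(hasLeadership: Boolean, jobId: String): Action =
    if (hasLeadership) Schedule(jobId)
    else RemoveJob(jobId, removeJobFromStateBackend = false)
}
```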