天天看点

mapreduce job提交流程源码级分析(三)

  这篇文章说到了jobSubmitClient.submitJob(jobId,

submitJobDir.toString(),

jobCopy.getCredentials())提交job,最终调用的是JobTracker.submitJob;而这篇文章则是分析的JobTracker的启动过程,JobTracker启动之后就会等待提交作业管理作业等。

  接下来看看JobTracker.submitJob方法,调用这个方法之前已经将相关的资源分片信息、配置信息、外部文件、第三方jar包、一些归档文件以及job.jar上传到HDFS中了。

  一、首先看看jobs众有无要提交的Job,jobs是一个Map<JobID, JobInProgress>

这里存储着所有已知的Job及其对应的JobInProgress信息。如果已经存在这个Job则直接返回这个Job的状态;如果不存在则利用JobID和jobSubmitDir构造一个JobInfo对象,JobInfo类实现了Writable可以被序列化,而且存储三个字段JobID、user、以及上传资源的目录jobSubmitDir;

  二、创建一个JobInProgress对象,JobInProgress类主要用于监控和跟踪作业运行状态,存在于作业的整个运行过程中,并未调度器提供最底层的调度接口,维护了两部分信息:一种是静态信息这些在作业提交之时就确定好了;另一种是动态的会随着作业的运行而动态变化的。job

= new JobInProgress(this, this.conf, jobInfo, 0,

ts),这里会创建一个JobProfile一直跟踪作业的运行,不管作业作业活着还是死了;

  三、checkMemoryRequirements(job)检查Job是否有无效的内存需求而不能运行,检查JobTracker的配置有无问题,再检查Job的内存配置有无问题;

  四、是否存储作业信息以备恢复。在1.0.0版本中这还没实现(在这就是没存储信息),要存的信息是一个JobInfo对象存储着作业的存储目录、ID以及user。

  五、status = addJob(jobId,

job)这是核心的提交方法。会将此Job放入jobs中,jobs保存着JobTracker所有运行作业的对应关系<jobID,JobInProgress>;然后让所有的JobInProgressListener监听这个Job,根据

中可以知道这些JobInProgressListener实例都是通过调度器初始化(JobQueueTaskScheduler.start()方法)时,有俩线程一个是监控Job生命周期的,一个是对新加入的Job初始化的;然后加入监控统计中,返回job状态job.getStatus()。

  这样Job的提交过程就完了,剩下的就是作业的调度分配及监控了。后续再讲吧

参考:

  董西成,《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》