天天看点

MapReduce V1:TaskTracker端启动Task流程分析

我们基于hadoop 1.2.1源码分析mapreduce v1的处理流程。

tasktracker周期性地向jobtracker发送心跳报告,在rpc调用返回结果后,解析结果得到jobtracker下发的运行task的指令,即launchtaskaction,就会在tasktracker节点上准备运行这个task。task的运行是在一个与tasktracker进程隔离的jvm实例中执行,该jvm实例是通过org.apache.hadoop.mapred.child来创建的,所以在创建child vm实例之前,需要做大量的准备工作来启动task运行。一个task的启动过程,如下序列图所示:

MapReduce V1:TaskTracker端启动Task流程分析

通过上图,结合源码,我们将一个task启动的过程,分为下面3个主要的步骤:

初始化跟踪task运行的相关数据结构

准备task运行所共享的job资源

启动task

下面,我们详细分析上面3个步骤的流程:

如果是launchtaskaction,则tasktracker会将该指令加入到一个启动task的队列中,进行一步加载处理,如下所示:

<code>1</code>

<code>private</code> <code>void</code> <code>addtotaskqueue(launchtaskaction action) {</code>

<code>2</code>

<code></code><code>if</code> <code>(action.gettask().ismaptask()) {</code>

<code>3</code>

<code></code><code>maplauncher.addtotaskqueue(action);</code>

<code>4</code>

<code></code><code>}</code><code>else</code> <code>{</code>

<code>5</code>

<code></code><code>reducelauncher.addtotaskqueue(action);</code>

<code>6</code>

<code></code><code>}</code>

<code>7</code>

<code>}</code>

根据task的类型,分别加入到对应类型的tasklauncher的队列中。这里需要了解一下tasklauncher线程类,在tasktracker中创建了2个tasklauncher线程,一个是为启动maptask,另一个是为启动reducetask。下面是tasklauncher类的构造方法:

<code>public</code> <code>tasklauncher(tasktype tasktype,</code><code>int</code> <code>numslots) {</code>

<code></code><code>this</code><code>.maxslots = numslots;</code>

<code></code><code>this</code><code>.numfreeslots =</code><code>new</code> <code>intwritable(numslots);</code>

<code></code><code>this</code><code>.taskstolaunch =</code><code>new</code> <code>linkedlist&lt;taskinprogress&gt;();</code>

<code></code><code>setdaemon(</code><code>true</code><code>);</code>

<code></code><code>setname(</code><code>"tasklauncher for "</code> <code>+ tasktype +</code><code>" tasks"</code><code>);</code>

构造方法中,参数tasktype表示task类型,分为maptask和reducetask,参数numslots表示对每一种类型的task每个tasktracker上最多可以启动的task的实例数,默认都是2个。在tasktracker初始化时,会读取mapred-site.xml配置文件,读取mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum配置的参数值,分别赋值给maxmapslots和maxreduceslots这2个属性,如下tasktracker构造方法中初始化这2个属性:

<code>maxmapslots = conf.getint(</code><code>"mapred.tasktracker.map.tasks.maximum"</code><code>,</code><code>2</code><code>);</code>

<code>maxreduceslots = conf.getint(</code><code>"mapred.tasktracker.reduce.tasks.maximum"</code><code>,</code><code>2</code><code>);</code>

然后,在tasktracker创建时,会根据上述maxmapslots和maxreduceslots的值来创建并启动2个tasklauncher线程:

<code>maplauncher =</code><code>new</code> <code>tasklauncher(tasktype.map, maxmapslots);</code>

<code>reducelauncher =</code><code>new</code> <code>tasklauncher(tasktype.reduce, maxreduceslots);</code>

<code>maplauncher.start();</code>

<code>reducelauncher.start();</code>

将launchtaskaction加入到tasklauncher的队列中,这个是调用tasklauncher的addtotaskqueue()方法:

<code>public</code> <code>void</code> <code>addtotaskqueue(launchtaskaction action) {</code>

<code></code><code>synchronized</code> <code>(taskstolaunch) {</code>

<code></code><code>taskinprogress tip = registertask(action,</code><code>this</code><code>);</code><code>// 注册task,初始化用来跟踪该待启动的task相关的数据结构</code>

<code></code><code>taskstolaunch.add(tip);</code><code>// 将tip加入队列</code>

<code></code><code>taskstolaunch.notifyall();</code><code>// 通知tasklauncher线程自己(在run()方法中会调用wait())启动task</code>

上面方法中,最关键的就是registertask()方法,调用该方法来初始化tasktracker端task对应taskinprogress结构,代码如下所示:

<code>01</code>

<code>private</code> <code>taskinprogress registertask(launchtaskaction action, tasklauncher launcher) {</code>

<code>02</code>

<code></code><code>task t = action.gettask();</code>

<code>03</code>

<code></code><code>log.info(</code><code>"launchtaskaction (registertask): "</code> <code>+ t.gettaskid() +</code><code>" task's state:"</code> <code>+ t.getstate());</code>

<code>04</code>

<code></code><code>taskinprogress tip =</code><code>new</code> <code>taskinprogress(t,</code><code>this</code><code>.fconf, launcher);</code><code>// 创建tip</code>

<code>05</code>

<code></code><code>synchronized</code> <code>(</code><code>this</code><code>) {</code>

<code>06</code>

<code></code><code>tasks.put(t.gettaskid(), tip);</code><code>// 加入到队列tasks:taskattemptid -&gt; taskinprogress</code>

<code>07</code>

<code></code><code>runningtasks.put(t.gettaskid(), tip);</code><code>// 加入到队列runningtasks:taskattemptid -&gt; taskinprogress</code>

<code>08</code>

<code></code><code>boolean</code> <code>ismap = t.ismaptask();</code>

<code>09</code>

<code></code><code>if</code> <code>(ismap) {</code>

<code>10</code>

<code></code><code>maptotal++;</code>

<code>11</code>

<code>12</code>

<code></code><code>reducetotal++;</code>

<code>13</code>

<code>14</code>

<code>15</code>

<code></code><code>return</code> <code>tip;</code>

<code>16</code>

上面方法中,tasks队列用来记录该tasktracker上所有的task,包括正在运行和已经完成的task,而队列runningtasks则表示当前tasktracker上正在运行的task。同时,通过maptotal和reducetotal来分别记录当前tasktracker上运行的总的maptask和reducetask的数量。

根据launchtaskaction创建的taskinprogress结构被加入到队列taskstolaunch中,然后通知tasklauncher线程,在方法run中检测并取出队列中taskinprogress对象,并判断当前tasktracker的资源状态能否启动一个task,如果可以则调用startnewtask()方法启动task,代码如下所示:

<code>taskinprogress tip;</code>

<code>task task;</code>

<code>synchronized</code> <code>(taskstolaunch) {</code>

<code></code><code>while</code> <code>(taskstolaunch.isempty()) {</code><code>// 队列为空,则没有task需要启动,等待向队列加入launchtaskaction指令及其通知</code>

<code></code><code>taskstolaunch.wait();</code>

<code></code><code>tip = taskstolaunch.remove(</code><code>0</code><code>);</code><code>// 队列不空,则取出launchtaskaction</code>

<code></code><code>task = tip.gettask();</code>

<code></code><code>log.info(</code><code>"trying to launch : "</code> <code>+ tip.gettask().gettaskid() +</code><code>" which needs "</code> <code>+ task.getnumslotsrequired() +</code><code>" slots"</code><code>);</code>

<code>synchronized</code> <code>(numfreeslots) {</code><code>// 检查当前是否存在空闲的slot,以便运行task</code>

<code></code><code>boolean</code> <code>canlaunch =</code><code>true</code><code>;</code>

<code></code><code>while</code> <code>(numfreeslots.get() &lt; task.getnumslotsrequired()) {</code><code>// 如果当前空闲slot小于该task运行所需的slot数量</code>

<code></code><code>if</code> <code>(!tip.canbelaunched()) {</code><code>// 如果tip状态不是下面3种状态:unassigned、failed_unclean、killed_unclean</code>

<code></code><code>canlaunch =</code><code>false</code><code>;</code><code>// 检查tip状态不能启动task,但也不能阻塞该方法</code>

<code></code><code>break</code><code>;</code>

<code>17</code>

<code>18</code>

<code></code><code>log.info(</code><code>"tasklauncher : waiting for "</code> <code>+ task.getnumslotsrequired() +</code><code>" to launch "</code> <code>+ task.gettaskid() +</code><code>", currently we have "</code> <code>+ numfreeslots.get() +</code><code>" free slots"</code><code>);</code>

<code>19</code>

<code></code><code>numfreeslots.wait();</code><code>// 如果没有空闲slot,则等待</code>

<code>20</code>

<code>21</code>

<code></code><code>if</code> <code>(!canlaunch) {</code>

<code>22</code>

<code></code><code>continue</code><code>;</code>

<code>23</code>

<code>24</code>

<code></code><code>log.info(</code><code>"in tasklauncher, current free slots : "</code> <code>+ numfreeslots.get()+</code><code>" and trying to launch "</code><code>+tip.gettask().gettaskid() +</code><code>" which needs "</code> <code>+ task.getnumslotsrequired() +</code><code>" slots"</code><code>);</code>

<code>25</code>

<code></code><code>numfreeslots.set(numfreeslots.get() - task.getnumslotsrequired());</code><code>// 标记将满足该task的slot数已经分配</code>

<code>26</code>

<code></code><code>assert</code> <code>(numfreeslots.get() &gt;=</code><code>0</code><code>);</code>

<code>27</code>

<code>28</code>

<code>synchronized</code> <code>(tip) {</code>

<code>29</code>

<code></code><code>// 到这里已经获取到了满足运行task要求的空闲slot,但还要检查该tip状态是否指示为被kill了</code>

<code>30</code>

<code></code><code>if</code> <code>(!tip.canbelaunched()) {</code>

<code>31</code>

<code></code><code>log.info(</code><code>"not launching task "</code> <code>+ task.gettaskid() +</code><code>" as it got killed externally. task's state is "</code> <code>+ tip.getrunstate());</code>

<code>32</code>

<code></code><code>addfreeslots(task.getnumslotsrequired());</code><code>// 如果task状态tip标识不能启动,则释放slot</code>

<code>33</code>

<code>34</code>

<code>35</code>

<code></code><code>tip.slottaken =</code><code>true</code><code>;</code>

<code>36</code>

<code>37</code>

<code>38</code>

<code>startnewtask(tip);</code><code>// 获取到了满足task启动所需的空闲slot,开始启动task</code>

这样,当前tasktracker所在节点的资源状态,和task对应的tip状态都已经满足启动task的要求,可以启动一个task去运行。

调用startnewtask()方法,异步地启动了一个单独的线程去启动task,该方法如下所示:

<code>void</code> <code>startnewtask(</code><code>final</code> <code>taskinprogress tip)</code><code>throws</code> <code>interruptedexception {</code>

<code></code><code>thread launchthread =</code><code>new</code> <code>thread(</code><code>new</code> <code>runnable() {</code>

<code></code><code>@override</code>

<code></code><code>public</code> <code>void</code> <code>run() {</code>

<code></code><code>try</code> <code>{</code>

<code></code><code>runningjob rjob = localizejob(tip);</code><code>// 在tasktracker节点上初始化job信息</code>

<code></code><code>tip.gettask().setjobfile(rjob.getlocalizedjobconf().tostring());</code>

<code></code><code>launchtaskforjob(tip,</code><code>new</code> <code>jobconf(rjob.getjobconf()), rjob);</code><code>// 启动task</code>

<code></code><code>}</code><code>catch</code> <code>(throwable e) {</code>

<code></code><code>... ...</code>

<code></code><code>});</code>

<code></code><code>launchthread.start();</code>

如果在一个tasktracker节点上运行的多个task都属于同一个job(一个tasktracker上运行的task按照job来分组,每一组task都属于同一个job),那么第一次初始化时,还没有建立一个task到job的映射关系,也就是说,在tasktracker端也要维护job的状态,以及属于该job的所有task的状态信息。比如,如果用户提交了一个kill掉job的请求,那么正在运行的属于该job的所有task都应该被kill掉。

上面代码中调用localizejob()方法,执行了如下处理:

创建一个runningjob对象,并加入到tasktracker维护的runningjobs队列(包含了jobid到runningjob的映射关系)中,同时将task对应的tip对象加入到runningjob所维护的tasks队列中。

一个job完成初始化,还需要将job相关的信息,如job配置信息从hdfs上下载到tasktracker所在节点本地,供该job的一组task运行共享。我们知道,在jobclient提交job时,会将相关资源拷贝到hdfs上的指定目录中,例如,在hdfs上的/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/目录下存储job相关的资源文件,拷贝到tasktracker本地目录下,例如/tmp/mapred/local/ttprivate/tasktracker/shirdrn/jobcache/job_200912121733_0002/目录。

调用taskcontroller的initializejob()方法初始化job所包含的相关资源信息,为属于该job的一组task所共享。

这里,taskcontroller使用的linuxtaskcontroller实现类,通过调用该方法,实际上构造了一个shell命令行,用来在tasktracker节点上初始化目录和拷贝相关资源,该命令行示例如下所示:

<code>/usr/</code><code>local</code><code>/java/bin/java -classpath .:/usr/</code><code>local</code><code>/java/lib/*.jar;/usr/</code><code>local</code><code>/java/jre/lib/*.jar -dhadoop.log.</code><code>dir</code><code>=/tmp/hadoop/logs -dhadoop.root.logger=info,console -djava.library.path= org.apache.hadoop.mapred.joblocalizer shirdrn job_200912121733_0002</code>

通过工具shellcommandexecutor来执行上述命令行,启动一个单独的jvm实例,完成job资源初始化,完成即退出。通过上述命令行可以看到,主要的初始化工作都在joblocalizer中完成的,需要传入2个参数:用户、jobid,在joblocalizer中创建了一个job所包含的各种资源,供task在tasktracker节点上运行共享,这些相关的目录或资源文件包括:

<code>${mapred.</code><code>local</code><code>.</code><code>dir</code><code>}/tasktracker/${user}</code>

<code>${mapred.</code><code>local</code><code>.</code><code>dir</code><code>}/tasktracker/${user}/jobcache</code>

<code>${mapred.</code><code>local</code><code>.</code><code>dir</code><code>}/tasktracker/${user}/jobcache/${jobid}/work</code>

<code>${mapred.</code><code>local</code><code>.</code><code>dir</code><code>}/tasktracker/${user}/jobcache/${jobid}/jars</code>

<code>${mapred.</code><code>local</code><code>.</code><code>dir</code><code>}/tasktracker/${user}/jobcache/${jobid}/jars/job.jar</code>

<code>${mapred.</code><code>local</code><code>.</code><code>dir</code><code>}/tasktracker/${user}/jobcache/${jobid}/job.xml</code>

<code>${mapred.</code><code>local</code><code>.</code><code>dir</code><code>}/tasktracker/${user}/jobcache/${jobid}/jobtoken</code>

<code>8</code>

<code>${mapred.</code><code>local</code><code>.</code><code>dir</code><code>}/tasktracker/${user}/distcache</code>

这样,在一个tasktracker节点上运行的一组task所共享的对应唯一job相关的资源已经满足,接下来就可以启动task了。

启动task的流程相对复杂一些,我们分几个阶段/要点来进行说明:

启动task准备

在startnewtask()方法中调用localizejob()方法,完成了job资源在tasktracker节点上的初始化,接着就可以调用launchtaskforjob()方法进入启动task的处理流程,代码如下所示:

<code>protected</code> <code>void</code> <code>launchtaskforjob(taskinprogress tip, jobconf jobconf,</code>

<code></code><code>runningjob rjob)</code><code>throws</code> <code>ioexception {</code>

<code></code><code>synchronized</code> <code>(tip) {</code>

<code></code><code>jobconf.set(jobconf.mapred_local_dir_property, localstorage.getdirsstring());</code>

<code></code><code>tip.setjobconf(jobconf);</code>

<code></code><code>tip.setugi(rjob.ugi);</code>

<code></code><code>tip.launchtask(rjob);</code><code>// 这里才是启动task的核心方法</code>

<code>9</code>

通过调用taskinprogress tip的launchtask()方法来启动task,我们看一下该方法实现代码:

<code>public</code> <code>synchronized</code> <code>void</code> <code>launchtask(runningjob rjob)</code><code>throws</code> <code>ioexception {</code>

<code></code><code>if</code> <code>(</code><code>this</code><code>.taskstatus.getrunstate() == taskstatus.state.unassigned ||</code>

<code></code><code>this</code><code>.taskstatus.getrunstate() == taskstatus.state.failed_unclean ||</code>

<code></code><code>this</code><code>.taskstatus.getrunstate() == taskstatus.state.killed_unclean) {</code>

<code></code><code>localizetask(task);</code>

<code></code><code>if</code> <code>(</code><code>this</code><code>.taskstatus.getrunstate() == taskstatus.state.unassigned) {</code><code>// 如果状态unassigned,则初始化完成后,将当前状态改为running</code>

<code></code><code>this</code><code>.taskstatus.setrunstate(taskstatus.state.running);</code>

<code></code><code>settaskrunner(task.createrunner(tasktracker.</code><code>this</code><code>,</code><code>this</code><code>, rjob));</code><code>// 启动一个taskrunner线程</code>

<code></code><code>this</code><code>.runner.start();</code>

<code></code><code>long</code> <code>now = system.currenttimemillis();</code>

<code></code><code>this</code><code>.taskstatus.setstarttime(now);</code>

<code></code><code>this</code><code>.lastprogressreport = now;</code>

<code></code><code>log.info(</code><code>"not launching task: "</code> <code>+ task.gettaskid() +</code><code>" since it's state is "</code> <code>+</code><code>this</code><code>.taskstatus.getrunstate());</code>

taskinprogress里面taskstatus维护了一个tip的状态,通过上述代码可以看出,一个task只有具备下面3个状态之一:unassigned、failed_unclean、killed_unclean,才能够被启动。

首先要进行task的初始化,调用localizetask()方法,如下所示:

<code>void</code> <code>localizetask(task task)</code><code>throws</code> <code>ioexception{</code>

<code></code><code>task.localizeconfiguration(localjobconf);</code>

<code></code><code>task.setconf(localjobconf);</code>

在这里,task可能是maptask,也可能是reducetask,所以调用task.localizeconfiguration()的初始化逻辑稍微有些不同,具体可以查看maptask和reducetask类实现。另外,对于不同类型的task,也会创建不同类型的taskrunner线程,分别对应于maptaskrunner和reducetaskrunner,实际所有task启动的相关逻辑都是在这2个taskrunner中实现的。

在taskrunner中,主要逻辑是在run()方法中实现的,其中在调用launchjvmandwait(setupcmds, vargs, stdout, stderr, logsize, workdir)之前,做了一些准备工作:

构建setupcmds:读取系统环境变量,或者hadoop设置的环境变量,ld_library_path、ld_library_path、user、shell、logname、home、hadoop_token_file_location、hadoop_root_logger、hadoop_client_opts、hadoop_client_opts,这些变量都是键值对的形式,最后会通过export在当前环境下导出这些变量配置

构建vargs:设置启动child vm的配置,读取mapred-site.xml配置文件中mapred.map.child.java.opts和mapred.reduce.child.java.opts的配置内容,最终会使用org.apache.hadoop.mapred.child创建一个jvm实例来启动task

目录文件设置:包括2个日志文件stdout和stderr,以及当前启动jvm所在的目录workdir

使用jvmmanager管理启动task相关数据

完成上述准备工作以后,调用launchjvmandwait()方法,创建child vm实例,如下所示:

<code>void</code> <code>launchjvmandwait(list &lt;string&gt; setup, vector&lt;string&gt; vargs, file stdout,</code>

<code></code><code>file stderr,</code><code>long</code> <code>logsize, file workdir)</code>

<code></code><code>throws</code> <code>interruptedexception, ioexception {</code>

<code></code><code>jvmmanager.launchjvm(</code><code>this</code><code>, jvmmanager.constructjvmenv(setup, vargs, stdout, stderr, logsize, workdir, conf));</code>

<code></code><code>synchronized</code> <code>(lock) {</code>

<code></code><code>while</code> <code>(!done) {</code>

<code></code><code>lock.wait();</code>

最终是通过jvmmanager来实现jvm实例的创建,下面是jvmmanager保存的一些数据结构,用来维护jvm相关数据的数据结构,如下图所示:

MapReduce V1:TaskTracker端启动Task流程分析

可以看到,一个jvmmanager对应2个jvmmanagerfortype,分别负责管理maptask和reducetask启动对应的child vm等数据,jvmmanager的构造方法,如下所示:

<code>public</code> <code>jvmmanager(tasktracker tracker) {</code>

<code></code><code>mapjvmmanager =</code><code>new</code> <code>jvmmanagerfortype(tracker.getmaxcurrentmaptasks(),</code><code>true</code><code>, tracker);</code>

<code></code><code>reducejvmmanager =</code><code>new</code> <code>jvmmanagerfortype(tracker.getmaxcurrentreducetasks(),</code><code>false</code><code>, tracker);</code>

上面调用了jvmmanager.launchjvm()方法,其中内部根据task类型,选择调用mapjvmmanager或reducejvmmanager的reapjvm()方法,如下所示:

<code>private</code> <code>synchronized</code> <code>void</code> <code>reapjvm(taskrunner t, jvmenv env)</code><code>throws</code> <code>ioexception, interruptedexception {</code>

<code></code><code>if</code> <code>(t.gettaskinprogress().waskilled()) {</code><code>// 检查当前准备启动的task是否已经被kill掉,如果是则直接返回</code>

<code></code><code>return</code><code>;</code>

<code></code><code>boolean</code> <code>spawnnewjvm =</code><code>false</code><code>;</code>

<code></code><code>jobid jobid = t.gettask().getjobid();</code>

<code></code><code>int</code> <code>numjvmsspawned = jvmidtorunner.size();</code>

<code></code><code>jvmrunner runnertokill =</code><code>null</code><code>;</code>

<code></code><code>// 检查是否存在空闲slot来启动task</code>

<code></code><code>if</code> <code>(numjvmsspawned &gt;= maxjvms) {</code><code>// 检查当前tasktracker上运行的某类task对应的jvm实例数是否大于全局设置允许的最大slot数</code>

<code></code><code>iterator&lt;map.entry&lt;jvmid, jvmrunner&gt;&gt; jvmiter = jvmidtorunner.entryset().iterator();</code>

<code></code><code>while</code> <code>(jvmiter.hasnext()) {</code><code>// 遍历当前存在的&lt;jvmid, jvmrunner&gt;队列,检查每个jvmrunner的状态</code>

<code></code><code>jvmrunner jvmrunner = jvmiter.next().getvalue();</code>

<code></code><code>jobid jid = jvmrunner.jvmid.getjobid();</code>

<code></code><code>if</code> <code>(jid.equals(jobid) &amp;&amp; !jvmrunner.isbusy() &amp;&amp; !jvmrunner.ranall()){</code><code>// 如果在当前要启动的task之前,已经有该task对应的job的其他task运行完成,则预留该jvm以重用</code>

<code></code><code>setrunningtaskforjvm(jvmrunner.jvmid, t);</code><code>// 将该jvmrunner映射到当前task</code>

<code></code><code>log.info(</code><code>"no new jvm spawned for jobid/taskid: "</code> <code>+ jobid+</code><code>"/"</code><code>+t.gettask().gettaskid() +</code><code>". attempting to reuse: "</code> <code>+ jvmrunner.jvmid);</code>

<code></code><code>//一个jvm实例需要kill掉,需要满足下面条件之一:</code>

<code></code><code>// (1) 如果属于当前要启动的task对应的job,该job对应的其他task都已经运行完成;</code>

<code></code><code>// (2) 如果不属于当前要启动的task所对应的job,那些job对应的task都已经运行完成。</code>

<code></code><code>if</code> <code>((jid.equals(jobid) &amp;&amp; jvmrunner.ranall()) || (!jid.equals(jobid) &amp;&amp; !jvmrunner.isbusy())) {</code>

<code></code><code>runnertokill = jvmrunner;</code>

<code></code><code>spawnnewjvm =</code><code>true</code><code>;</code>

<code></code><code>if</code> <code>(spawnnewjvm) {</code>

<code></code><code>if</code> <code>(runnertokill !=</code><code>null</code><code>) {</code>

<code></code><code>log.info(</code><code>"killing jvm: "</code> <code>+ runnertokill.jvmid);</code>

<code></code><code>killjvmrunner(runnertokill);</code><code>// kill掉该jvmrunner</code>

<code></code><code>spawnnewjvm(jobid, env, t);</code><code>// 创建一个新的jvm实例来启动该task</code>

<code>39</code>

<code>40</code>

<code></code><code>}</code><code>catch</code> <code>(exception e) {</code>

<code>41</code>

<code></code><code>log.fatal(e);</code>

<code>42</code>

<code></code><code>}</code><code>finally</code> <code>{</code>

<code>43</code>

<code></code><code>system.exit(-</code><code>1</code><code>);</code>

<code>44</code>

<code>45</code>

上面代码中,调用setrunningtaskforjvm()很关键,实际上把需要启动的task与jvmrunner建立映射关系,更新相应的内存数据结构(队列),如下所示:

<code>synchronized</code> <code>public</code> <code>void</code> <code>setrunningtaskforjvm(jvmid jvmid,</code>

<code></code><code>taskrunner t) {</code>

<code></code><code>jvmtorunningtask.put(jvmid, t);</code>

<code></code><code>runningtasktojvm.put(t,jvmid);</code>

<code></code><code>jvmidtorunner.get(jvmid).setbusy(</code><code>true</code><code>);</code><code>// 设置当前jvmrunner被占用,不允许释放该资源</code>

该方法,在spawnnewjvm()方法也调用了,spawnnewjvm()方法创建了一个新的jvm,代码如下所示:

<code>private</code> <code>void</code> <code>spawnnewjvm(jobid jobid, jvmenv env, taskrunner t) {</code>

<code></code><code>jvmrunner jvmrunner =</code><code>new</code> <code>jvmrunner(env, jobid, t.gettask());</code>

<code></code><code>jvmidtorunner.put(jvmrunner.jvmid, jvmrunner);</code>

<code></code><code>jvmrunner.setdaemon(</code><code>true</code><code>);</code>

<code></code><code>jvmrunner.setname(</code><code>"jvm runner "</code> <code>+ jvmrunner.jvmid +</code><code>" spawned."</code><code>);</code>

<code></code><code>setrunningtaskforjvm(jvmrunner.jvmid, t);</code><code>// 调用setrunningtaskforjvm()方法</code>

<code></code><code>log.info(jvmrunner.getname());</code>

<code></code><code>jvmrunner.start();</code><code>// 启动jvmrunner线程,用来启动child vm</code>

接下来,我们看一下jvmrunner线程类,该线程体run()方法中直接调用了runchild()方法,该方法实现代码,如下所示:

<code>public</code> <code>void</code> <code>runchild(jvmenv env)</code><code>throws</code> <code>ioexception, interruptedexception{</code>

<code></code><code>int</code> <code>exitcode =</code><code>0</code><code>;</code>

<code></code><code>env.vargs.add(integer.tostring(jvmid.getid()));</code>

<code></code><code>taskrunner runner = jvmtorunningtask.get(jvmid);</code><code>// 从队列jvmtorunningtask中取出taskrunner</code>

<code></code><code>if</code> <code>(runner !=</code><code>null</code><code>) {</code>

<code></code><code>task task = runner.gettask();</code>

<code></code><code>string user = task.getuser();</code>

<code></code><code>taskattemptid taskattemptid = task.gettaskid();</code>

<code></code><code>string taskattemptidstr = task.istaskcleanuptask() ? (taskattemptid.tostring() + tasktracker.task_cleanup_suffix) : taskattemptid.tostring();</code>

<code></code><code>exitcode = tracker.gettaskcontroller().launchtask(user, jvmid.jobid.tostring(), taskattemptidstr, env.setup, env.vargs, env.workdir, env.stdout.tostring(), env.stderr.tostring());</code><code>// 通过taskcontroller来启动task</code>

<code></code><code>}</code><code>catch</code> <code>(ioexception ioe) {</code>

<code></code><code>// do nothing</code>

<code></code><code>}</code><code>finally</code> <code>{</code><code>// handle the exit code</code>

<code></code><code>// although the process has exited before we get here,</code>

<code></code><code>// make sure the entire process group has also been killed.</code>

<code></code><code>kill();</code><code>// task运行完成,kill掉运行task的child vm实例</code>

<code></code><code>updateonjvmexit(jvmid, exitcode);</code>

<code></code><code>log.info(</code><code>"jvm : "</code> <code>+ jvmid +</code><code>" exited with exit code "</code> <code>+ exitcode +</code><code>". number of tasks it ran: "</code> <code>+ numtasksran);</code>

<code></code><code>deleteworkdir(tracker, firsttask);</code><code>// 清理临时目录</code>

在jvmrunner线程类中,其中委托taskcontroller来控制task的实际启动。

使用taskcontroller控制启动child vm

下面,我们看taskcontroller启动task的实现方法launchtask(),代码如下所示:

<code>@override</code>

<code>public</code> <code>int</code> <code>launchtask(string user, string jobid, string attemptid, list&lt;string&gt; setup, list&lt;string&gt; jvmarguments,</code>

<code></code><code>file currentworkdirectory, string stdout, string stderr)</code><code>throws</code> <code>ioexception {</code>

<code></code><code>shellcommandexecutor shexec =</code><code>null</code><code>;</code>

<code></code><code>filesystem rawfs = filesystem.getlocal(getconf()).getraw();</code>

<code></code><code>long</code> <code>logsize =</code><code>0</code><code>;</code><code>//todo mapreduce-1100</code>

<code></code><code>string cmdline = tasklog.buildcommandline(setup, jvmarguments,</code><code>new</code> <code>file(stdout),</code><code>new</code> <code>file(stderr), logsize,</code><code>true</code><code>);</code>

<code></code><code>// 将上述命令行写入本地缓存的目录中</code>

<code></code><code>path p =</code><code>new</code> <code>path(allocator.getlocalpathforwrite( tasktracker.getprivatedirtaskscriptlocation(user, jobid, attemptid), getconf()), command_file);</code>

<code></code><code>string commandfile = writecommand(cmdline, rawfs, p);</code>

<code></code><code>string[] command =</code>

<code></code><code>new</code> <code>string[]{taskcontrollerexe,</code>

<code></code><code>user,</code>

<code></code><code>localstorage.getdirsstring(),</code>

<code></code><code>integer.tostring(commands.launch_task_jvm.getvalue()),</code>

<code></code><code>jobid,</code>

<code></code><code>attemptid,</code>

<code></code><code>currentworkdirectory.tostring(),</code>

<code></code><code>commandfile};</code>

<code></code><code>shexec =</code><code>new</code> <code>shellcommandexecutor(command);</code>

<code></code><code>if</code> <code>(log.isdebugenabled()) {</code>

<code></code><code>log.debug(</code><code>"launchtask: "</code> <code>+ arrays.tostring(command));</code>

<code></code><code>shexec.execute();</code><code>// 执行启动task的命令</code>

<code></code><code>if</code> <code>(shexec ==</code><code>null</code><code>) {</code>

<code></code><code>return</code> <code>-</code><code>1</code><code>;</code>

<code></code><code>int</code> <code>exitcode = shexec.getexitcode();</code>

<code></code><code>log.warn(</code><code>"exit code from task is : "</code> <code>+ exitcode);</code>

<code></code><code>// 143 (sigterm) and 137 (sigkill) exit codes means the task was</code>

<code></code><code>// terminated/killed forcefully. in all other cases, log the</code>

<code></code><code>// task-controller output</code>

<code></code><code>if</code> <code>(exitcode !=</code><code>143</code> <code>&amp;&amp; exitcode !=</code><code>137</code><code>) {</code>

<code></code><code>log.warn(</code><code>"exception thrown while launching task jvm : "</code> <code>+ stringutils.stringifyexception(e));</code>

<code></code><code>log.info(</code><code>"output from linuxtaskcontroller's launchtaskjvm follows:"</code><code>);</code>

<code></code><code>logoutput(shexec.getoutput());</code>

<code></code><code>return</code> <code>exitcode;</code>

<code></code><code>log.debug(</code><code>"output from linuxtaskcontroller's launchtask follows:"</code><code>);</code>

<code>46</code>

<code>47</code>

<code></code><code>return</code> <code>0</code><code>;</code>

<code>48</code>

将构造好的启动child的命令行写入到本地目录下的文件中,该脚本文件的绝对路径,示例如下所示:

<code>/tmp/mapred/local/ttprivate/tasktracker/shirdrn/jobcache/job_200912121733_0002/attempt_200912121733_0002_m_000005_0/taskjvm.sh</code>

在taskcontroller(实际上是linuxtaskcontroller)的launchtask()方法中,使用shellcommandexecutor工具执行的命令行,类似如下这样:

<code>/usr/</code><code>local</code><code>/hadoop/bin/task-controller shirdrn /tmp/mapred/</code><code>local</code> <code>1 job_200912121733_0002 attempt_200912121733_0002_m_000005_0 /tmp/mapred/</code><code>local</code><code>/ttprivate/tasktracker/shirdrn/jobcache/job_200912121733_0002/attempt_200912121733_0002_m_000005_0/taskjvm.sh</code>

在taskjvm.sh脚本中的内容,才是真正启动child vm的命令行,示例如下所示:

<code>/usr/</code><code>local</code><code>/bin/java -xmx 512m -verbose:gc -xloggc:/tmp/attempt_200912121733_0002_m_000005_0.gc -dcom.sun.management.jmxremote.authenticate=</code><code>false</code> <code>-dcom.sun.management.jmxremote.ssl=</code><code>false</code> <code>-djava.library.path= -djava.io.tmpdir= -classpath .:/usr/</code><code>local</code><code>/java/lib/*.jar:/usr/</code><code>local</code><code>/java/jre/lib/*.jar -dlog4j.configuration=task-log4j.properties -dhadoop.log.</code><code>dir</code><code>=/tmp/hadoop/logs -dhadoop.root.logger=info,tla -dhadoop.tasklog.taskid=attempt_200912121733_0002_m_000005_0 -dhadoop.tasklog.iscleanup=</code><code>false</code> <code>-dhadoop.tasklog.totallogfilesize= org.apache.hadoop.mapred.child 127.0.0.1 0 attempt_200912121733_0002_m_000005_0 /tmp/hadoop/logs/userlogs/job_200912121733_0002/attempt_200912121733_0002_m_000005_0/ 2134</code>

至此,一个task通过child vm的加载已经启动,就可以运行一个task了,我们后续再详细介绍。