天天看點

MapReduce V1:TaskTracker設計要點概要分析

我們基于hadoop 1.2.1源碼分析mapreduce v1的處理流程。

本文不打算深入地詳細分析tasktracker某個具體的處理流程,而是概要地分析tasktracker在mapreduce架構中的主要負責處理那些事情,是我們能夠在宏觀上了解tasktracker端都做了哪些工作。我盡量将tasktracker端的全部要點内容提出來,但是涉及到詳細的分析,隻是點到為止,後續會對相應子產品的處理流程結合代碼進行分析。

tasktracker主要負責mapreduce計算叢集中task運作的管理,是以tasktracker要管理的事情比較多。一個mapreduce job由很多的task組成,而一個job的所有task被分成幾個相斥的子集,每個子集被配置設定到某一個tasktracker上去運作,是以一個tasktracker管理運作了一個job的所有task的一個子集,也就是說tasktracker不僅要維護每個job對應的一個task的子集,還要維護這些task所屬的job的運作狀态,對于job/task的狀态的管理都是與jobtracker通過rpc通信保持狀态的同步。

下面是tasktracker端的主要元件,如下圖所示:

MapReduce V1:TaskTracker設計要點概要分析

為了了解tasktracker中各個元件都負責處理哪些工作,我們通過下表來簡要地說明各個元件的功能,如下表所示:

<b>元件名稱</b>

<b>元件功能</b>

localfs: filesystem

tasktracker本地檔案系統,用來管理本地檔案和目錄

systemfs: filesystem

hdfs分布式檔案系統,可以通路hdfs,用來檢索job/task對應的資源檔案等。

trackerdistributedcachemanager

trackerdistributedcachemanager負責跨job的緩存的管理,每個job會對應一個taskdistributedcachemanager執行個體。比如,每次tasktracker被配置設定執行一個job的一組task,此時需要将該job對應的資源檔案和split相關資料從hdfs下載下傳到tasktracker本地,這些檔案都需要進行管理,包括位置查詢、檔案通路、檔案清理等。

tasktrackerinstrumentation

用來管理tasktracker上運作的一些task的監控資料,主要是采集某些點的資料,如task完成時、task失敗時、task逾時時等,目前該元件中都是空實作。

indexcache

map階段需要輸出臨時檔案,要對maptask的輸出寫入tasktracker本地檔案系統,需要對這些輸出資料進行分區(partition),indexcache負責管理分區檔案的相關資訊。

userlogmanager

負責管理tasktracker節點上執行task輸出的日志資訊,目前通過userlogevent定義了jvm_finished、job_started,、job_completed,、delete_job這4種事件,通過userlogmanager可以實作日志記錄的輸出。

aclsmanager

用來控制mapreduce管理者管理job和queue級别操作的通路權限。

nodehealthcheckerservice

用來檢測節點之間的心跳服務。

resourcecalculatorplugin

用來計算系統的資源的插件,預設使用的是linuxresourcecalculatorplugin實作,可以友善地通路系統中的資源資訊狀态,如記憶體、cpu。

jvmmanager

為了保證tasktracker與實際task(maptask/reducetask)運作的隔離性,會将task在單獨的jvm執行個體中運作,jvmmanager用來管理task運作所在的jvm執行個體的資訊,包括建立/銷毀jvm執行個體等操作。

localstorage

管理tasktracker本地檔案系統的存儲目錄資訊,如通路本地目錄失敗、檢測目錄可用性等。

localdirallocator

管理tasktracker本地目錄配置設定,初始化localdirallocator基于配置mapred.local.dir指定的目錄,它采用的round-robin方式,在task運作之前需要寫一個啟動task的腳本檔案,使用localdirallocator來控制對應檔案的讀寫。

jettybugmonitor

在map階段輸出中間結果,reduce階段會基于http協定(基于jetty)來拷貝屬于自己的分區,為了解決jetty已知的一些類存在的bug,它們可能會影響tasktracker,通過檢測jetty所在jvm執行個體使用cpu量,當超過配置的值時終止tasktracker程序。

mapoutputservlet

tasktracker上啟動一個jetty容器,該servlet用來負責暴露http接口,供其它運作reducetask的tasktracker拉取map輸出檔案。

jobclient: intertrackerprotocol

與jobtracker進行rpc通信的代理(proxy)對象。

taskreportserver: server

tasktracker節點上啟動的rpc server,在其上運作的task,在運作過程中會向tasktracker彙報狀态,使tasktracker知道task的運作狀态報告。

cleanupqueue

負責清理job或task運作完成後遺留下的一些不再使用的檔案或目錄。

tasktrackerstatus

維護tasktracker目前的狀态資訊,主要包括:tasktracker的配置資訊、tasktracker上資源狀态資訊、tasktracker上運作的task的狀态報告資訊。

jobtokensecretmanager

用來管理job運作的令牌相關資訊。

shuffleserverinstrumentation

管理job運作過程中,shuffle階段的監控資料,包括一組計數器:serverhandlerbusy、outputbytes、failedoutputs、successoutputs、exceptionscaught。

taskcontroller

用來管理task的初始化、完成、清理工作,還負責啟動和終止task運作所在的jvm執行個體。

httpserver

用來處理map輸出的jetty容器,其中mapoutputservlet會注冊到該http server中。

shuffleexceptiontracker

跟蹤shuffle階段出現異常情況的資訊。

mapeventsfetcherthread

跟蹤每個運作的job對應的reducetask的shuffle階段,如果有map完成,會對應着taskcompletionevent觸發該線程,從已經完成的map所在節點拷貝map輸出的中間結果資料,為reducetask運作做準備。

reducetasklauncher

啟動reducetask。

maptasklauncher

啟動maptask。

taskcleanupthread

負責清理job/task執行完成後遺留的檔案或目錄。

taskmemorymanagerthread

管理在該tasktracker上運作的task使用記憶體的資訊。

通過上表,我們可以了解到tasktracker端各個元件的基本功能,也稍微了解到元件之間的一些關系。下面,我們從tasktracker抽象層次的視角,來分析元件之間的關系和互動,概要地描述一些主要的處理流程:

tasktracker處理心跳響應

mapreduce job恢複運作

task隔離運作

啟動maptask過程

啟動reducetask過程

下面,我們分别分析上述列舉的5個處理流程:

tasktracker周期性地向jobtracker發送心跳報告,将tasktracker上運作的task的狀态資訊、節點資源資訊、節點健康狀況資訊封裝到tasktrackerstatus對象中,通過rpc調用heartbeat将心跳發送到jobtracker端,并傳回heartbeatresponse,其中心跳響應對象中包包含了jobtracker配置設定的任務,通過taskaction這種指令(包括:launchtaskaction/killtaskaction/committaskaction)清單的方式進行指派。tasktracker解析rpc調用傳回的心跳響應,根據taskaction指令清單,執行具體的操作。

tasktracker處理心跳響應的流程,如下序列圖所示:

MapReduce V1:TaskTracker設計要點概要分析

tasktracker收到心跳響應,首先會檢查是否存在需要恢複的job,如果存在,則會檢查要進行恢複的job的狀态,進而将需要進行恢複的job對應的task加入到恢複隊列中,等待排程運作。

接着,tasktracker會檢查taskaction指令的類型,根據其實際類型,執行對應的處理流程:

如果是launchtaskaction,則啟動task

如果是killtaskaction,則殺掉task,修改tasktracker維護的job及task狀态,并清理臨時資料

如果是killjobaction,則殺掉該job,說明該job已經完成(成功/失敗),修改tasktracker維護的job及task狀态,并清理臨時資料

如果是committaskaction,則說明對應的該task已經執行完成,修改tasktracker維護的task即job狀态,最後還要清理臨時資料

由于某些重要的處理流程,如啟動一個task的詳細流程,我們會在後續單獨寫幾篇文章,用更加合适的方式來詳細分析。

這裡,我們介紹一下mapreduce計算中是如何實作job的恢複的,包括jobtracker端和tasktracker端之間的簡單互動流程。

jobtracker存在一個系統目錄(system directory),預設值為/tmp/hadoop/mapred/system,也可以根據配置項mapred.system.dir指定該值。當jobclient送出一個job到jobtracker時,jobtracker會首先将該job的資訊寫入到jobtracker的系統目錄下,每個job對應一個以job id為名稱的子目錄,以便jobtracker因為重新開機,能夠恢複這些job的運作。我們可以看一下jobtracker中submitjob方法中儲存job資訊的實作,代碼如下所示:

<code>01</code>

<code>// store the job-info in a file so that the job can be recovered</code>

<code>02</code>

<code>// later (if at all)</code>

<code>03</code>

<code>// note: jobdir &amp; jobinfo are owned by jt user since we are using</code>

<code>04</code>

<code>// his fs object</code>

<code>05</code>

<code>if</code> <code>(!recovered) {</code>

<code>06</code>

<code></code><code>path jobdir = getsystemdirectoryforjob(jobid);</code><code>// job目錄,例如:/tmp/hadoop/mapred/system/job_200912121733_0002</code>

<code>07</code>

<code></code><code>filesystem.mkdirs(fs, jobdir,</code><code>new</code> <code>fspermission(system_dir_permission));</code>

<code>08</code>

<code></code><code>fsdataoutputstream out = fs.create(getsystemfileforjob(jobid));</code><code>// job檔案,例如:/tmp/hadoop/mapred/system/job_200912121733_0002/job-info</code>

<code>09</code>

<code></code><code>jobinfo.write(out);</code><code>// 将jobinfo結構對應的資料寫入到job檔案</code>

<code>10</code>

<code></code><code>out.close();</code>

<code>11</code>

<code>}</code>

上面代碼中,jobinfo主要包含了jobid資訊、使用者名稱、job送出目錄(例如,/tmp/hadoop/mapred/staging/shirdrn/.staging/job_200912121733_0002/ ,該目錄是在jobclient送出job時在hdfs上建立的,用于将該job所需要的資源都拷貝到該job對應的送出目錄下面,便于後續jobtracker能夠讀取這些資料)。

如果jobtracker因為某些原因重新啟動了,那麼在jobtracker重新開機之後,需要從jobtracker的系統目中讀取這些job的資訊,以便能夠恢複這些尚未完成的job的運作,并以heartbeatresponse的結構,在tasktracker發送heartbeat的時候響應給tasktracker,tasktracker解析響應資料,然後去恢複這些job的運作。

上面的序列圖中,我們可以看到,當tasktracker發送heartbeat并收到響應後,從heartbeatresponse中解析取出需要recovered的job,并進行處理,代碼如下所示:

<code>// check if the map-event list needs purging</code>

<code>set&lt;jobid&gt; jobs = heartbeatresponse.getrecoveredjobs();</code><code>// 擷取到需要recovered的job清單</code>

<code>if</code> <code>(jobs.size() &gt;</code><code>0</code><code>) {</code>

<code></code><code>synchronized</code> <code>(</code><code>this</code><code>) {</code>

<code></code><code>// purge the local map events list</code>

<code></code><code>for</code> <code>(jobid job : jobs) {</code>

<code></code><code>runningjob rjob;</code>

<code></code><code>synchronized</code> <code>(runningjobs) {</code><code>// tasktracker維護的目前在其上運作的job清單</code>

<code></code><code>rjob = runningjobs.get(job);</code>

<code></code><code>if</code> <code>(rjob !=</code><code>null</code><code>) {</code>

<code></code><code>synchronized</code> <code>(rjob) {</code>

<code>12</code>

<code></code><code>fetchstatus f = rjob.getfetchstatus();</code>

<code>13</code>

<code></code><code>if</code> <code>(f !=</code><code>null</code><code>) {</code>

<code>14</code>

<code></code><code>f.reset();</code>

<code>15</code>

<code></code><code>}</code>

<code>16</code>

<code>17</code>

<code>18</code>

<code>19</code>

<code>20</code>

<code>21</code>

<code></code><code>// mark the reducers in shuffle for rollback</code>

<code>22</code>

<code></code><code>synchronized</code> <code>(shouldreset) {</code>

<code>23</code>

<code></code><code>for</code> <code>(map.entry&lt;taskattemptid, taskinprogress&gt; entry</code>

<code>24</code>

<code></code><code>: runningtasks.entryset()) {</code>

<code>25</code>

<code></code><code>if</code> <code>(entry.getvalue().getstatus().getphase() == phase.shuffle) {</code>

<code>26</code>

<code></code><code>this</code><code>.shouldreset.add(entry.getkey());</code><code>// 将處于shuffle階段的task放到shouldreset集合中</code>

<code>27</code>

<code>28</code>

<code>29</code>

<code>30</code>

<code>31</code>

通過上面的代碼我們可以看到,當jobtracker重新開機的時候,已經在tasktracker上運作的屬于某些job的task可能無法立即感覺到,對應的job仍然存在于tasktracker的runningjobs集合中。在jobtracker重新開機之後,tasktracker所發送的第一個heartbeat傳回的響應資料中,應該會存在需要recovered的job清單,是以這時在tasktracker端隻需要從runningjobs中取出需要recovered的job,并檢視其是否存在fetch狀态,如果存在,應該重新設定狀态(主要對應于mapeventsfetcherthread 維護的taskcompletionevent清單,觸發reducetask拉取maptask的輸出中間結果),以便該job的各個task恢複運作。如果該reducetask正在運作于shuffle階段,需要将對應的job的maptask的輸出拷貝到該reducetask所在的節點上,通過調用fetchstatus的reset方法重置狀态,這樣就重新恢複了reducetask的運作。

由于mapreduce使用者程式包含使用者代碼,可能會存在bug,為了不因為使用者代碼存在的bug影響tasktracker服務,是以mapreduce采用了隔離task運作的方式來運作maptask/reducetask。在運作task時,會單獨建立一個獨立的jvm執行個體,讓task的代碼再該jvm執行個體中加載運作,tasktracker需要跟蹤該jvm執行個體中運作的task的狀态。在tasktracker端,加載一個運作task的jvm執行個體,是通過org.apache.hadoop.mapred.child類來實作。下面,我們看一下child類是如何實作task加載運作的,如下面序列圖所示:

MapReduce V1:TaskTracker設計要點概要分析

child類包含一個入口主方法main,在運作的時候需要傳遞對應的參數,來運作maptask和reducetask,通過上面序列圖我們可以看出,指令行輸入如下5個參數:

host:表示tasktracker節點的主機名稱

port:表示tasktracker節點rpc端口号

taskid:表示啟動的task對應的taskattemptid,辨別一個task的一個運作執行個體

log location:表示該task運作執行個體對應的日志檔案的路徑

jvm id:表示該task執行個體對應的jvmid資訊,包括jobid、task類型(maptask/reducetask)、jvm編号(辨別該jvm執行個體對應的id)

有了上述參數,就可以擷取到一個task運作所需要的全部資源,如一個task處理哪一個split,一個task對應的job配置資訊,還可以友善tasktracker監控該task執行個體所在的jvm的狀态。該child建立時,會建立一個到tasktracker的rpc代理對象,通過該rpc連接配接向tasktracker彙報task執行進度及其狀态資訊。然後,一切運作task的基本條件都已經具備,接下來從該task對應的job的代碼(job.jar)開始加載任務處理類,如果是maptask則執行maptask運作的處理流程,如果是reducetask則執行reducetask的處理流程,最後,斷開task彙報狀态的rpc連接配接,task運作結束。

在child類中加載啟動task,如果是maptask,則執行maptask對應的處理流程,如下序列圖所示:

MapReduce V1:TaskTracker設計要點概要分析

啟動一個maptask運作,包含4個階段,我們通過運作各個階段的方法來表示:

runjobcleanuptask():清理job對應的相關目錄和檔案

runjobsetuptask():建立job運作所需要的相關目錄和檔案

runtaskcleanuptask():清理一個task對應的工作目錄下與task相關的目錄或檔案

runnewmapper()/runoldmapper():調用使用者編寫的mapreduce程式中的mapper中的處理邏輯

在maptask運作過程中, 如果階段或者狀态發生變化,要與tasktracker進行通信,彙報狀态,并更新tasktracker維護的關于task和job對應的狀态資料。最後,task運作完成,也要通知tasktracker。

在child類中加載啟動task,如果是reducetask,則執行reducetask對應的處理流程,如下序列圖所示:

MapReduce V1:TaskTracker設計要點概要分析

啟動一個reducetask運作,與maptask的處理流程有很大的不同,它包含3個階段,如下所示:

copy階段:從運作maptask的tasktracker節點,拷貝屬于該reducetask對應的job所包含的maptask的輸出中間結果資料,這些資料存儲在該reduce所在tasktracker的本地檔案系統(可能會放在記憶體中),為後續階段準備資料

sort階段:對從maptask拉取過來的資料進行合并、排序

reduce階段:調用使用者編寫的mapreduce程式中的reducer中的處理邏輯

執行reduce階段,比maptask複雜的多。在reducetask運作過程中,也會周期性地與tasktracker通信,彙報task運作進度和狀态,以保證與tasktracker所維護的task的狀态資料同步。當reducetask完成後,如果有輸出的話,最終的結果資料會輸出到hdfs中儲存。