天天看點

mapreduce工作流程_MapReduce(1)——MapReduce是如何實作計算向資料遷移的

mapreduce工作流程_MapReduce(1)——MapReduce是如何實作計算向資料遷移的

前言

最近一直在做項目之餘研究MapReduce的資料,自己剛開始幾天嘗試直接讀源碼進行分析,但是嘗試了幾天之後發現非常的艱難,因為整個代碼還是比較多的,如果直接讀很容易抓不住重點,被淹沒了在了海一般的代碼中。是以又開始找相關的資料。比較幸運的是,再看了很多個資料之後,發現了下面這本我認為是目前自己看到的寫的最清晰的一本關于MapReduce的書。這本書的作者目前是Hulu的架構師,如果大家對MapReduce感興趣的話,強烈推薦一下。需要說明的是,這本書成書時間比較早,當時還處于Hadoop2.x出來沒多久的時候,是以這本書中分析的代碼還主要是以1.x版本為主。但我覺得這個不影響對整個MR架構和設計的了解,如果想要對着源碼去學習的同學,可以去下載下傳下來代碼。作為參考,我個人是對着hadoop-1.2.1版本的源碼來閱讀這本書的。

《Hadoop技術内幕:深入解析MapReduce架構設計與實作原理》(董西成)電子書下載下傳、線上閱讀、内容簡介、評論 - 京東電子書頻道​e.jd.com

mapreduce工作流程_MapReduce(1)——MapReduce是如何實作計算向資料遷移的

MapReduce代碼太多了,輪廓性質的内容我覺得這本書寫的已經很不錯了,感興趣可以直接讀。這篇專欄的系列文章更傾向于從一個個小問題入手,把每個小問題說明白,并希望借此能對整個MapReduce的部分懂得更加明白。

我們都知道MapReduce是計算向資料移動的, 任務配置設定時是會考慮資料本地性的(locality)。MR把使用者送出得Job檔案劃分成了很多個部分(splits),每一個資料塊(split)對應一個Map任務(Task),然後MR會把這些Map任務配置設定給叢集中的機器進行執行。所謂計算向資料移動就是我們盡可能的把Map任務配置設定到該任務所需要處理的資料塊(split)所在的機器節點上,這樣該任務執行時就可以盡可能地從本地進行資料讀取,進而避免了跨機器地資料傳輸。那麼本篇文章,我們就從使用者送出任務,一直到JobTracker排程任務來介紹MapReduce是如何實作計算向資料遷移的。

同時,我們需要注意的是,計算本地性考慮隻是針對Map任務的,Reduce任務排程不需要考慮資料的本地性。

MR1.0核心元件介紹

由于接下來會可能涉及到許多的概念,是以先在這裡把MR1.0的整體架構和一些核心概念介紹下。

1.0的架構中三個核心元件為

JobTracker

TaskTracker

TaskScheduler

JobTracker

是整個MR的核心元件,使用者會向

JobTracker

送出作業(Job),

JobTracker

會将使用者的作業(Job)分解成一個個任務(Task)。

TaskTracker

部署運作在叢集中的機器節點上,并定時向

JobTracker

通過心跳資訊彙報自己目前的情況,包括自己所在節點的資源資訊和自己所負責的所有任務的運作狀态資訊。正常情況下,一般是一個機器上部署運作一個TaskTracker。除了彙報資訊之外,

TaskTracker

也會在自己還有空餘資源來運作任務的時候,向

JobTracker

請求新的任務。當

JobTracker

接收到請求之後,會借助

TaskScheduler

來為該

TaskTracker

排程指派任務。

mapreduce工作流程_MapReduce(1)——MapReduce是如何實作計算向資料遷移的

除了分解作業、處理TaskTracker的心跳資訊之外,JobTracker很重要的工作就是監控、維護各種狀态。包括每一個作業的狀态,每一個任務的狀态,每一個TaskTracker的狀态。在JobTracker中,每一個作業的所有狀态資訊用

JobInProgress

類來表示(JIP), 每一個任務的所有狀态資訊用

TaskInProgress

來表示(TIP)。顯然,一個JIP裡面應該包含很多個TIP,因為一個作業裡面包含很多個任務。同時為了更好的容錯性,Hadoop種引入了

TaskAttempt

(TA)來表示每一個任務的執行示例。

TaskInProgress

TaskAttempt的關系可以了解為程式和程序的關系。

即一個

TaskInProgress

對象可以對應多個

TaskAttempt

對象。每啟動一個執行該Task的執行個體,就會多一個

TaskAttempt

。不管其中哪一個

TaskAttempt

先執行完,該Task被認為執行結束。

實際上真正在機器上運作的是TaskAttempt

mapreduce工作流程_MapReduce(1)——MapReduce是如何實作計算向資料遷移的

那麼有了上面的知識,我們就知道在Hadoop中

一台機器上

機器節點-TaskTracker-Task-Job的對應關系了,

我們用Node-TaskTracker-TA-TIP-JIP來進行表示。

而在JobTracker中也用了很多的Map結構體來儲存這樣的對應關系,用于快速查詢。

mapreduce工作流程_MapReduce(1)——MapReduce是如何實作計算向資料遷移的

MapReduce整體任務排程介紹

接下來我們進入真正的任務排程部分。首先我們需要了解Hadoop中機器節點的三層拓撲結構表示:資料中心-機架-節點。比如下圖中的H1節點,其拓撲結構位址為/D1/R1/H1,

即1号資料中心的1号機架中的1号節點

。同時下圖中的拓撲結構為樹結構,

R1為H1節點的父節點。

一般而言我們都使用機架-機器兩層架構,任務在同一個資料中心内進行配置設定、排程執行。

mapreduce工作流程_MapReduce(1)——MapReduce是如何實作計算向資料遷移的

Hadoop根據輸入資料與實際配置設定的計算資源之間的距離将任務分成三類:

node-local

(輸入資料與計算資源同節點),

rack-local

(同機架)和

off-switch

(跨機架)。當輸入資料與計算資源位于不同節點上時,Hadoop需将輸入資料遠端複制到計算資源所在節點進行處理。兩者距離越遠,需要的網絡開銷越大,是以排程器進行任務配置設定時盡量選擇離輸入資料近的節點資源。任務選擇的優先級從高到低依次為:

node-local

rack-local

off-switch

我們以上面的拓撲圖為例說明三種排程優先級。假設某一時刻,TaskTracker X出現空閑的計算資源,向JobTracker彙報心跳請求新的任務,排程器根據一定的排程政策為之選擇了任務Y。

  • 場景1 如果X是H1,任務Y輸入資料塊為b1,則該任務為node-local。
  • 場景2 如果X是H1,任務Y輸入資料塊為b2,則該任務為rack-local。
  • 場景3 如果X是H1,任務Y輸入資料塊為b4,則該任務為off-switch。

接下來我們從一個作業送出到排程執行的流程來介紹任務排程是如何考慮本地性的。

1、作業送出時的Split生成

在Hadoop中,使用者送出任務之前會先在本地進行資料的邏輯切分成一個個的Split。我們以Hadoop自帶的FileSplit為例。一個FileSplit包含的資訊就是檔案名(

file

)、split資料在整個檔案中的開始位置的偏移(

start

), split資料的長度(

length

), split所在的機器節點清單(

hosts

)。

public 
           

一個可能的FileSplit例子如下。

這裡面hosts資訊的确定對後續的計算本地性實作至關重要。
file 
           

我們提過hosts是該split所在的機器節點位置資訊,但是事實上Split是可能跨節點的。比如HDFS中的block大小是64M,而該Split大小是128M, 那麼該Split是一定會跨block的。跨block也就大機率會跨機器的,那麼這個時候hosts清單該是哪些節點資訊呢?Hadoop選擇的政策是

選擇包含該Split資料量最大的前K個節點列入hosts資訊中。

那麼在到時候配置設定該split對應的任務的時候,Hadoop更傾向于配置設定給hosts中包含的節點來執行,這樣可以盡可能的減少資料地網絡傳輸。

為此,FileInputFormat設計了一個簡單有效的啟發式算法:

首先按照rack包含的資料量對rack進行排序,然後在rack内部按照每個node包含的資料量對node排序,最後取前N個node的host作為InputSplit的host清單,這裡的N為block副本數。

這樣,當任務排程器排程Task時,隻要将Task排程給位于host清單的節點,就認為該Task滿足本地性。

舉個例子

, 某個Hadoop叢集的網絡拓撲結構如圖所示,HDFS中block副本數為3,某個InputSplit包含3個block,大小依次是100、150和75,很容易計算,4個rack包含的(該InputSplit的)資料量分别是175、250、150和75。rack2中的node3和node4,rack1中的node1将被添加到該InputSplit的host清單中。

mapreduce工作流程_MapReduce(1)——MapReduce是如何實作計算向資料遷移的

具體Hadoop中對于一個Split的hosts資訊确定主要借助的就是下面的NodeInfo結構體。其中

value

就代表該節點包含該split的資料的總量。前面提過,節點拓撲結構構成一個樹結構,

Hadoop中父節點所包含的split的資料總量為所有子節點包含的Split資料量總和。

統計完每個節點的包含該Split的資料總量之後,我們先對所有的機架節點進行排序,然後對包含資料量最多的機架節點的所有葉子節點(leaves)按照其包含Split的資料量進行排序,最後取出排序最靠前的N個節點。具體的代碼比較長,就不貼在這裡了,如果要去讀,就去讀

FileInputFormat.java
// FileInputFormat.java
           

2. 作業分解——任務生成

當使用者把作業送出給JobTracker之後,JobTracker會根據使用者提供的所有split資訊來預先生成所有需要執行任務,每個split對應生成一個Map任務。在Hadoop中,所有的任務都用TaskInProgress類來表示。根據上述過程中生成的split資訊,Hadoop用一個

Map<Node, List<TaskInProgress>>

資料結構體來儲存對于每一個Node來說符合計算本地性的所有任務。

比如對應下面所述的一個split,就會在三個節點的任務清單中都加入一個該split對應的Map 任務。

file 
           

同時,拓撲結構中機架節點的任務清單為其中所有的處于該機架的機器節點符合計算本地性的任務清單的并集。這樣,當一個任務沒辦法配置設定給最适合他的節點之後,Hadoop傾向于把任務配置設定給和該節點處于相同機架的其他節點。

mapreduce工作流程_MapReduce(1)——MapReduce是如何實作計算向資料遷移的

有了上面的Map<Node, List<TaskInProgress>>,當一個機器節點向JobTracker請求任務的時候,JobTracker可以通過查詢該構體查找該節點符合計算本地性的所有任務清單,如果适合該節點的所有任務已經執行完了,那麼JobTracker就會從其父節點——機架節點擷取所有的任務,然後将其中未執行的任務配置設定該機器節點。

該結構體建立的最核心的兩個函數為initTasks()和createCache()函數

// JobTracker.java
           

3、任務排程

上述的作業分解完成後,當TaskTracker來請求任務時,JobTracker就會排程指派任務。我們以下圖所示的例子來介紹任務排程的過程:

當運作在

Node1

上的TaskTracker向JobTracker請求任務的時候,JobTracker會查詢步驟2中所建構的Map結構體,從Node1中的符合計算本地性的任務清單中選擇一個任務配置設定給該TaskTracker,如果配置設定成功,此時該任務的本地性級别為

node-local

但如果該任務清單中沒有任務可以被排程到Node1上時,JobTracker會查詢Node1的父節點,即Rack節點。從Rack節點中的符合計算本地性的任務清單中選擇一個任務配置設定給Node1上的TaskTracker,這時配置設定的任務的本地性級别就為

rack-local。
mapreduce工作流程_MapReduce(1)——MapReduce是如何實作計算向資料遷移的

上面的描述是一個很粗糙的圖景描述,目的是為了讓大家明白整個任務排程的全貌。知道如何按照本地性排程任務的。

接下來重點介紹下任務排程的細節。

作業排程主要分為下圖所示的兩層架構:

作業排程和任務排程

。TaskSchduler主要決定Job之間的排程順序,比如按照FIFO順序。

  • JobTracker通過調用 assignTasks(TaskTracker) 來擷取為該TaskTracker配置設定的新的任務。
  • TaskTracker首先會計算該TaskTracker所在的節點還能夠執行幾個Map任務和幾個Reduce任務。在1.0版本的Hadoop中,每個節點的計算資源被均勻劃分成了一個個的槽(slot), 一般情況下一個Map任務占據一個Map slot, 一個Reduce任務占據一個Reduce slot。計算能配置設定給該TaskSchduler的任務數目取決于該taskSchduler總共的任務槽數和現有正在執行的任務數目。二者的內插補點便是我們還能向該TaskSchduler配置設定的任務最大數目。

比如我們現在計算得到請求的TaskTracker還能夠執行兩個Map任務,和一個Reduce任務。那麼TaskSchduler會按照一定的順序去周遊詢問每個作業(Job)。常用的一個周遊順序就是FIFO,即先到的作業更容易被第一個周遊詢問。

當TaskSchduler周遊第一個Job的時候,會按照下圖所示的順序去分别調用Job的三個擷取任務的接口。當擷取Map任務的時候,首先調用

obtainNewNodeOrRackLocalMapTask()

詢問該Job有沒有符合該TaskTracker所在的節點的計算本地性的任務,這裡的計算本地性包括

node-local

rack-local

。如果有而且該任務可以被配置設定給該TaskTracker,那麼就配置設定給它。如果沒有,那麼就會調用obtainNewNonLocalMapTask()詢問該Job是否有其他的Map任務可以配置設定給該TaskTracker,即使這個任務所需要的資料和目前TaskTracker所在的節點不在同一個機架内。

當TaskSchduler需要排程Reduce任務的時候,還是按照順序去周遊所有的作業(Job), 然後調用每個Job的obtainNewReduceTask()來詢問是否有可以配置設定給該TaskTracker的Reduce任務。如果有則配置設定給它。如果沒有,那麼TaskSchduler就會周遊下一個Job來進行詢問。

mapreduce工作流程_MapReduce(1)——MapReduce是如何實作計算向資料遷移的

總結

本篇文章我們主要介紹了Hadoop1.0中是如何實作計算向資料移動的。需要再次說明的是,任務排程配置設定時的計算本地性考慮隻是針對Map任務,對Reduce的任務配置設定,我們不需要考慮計算本地性。希望上述的文章,對大家了解MapReduce的任務排程能有個大緻的了解。後續還會繼續寫一系列的MapReduce文章,如果感興趣,歡迎關注我的專欄~