MapReduce:大型叢集上的簡單資料處理
摘要
MapReduce是一個程式設計模型和一個處理和生成大資料集的相關實作。使用者指定一個map函數處理一個key-value對來生成一組中間key-value對;指定一個reduce函數合并所有和同一中間key值相聯系的中間value值。許多現實世界中的任務以這個模型展現,就像文中展示的那樣。
以這種函數類型編寫的程式在一群日常機器上自動并行化并執行。運作時系統關心劃分輸入資料的細節,在一組機器間排程程式的執行,處理機器失效,管理内部機器需要的通信。這使得那些沒有任何并行和分布式系統經驗的程式員可以容易地使用大型分布式系統的資源。
我們MapReduce的實作運作在一大群日常機器上并且高度可擴張:一個典型的MapReduce計算在成千上萬台機器上處理數TB的資料。程式員發現系統很易用:成百上千的MapReduce程式已經被實作,每天都有多餘1000個MapReduce作業在Google叢集上執行。
1.介紹
在過去的5年裡,作者以及Google裡的其他程式員已經實作了數以百計的,特殊目的的計算。這些計算處理海量原始資料,比如,文檔抓取(shijin:類似網絡爬蟲的程式)、web請求日志等;或者計算各種各樣的派生資料,比如反向索引、web文檔的圖結構的各種表示形勢、每台主機上網絡爬蟲抓取的頁面數量的彙總、列舉一天中一組最頻繁的查詢等。大多數這種計算概念上直截了當。然而輸入的資料量巨大,并且為了在合理的時間内完成,計算不得不分布到數百或數千台機器上。如何并行計算、分發資料、處理失效的問題湊在一起使原本簡單的計算晦澀難懂,需要大量複雜的代碼來處理這些問題。
作為對上述複雜性的應對,我們設計一個新的抽象模型,其使我們可以表達我們試圖執行的簡單運算,但是将并行、容錯、資料分布和負載均衡等散亂的細節隐藏在了一個庫裡面。我們抽象模型的靈感來自Lisp和許多其他函數式語言的map和reduce原語。我們意識到大多數我們的計算都涉及這樣的操作:在我們輸入中的每個邏輯“記錄”上應用map操作,以便計算出一組中間key/value對,然後在所有享有相同key值的value值應用reduce操作,以便合适地合并派生的資料。我們使用帶有使用者指定map和reduce操作的函數模型,就可以輕易地并行化大規模計算;并且可以使用“再次執行”(re-execution)作為基礎的容錯機制。
這項工作的主要貢獻是一個簡單強大的接口,該接口使自動地并行化和分布大規模計算成為了可能,接口連同該接口的實作,實作了在大群日常PC機上的高性能。
第二節描述基本的程式設計模型給出一些例子。第三節描述了為我們基于叢集的計算環境定做的MapReduce接口的實作。第四節描述一些我們發現有用的程式設計模型優化。第五節對我們各種不同任務實作的性能進行了測量。第六節探索了MapReduce在Google内部的使用,包含我們在使用其作為我們生産索引系統的重寫操作的基礎時的經驗。第七節讨論相關的和未來的工作。
2.程式設計模型
計算取出一組輸入key/value對,産生一組輸出key/value對。使用MapReduce庫的使用者用兩個函數表達這個計算:Map和Reduce。
使用者自己編寫的Map接受一個輸入對,然後産生一組中間key/value對。MapReduce庫把所有和相同中間key值I關聯的中間value值聚集在一起後傳遞給Reduce函數。
也是由使用者編寫的Reduce函數接受一個中間key值I和一組那個key值的value值。Reduce函數将這些value值合并在一起,形成一個可能更小的value值的集合。通常每次Reduce調用僅産生0或1個輸出value值。中間value值通過一個疊代器提供給使用者的Reduce函數,這樣我們就可以處理因太大而無法适應記憶體的value值清單。
2.1 例子
考慮這麼一個問題,在一個大的文檔集合中對每個單詞出現的次數進行計數,使用者可能要類似下面僞代碼的代碼:
map(String key, String value):
// key: document name
// value: documen t contents
for each word w in value:
EmitIntermediate(w, “1”);
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in value s:
result += Parse Int(v);
Emit(AsString(result));
Map函數emit每個詞加上一個相關的出現計數(在這個簡單的例子裡就是1)。Reduce函數把為一個特定的詞emit的所有計數加起來。
另外,使用者編寫代碼用輸入和輸出檔案的名字以及可選的調節參數填充一個mapreduce說明對象,使用者之後調用MapReduce函數,并把這個說明對象傳遞給它。使用者的代碼和MapReduce庫連結在一起(用C++實作)。附錄A包含了這個例子的全部程式文本。
2.2 類型
盡管在前面的僞代碼按照字元串輸入輸出書寫,但是在概念上,使用者提供的map和reduce函數有相關的類型:
map (k1,v1) ->list(k2,v2)
reduce (k2,list(v2)) ->list(v2)
比如,輸入的key值和value值與輸出的key值和value值從不同的類型域得到。并且,中間key值和value值與輸出key值和value值來自同一個類型域。(alex注:原文中這個domain的含義不是很清楚,我參考Hadoop、KFS等實作,map和reduce都使用了泛型,是以,我把domain翻譯成類型域)。
我們的C++實作使用字元串類型作為使用者自定義函數的輸入輸出,并将字元串與适當類型的轉換工作交給了客戶代碼。
2.3 更多的例子
這裡有一些有趣程式的簡單例子,可以很容易地作為MapReduce計算來表示:
分布式的Grep:如果與提供的模式串比對,Map函數emit一行,Reduce函數是一個恒等函數,即僅僅把提供的中間資料複制到輸出。
URL通路頻率計數:Map函數處理web頁面請求的日志,然後輸出(URL,1)。Reduce函數把相同URL的value值加在一起,emit一個(URL, 總數)對。
倒轉網絡連結圖:Map函數為每個連結輸出(target,source)對,每個連結連接配接到在名字叫源的頁面中發現的目标URL。Reduce函數把與給定目标URL相關的所有源URL連接配接成清單,emit(target,list(source))對。
每個主機的檢索詞向量:檢索詞向量将一個文檔或者一組文檔中出現的嘴重要的詞彙總為一個(詞,頻)對清單。Map函數為每個輸入文檔emit(主機名, 檢索詞向量),其中主機名來自文檔的URL。Reduce函數為一個給定主機接收所有每文檔檢索詞向量,其将這些檢索詞向量加在一起,丢棄低頻的檢索詞,然後emit一個最終的(主機名, 檢索詞向量)對。
反向索引:Map函數解析每個文檔,emit一系列(詞, 文檔号)清單,Reduce函數接受一個給定詞的所有(詞, 文檔号)對,排序相關的文檔号,emit(詞,list(文檔号))。所有的輸出對集合形成一個簡單的反向索引,增加計算來跟蹤詞在文檔中的位置很簡單。
分布式排序:Map函數從每個記錄提取key值,emit(key,record)對。Reduce 函數原封不動地emit所有對。這個運算依賴4.1描述的分區裝置和4.2節描述的排序屬性。
3.實作
MapReduce有多種不同的可能實作。正确的選擇取決于環境。例如,一種實作可能适用于小型共享記憶體的機器,另外一種則适用于大型NUMA多處理器,然而還有的适合大型的網絡叢集。
本章節描述一個針對Google内部廣泛使用的運算環境的實作:大群日常機器用交換以太網連接配接在一起。在我們環境中:
1. 機器通常是x86雙核處理器、運作Linux系統、每台機器2-4GB記憶體。
2. 使用日常網絡硬體,通常在機器級别帶寬為百兆每分或者千兆每分,但是平均遠小于網絡整體帶寬的一半。(averaging considerably less in overall bisection bandwidth)
3. 叢集包含數百或數千台機器,是以機器失效是常态。
4. 存儲由直接附屬在個體機器上的廉價IDE硬碟提供。一個内部開發的分布式檔案系統用來管理存儲在這些磁盤上的資料。檔案系統使用副本來提供不可靠的硬體上的可用性和可靠性。
5. 使用者向一個排程系統送出作業。每個作業包含一組任務,排程系統将這些任務映射到一組叢集内部可用的機器上。
3.1 執行概覽
Map調用通過将輸入資料自動分為M個片段(a set of M splits)的方式被分布到多台機器上。輸入片段能夠被不同的機器并行處理。Reduce調用使用分區函數将中間key值空間分成R份(例如,hash(key) mod R),繼而被分布到多台機器上執行。分區數量(R)和分區函數由使用者指定。
圖1展示了我們的實作中MapReduce操作的整體流程。當使用者調用MapReduce函數時,下列動作發生(圖一中的數字标簽對應下面清單中的序号):
1. 使用者程式中的MapReduce庫首先将輸入檔案分成M份,每份通常在16MB到64MB之間(可以通過可選參數由使用者控制)。然後在機器叢集中啟動許多程式副本。
2. 程式副本中有一個是特殊-master。其它的是worker,由master配置設定工作。有M個map任務和R個reduce任務将被配置設定,master選擇空閑的worker然後為每一個worker配置設定map任務或reduce任務。
3. 被配置設定了map任務的worker讀取相關輸入片段的内容,它從輸入的資料片段中解析出key/value對,然後把key/value對傳遞給使用者自定義的Map函數,由Map函數生成的中間key/value對緩存在記憶體中。
4. 緩存對被定期地寫入本地磁盤,被分區函數分成R個域。緩存對在本地磁盤上的位置被傳回master,master負責将這些位置轉寄給reduce worker。
5. 當一個reduce worker被master告知位置資訊後,它使用遠端過程調用從map worker的本地磁盤讀取緩存資料。當一個reduce worker讀取了所有的中間資料後,它通過中間key值對緩沖資料排序,以便相同key值的出現組織在一起。由于通常許多不同的key值映射到同一reduce任務上,是以排序是需要的。如果中間資料量太大而無法适應記憶體,那麼就使用外部排序。
6.Reduce worker疊代排序後的中間資料,對于每一個遇到的唯一的中間key 值,Reduce worker将這個key值和與它相關的中間value值的集合傳遞給使用者的Reduce函數。Reduce函數的輸出被追加到這個reduce分區的一個最終輸出檔案。
7. 當所有的map和reduce任務完成之後,master喚醒使用者程式。此時此刻,使用者程式裡的對MapReduce調用傳回使用者代碼。
成功完成之後,mapreduce執行的輸出可以在R個輸出檔案中得到(每個檔案對應一個reduce任務,檔案名由使用者指定)。通常,使用者不需要将這R個輸出檔案合并成一個檔案-他們經常把這些檔案作為輸入傳遞給另外一個MapReduce調用,或者在另外一個分布式應用中使用它們,這種分布式應用能夠處理分成多個檔案的輸入。
3.2 Master資料結構
Master保持一些資料結構,對每一個map和reduce任務,它儲存其狀态(空閑、進行中或已完成),以及Worker機器(對于非空閑任務)的身份(identity)。
Master是一個管道,通過它中間檔案域的位置資訊從map任務傳播到reduce任務。是以,對于每個已經完成的map任務,master存儲了map任務産生的R個中間檔案域的位置和大小。當map任務完成時,接收到了位置和大小資訊的更新,這些資訊被遞進地推送給那些正在運作的reduce任務。
3.3 容錯
因為MapReduce庫是設計用來協助使用數百數千的機器處理超大規模資料的,這個庫必須優雅地處理機器故障。
worker故障
master周期性地ping每個worker。如果在一個确定的時間段内沒有收到worke的回應,master将這個worker标記為失效。任何由這個worker完成的map任務被重置回它們初始的空閑狀态,是以變得可以被排程到其它worker。同樣,在一個失效的worker上正在運作的map或reduce任務被重置為空閑狀态,變得可被重新排程。
故障時已完成的map任務必須重新執行是因為它們的輸出被存儲在失效機器的本地磁盤上,是以不可通路了。已經完成的reduce任務不需要再次執行,因為它們的輸出存儲在全局檔案系統。
當一個map任務首先被worker A執行,之後被worker B執行(因為A失效),所有執行reduce任務的worker會接到重新執行的通知。還沒有從worker A讀取資料的任何reduce任務将從worker B讀取資料。
MapReduce對規模worker失效很有彈性。例如,在一次MapReduce操作執行期間,在正在運作的叢集上進行的網絡維護一次造成一組80台機器在幾分鐘内無法通路,MapReduce master隻需簡單德再次執行那些不可通路的worker完成的工作,然後繼續執行,直終完成這個MapReduce操作。
master失敗
讓master定期對上面描述的master資料結構作檢查點很簡單。如果這個master任務失效了,一個新的備份可以從上一個檢查點狀态啟動。然而,考慮到隻有一個單獨master,master失效是不太可能的,是以我們現在的實作是如果master失效,就中止MapReduce運算。客戶可以檢查這個情況,并且如果需要可以重試MapReduce操作。
在失效面前的語義
(semantics in the presence of failures)
當使用者提供的map和reduce操作是輸入值的确定性函數,我們的分布式實作産生相同的輸出,就像沒有錯誤、順序執行整個程式産生的一樣。
我們依賴對map和reduce任務的輸出是原子送出來完成這個特性。每個工作中的任務把它的輸出寫到私有的臨時檔案中。一個reduce任務生成一個這樣的檔案,并且一個map任務生成R個這樣的檔案(一個reduce任務對應一個)。當一個map任務完成時,worker向master發送一個消息,消息中包含R個臨時檔案名。如果master收到一個已經完成的map任務的完成消息,它将忽略這個消息;否則,master将這R個檔案的名字記錄在一個master資料結構裡。
當一個reduce任務完成時,reduce worker原子性地将臨時檔案重命名為最終的輸出檔案。如果同一個reduce任務在多台機器上執行,針對同一個最終輸出檔案将有多個重命名調用執行。我們依賴底層檔案系統提供的原子重命名操作來保證最終的檔案系統狀态僅僅包含reduce任務一次執行産生的資料。
絕大多數map和人reduce操作是确定的,而且存在這樣的一個事實:我們的語義等同于順序的執行,在這種情況下,程式員可以很容易推斷他們程式的行為。當map或/和reduce操作是不确定性的時候,我們提供較弱但是依然合理的語義。在非确定性操作面前,一個特定reduce任務R1的輸出等價于一個非确定性程式順序執行産生的R1的輸出。然而,一個不同reduce任務R2的輸出可能相當于一個不同的非确定行程式順序執行産生的R2的輸出。
考慮map任務M和reduce任務R1、R2。設e(Ri)是Ri送出的執行過程(隻有一個這樣的執行過程)。由于e(R1)可能讀取了由M一次執行産生的輸出,而e(R2)可能讀取了由M的不同執行産生的輸出,較弱的語義随之而來。
3.4 存儲位置
在我們的計算環境中,網絡帶寬是一個相當稀缺的資源。我們通過充分利用輸入資料(由GFS管理)存儲組成叢集的機器的本地磁盤上這樣一個事實來節省網絡帶寬。GFS 把每個檔案分成64MB的塊,并且在不同機器上存儲每個塊的一些拷貝(通常是3個拷貝)。考慮到輸入檔案的位置資訊,MapReduce的master試圖将一個map任務排程到包含相關輸入資料拷貝的機器上;嘗試失敗的話,它将嘗試排程map任務到靠近任務輸入資料副本的機器上(例如,一個包含資料并在同一網關上的worker機器)。當在一個叢集的大部分worker上運作大型MapReduce操作的時候,大部分輸入資料從本地機器讀取,并且不消耗帶寬。
3.5 任務粒度
如上所述,我們把map階段細分成M個片段、把reduce 階段細分成R個片段。理想情況下,M和R應當比worker機器的數量要多得多。在每台worker上執行許多不同的任務提高動态負載平衡,并且在worker故障時加速恢複:失效機器上完成的很多map任務可以分布到所有其他的worker機器。
但是在我們的實作中對M和R的取值有實際的限制,因為master必須制定O(M+R)排程政策,并且如上所述在記憶體中儲存O(M*R)個狀态(然而記憶體使用的常數因子還是比較小的:O(M*R)片狀态大約包含每對map任務/reduce任務對1個位元組)。
此外,R值通常是被使用者制約的,因為每個reduce任務的輸出最終在一個獨立的輸出檔案中。實際上,我們傾向于選擇M值以使每個獨立任務有大約16M到64M的輸入資料(這樣如上所述的本地最優化最有效),我們把R值設定為我們想使用的worker機器數量的一個小的倍數。我們通常用,使用2000台worker機器,以M=200000,R=5000來執行MapReduce計算。
3.6 備用任務
延長一個MapReduce操作花費的總時間的常見因素之一是“落伍者”:一台機器花費了不同尋常的長時間才完成計算中最後幾個map或reduce任務之一,出現“落伍者”的原因非常多。比如:一個有壞磁盤的機器可能經曆頻繁的糾錯以緻将其讀性能從30M/s降低到1M/s。叢集排程系統可能已經降其他任務排程到這台機器上,由于CPU、記憶體、本地磁盤和網絡帶寬的競争導緻執行MapReduce代碼更慢。我們最近遇到的一個問題是機器初始化代碼中的bug,導緻處理器緩存失效:受影響機器上的計算減慢了超過百倍。
我們有一個通用的機制來減輕“落伍者”的問題。當一個MapReduce操作接近完成的時候,master排程剩餘正在運作任務的備份執行、無論主執行還是備份執行完成,任務被标記為已完成。我們調優了這個機制,以便其通常增加操作不多于幾個百分點的計算資源。作為示例,5.3節描述的排序程式在關掉備用任務機制時要多花44%的時間完成。
4.精細化
盡管簡單地書寫Map和Reduce函數提供的基本功能能夠滿足大多數需求,我們還是發現了一些有用的擴充。在本節做了描述。
4.1 分區函數
MapReduce的使用者指定他們需要的reduce任務/輸出檔案的數量(R)。我們在中間key值上使用分區函數将資料在這些任務間分開。一個預設的分區函數是使用哈希(比如,hash(key) mod R)提供的。它傾向于導緻相當均衡的分區。然而,在某些情況下,通過key值的其它函數分割資料是有用的。比如,輸出的key值是URLs,我們希望單個主機的所有條目以結束在同一個輸出檔案中。為了支援類似的情況,MapReduce庫的使用者可以提供一個專門的分區函數。例如,使用“hash(Hostname(urlkey)) mod R”作為分區函數就可以把所有來自同一個主機的URL結束在同一個輸出檔案中。
4.2 順序保證
我們保證在給定的分區中,中間key/value對是以key值遞增的順序處理的。這個順序保證每個分成生成一個有序的輸出檔案很容易,這在下列情況時很有用:輸出檔案的格式需要支援按key值高效地随機通路查找,或者輸出的使用者有序的資料很友善。
4.3 合并函數
某些情況下,每個map任務産生的中間key值有顯著的重複,并且使用者指定的Reduce函數滿足結合律和交換律。這個情況很好的例子是2.1節中的詞數統計示例。由于詞頻傾向于滿足zipf分布,每個map任務将産生數百數千形如<the,1>的記錄。所有這些記錄将通過網絡被發送到一個單獨的reduce任務,然後被Reduce函數累加起來産生一個數。我們允許使用者指定一個可選的合并函數,其在通過網絡發送資料之前對資料進行部分合并。
合并函數在每台執行map任務的機器上執行。通常使用相同的代碼實作合并函數和reduce函數。合并函數和reduce函數唯一的差別就是MapReduce庫如何處理函數的輸出。Reduce函數的輸出被寫入在最終的輸出檔案,合并函數的輸出被寫到中間檔案裡,該檔案被發送給reduce任務。
部分合并顯著加速了MapReduce操作中的某些類。附錄A包含一個使用合并函數的例子。
4.4 輸入輸出類型
MapReduce庫支援讀取一些不同的格式的輸入資料。比如,文本模式輸入将每一行視為一個key/value對。key是檔案中的偏移,value是那一行的内容。另外一種通常支援的格式以key值排序存儲了一系列key/value對。每種輸入類型的實作都懂得如何把自己分割成有意義的範圍,以便作為獨立的map處理(例如,文本模式的範圍分割確定分割隻在行邊界發生)。雖然大多數使用者僅僅使用少量預定義輸入類型之一,但是使用者可以通過提供一個簡單的Reader接口的實作支援一個新的輸入類型。
需要提供資料的reader不必從檔案中讀取,比如,我們可以容易地定義一個從資料庫裡讀記錄的reader,或者從映射在記憶體中的資料結構讀。
類似的,我們為生産不同格式的資料提供一組輸出類型,使用者代碼可以容易地為新的輸出類型添加支援。
4.5 副作用
某些情況下,MapReduce的使用者發現從map和/或reduce操作中産生作為附加輸出的輔助檔案比較友善。我們依賴程式writer把這種“副作用”變成原子的和幂等的(alex注:幂等的指一個總是産生相同結果的數學運算)。通常應用程式首先寫到一個臨時檔案,在全部生成之後,原子地将這個檔案重新命名。
我們不為單個任務産生的多個輸出檔案的原子兩步送出提供支援。是以,産生多個輸出檔案、并且具有跨檔案一緻性要求的任務,必須是确定性的。實際這個限制還沒有成為問題。
4.6 跳過受損記錄
有時候,使用者代碼中的bug導緻map或者reduce函數在某些記錄上确定性地崩潰。這樣的bug阻止MapReduce操作完成。應對的通常過程是修複bug,但是有時不可行;可能這個bug在第三方庫裡,其源碼是得不到的。而且有時忽略一些記錄是可以接受的,比如在一個大資料集上進行統計分析。我們提供了一種可選的執行模式,這種模式下,為了保證繼續進行,MapReduce庫探測哪些記錄導緻确定性的崩潰,并且跳過這些記錄。
每個worker程序都安裝了一個捕獲段例外和總線錯誤的信号句柄。在調用使用者的Map或Reduce操作之前,MapReduce庫在一個全局變量中儲存了參數的序号。如果使用者代碼産生了一個信号,信号句柄向MapReduce的master發送包含序列号的“奄奄一息”的UDP包。當master在特定記錄上看到多餘一次失敗時,當master釋出相關Map或者Reduce任務的下次重新執行時,它就指出這條記錄應該跳過。
4.7 本地執行
調試Map和Reduce函數的是非常棘手的,因為實際的執行發生在分布式系統中,通常是在數千台機器上,由master動态地制定工作配置設定政策。為了促進調試、性能剖析和小規模測試,我們開發了一套可選的MapReduce庫的實作, MapReduce操作在本地計算機上順序地執行所有工作。為使用者提供了控制以便把計算限制到特定的map任務上。使用者通過特殊的标志來調用他們的程式,然後可以容易地使用他們覺得有用的調試和測試工具(比如gdb)。
4.8 狀态資訊
Master運作着内部的HTTP伺服器并且輸出一組供人消費的狀态頁面。狀态頁面顯示包括計算的進展,比如已經完成了多少任務、有多少任務正在進行、輸入的位元組數、中間資料的位元組數、輸出的位元組數、進行的百分比等等。頁面還包含指向每個任務産生的stderr和stdout檔案的連結。使用者可以使用這些資料預測計算需要執行多長時間、是否需要增加向計算增加更多的資源。這些頁面也可以用來搞清楚什麼時候計算比預期的要慢。
另外,最頂層的狀态頁面顯示了哪些worker失效了,以及他們失效的時候正在運作的map和reduce任務是哪些。這些資訊在嘗試診斷使用者代碼中bug的時候很有用。
4.9 計數器
MapReduce庫提供了一個計數器促進統計各種事件發生次數。比如,使用者代碼可能想統計處理的總詞數、或者已經索引的德文文檔數等等。
為了使用這個附加功能,使用者代碼建立一個名叫計數器的對象,然後在在Map和Reduce函數中适當地增加計數。例如:
Counter* uppercase;
uppercase = GetCounter(“uppercase”);
map(String name, String contents):
for each word w in c ontents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w , “1″);
這些來自單個worker機器的計數值周期性傳播到master(附加在ping的應答包中)。master把來自成功執行的map和reduce任務的計數器值進行累加,當MapReduce操作完成後,傳回給使用者代碼。目前的計數器值也會顯示在master的狀态頁面上,這樣人們可以觀看目前計算的進度。當累加計數器值的時候,master排除同一map或者reduce任務重複執行的影響,避免重複計數(重複執行額可以起因于我們使用備用任務以及失效後任務的重新執行)。
有些計數器的值由MapReduce庫自動維護,比如已經處理的輸入key/value對的數量、已經産生的輸出key/value對的數量。
使用者發現計數器附加功能對MapReduce操作行為的完整性檢查有用。比如,在一些MapReduce操作中,使用者代碼可能需要確定産生的輸出對的數量精确的等于處理的輸入對的數量,或者處理的German文檔的比例在處理文檔的整體數量中在可以容忍的比例。
5.性能
本節我們用在一大群機器上運作的兩個計算來測量MapReduce的性能。一個計算搜尋大約1TB的資料查找特定的模式比對,另一個計算排序大約1TB的資料。
這兩個程式是由MapReduce使用者書寫的實際程式子集的代表-一類程式是将從一種表現形式打亂為另外一種表現形式;另一類是從大資料集中抽取少量使用者感興趣的資料。
5.1 叢集配置
所有程式都運作在一個大約由1800台機器組成的叢集上。每台機器有兩個2G主頻、支援超線程的Intel Xeon處理器,4GB的記憶體,兩個160GB的IDE硬碟和一個千兆以太網。這些機器被安排在一個兩層樹形交換網絡中,在root節點大約支援100-200GBPS的合計帶寬。所有機器使用相同主機裝置,是以任意對之間的往返時間小于1毫秒。
4GB的記憶體中,大約1-1.5G被運作在叢集上的其他任務預訂。程式在周末下午執行,這時CPU、磁盤和網絡大多數空閑。
5.2 查找
這個grep程式從頭到尾掃描1010個100位元組的記錄,查找相當稀少的3個字元的模式(這個模式出現在92337個記錄中)。輸入資料被分割成大約64M的塊(M=15000,shijin:1010*100/(64*1024*1024)),整個輸出被放在一個檔案中(R=1)。
圖2 顯示了運算随時間的進展。Y軸表示輸入資料的掃描速度。這個速度随着更多的機器被配置設定到MapReduce計算中而增加,當1764台worker被配置設定,速度達到超過30GB/s的峰值。當Map任務結束時,速度開始降低并在計算到80秒時達到0。整個計算從開始到結束大約花了150秒。這包括大約一分鐘的啟動開銷。開銷起因于程式傳播到所有worker機器、與GFS互動打開1000個輸入檔案集合的延遲、擷取檔案優化所需資訊的時間。
5.3 排序
排序程式對1010個100位元組的記錄(大約1TB的資料)排序。這個程式模仿TeraSort基準測試程式[10]。
排序程式由不到50行代碼組成。一個三行Map函數從一個文本行中提取出10位元組的排序key值,并且将key值和原始文本行作為中間key/value對emit。我們使用了一個内置的恒等函數作為Reduce操作。這個函數将中間key/value對作為輸出輸出key/value對不加修改地傳送。最終已排序的輸出寫入到一組雙備份(2-way replicated)的GFS檔案(也就是說,作為程式輸出,2TB 的資料被寫入)。
像以前一樣,輸入資料被分割成64MB的片(M=15000)。我們把已排序的輸分到4000個檔案(R=4000)。分區函數使用key值的原始位元組将其分離到R個片段其中之一。
我們這個基準測試的分區函數具有key值分布的内置knowledge。在一個普通排序程式中,我們會增加一個預處理的MapReduce操作,用于key值的樣本并使用key值樣本的分布計算最終排序過程的分割點。
圖三(a)顯示了這個排序程式的正常執行過程。左上的圖顯示了輸入資料的讀取速度。速度達到大約13GB/s的峰值,并且自從所有map任務完成後,在200秒消逝前速度下降的相當快。值得注意的是,輸入速度小于grep。這是因為排序map任務花了大約一半的時間和I/O帶寬把中間輸出寫到本地硬碟。Grep的相應中間輸出大小可以忽略不計。
左邊中間的圖顯示了資料通過網絡從map任務發送到reduce任務的速度。這個重排從第一個ma任務完成就開始了。圖中的第一個峰值是第一批大約1700個reduce任務(整個MapReduce大約配置設定了1700台機器,每台機器一次至多執行1個reduce任務)。計算進行到大約300秒後,第一批reduce任務中的其中一些結束,我們開始為剩餘的reduce任務重排資料。所有的重排大約在計算進行到600秒時結束。
左下圖顯示已排序的資料由reduce任務寫入最終輸出檔案的速度。在第一個重排階段結束和寫階段開始之間有一個延遲,這是因為機器正忙于排序中間資料。寫操作以2-4GB/s的速度持續了一段時間。所有的寫操作在計算進行到850 秒時結束。計入啟動的開銷,整個運花費了891秒。這和目前已報道的最好結果類似,該記錄是TeraSort基準測試[18]的1057秒。
一些要注意的事情:輸入速度高于重排速度高于輸出速度是因為我們的本地化優化政策-大部分資料從本地硬碟讀取,繞考了我們的相關帶寬限制網絡。重排速度比輸出速度高是因為輸出過程寫了兩份已排序資料(我們基于可靠性和可用性的原因生成了輸出的兩個副本)。我們寫了兩個副本是因為這是底層檔案系統為可靠性和可用性提供的機制。如果底層檔案系統使用糾删碼編碼[14]而不是備份的方式,寫資料需要的網絡帶寬将會減少。
5.4 備用任務的影響
圖三(b)顯示了關閉備用任務的一個排序程式的執行情況。執行的流程和圖3(a)顯示的類似,除了在沒有任何寫活動出現的地方有一個很長的尾巴。960秒後,隻有5個reduce任務沒有完成。然而這些最後的少量落伍者300秒之後才結束。整個計算花費了1283秒,在消逝的時間中增加了44%(?)。
5.5 機器故障
在圖三(c)顯示這樣一個排序程式的執行,其中我們在計算進行幾分鐘後故意殺掉了1746個worker程序中的200個。底層叢集排程器立刻在這些機器上重新開機新的worker程序(因為隻是程序被殺死了,機器仍在正确運作)。
Worker程序死亡顯示為“負”的輸入速度,因為之前一些完成的map工作消失了(由于相應的map worker程序被殺掉了)并且需要重做。這個map工作的重新執行相當快。整個計算,包含啟動開銷,在933秒内完成(隻比正常執行時間增加了5%)。
6.經驗
我們在2003年1月寫了MapReduce庫的第一個版本,然後在2003年8月做了顯著增強,包括本地優化、任務執行在worker機器之間的動态負載均衡等等。從那以後,我們驚喜于MapReduce庫能如此廣泛地應用于于我們從事工作中的各種問題。它已經用于Google内部很廣的區域範圍,包括:
·大規模機器學習問題問題,
·Google News和Froogle産品的叢集問題
·用于生産受歡迎查詢(比如Google Zeitgeist)的報告的資料提取。
·為新的實驗或者産品提取網頁屬性(例如,為本地搜尋從大型網頁庫中抽取地理位置資訊)。
·大規模的圖形計算。
圖四顯示了随着時間推移,我們的基礎源碼管理系統中經驗證的獨立的MapReduce程式數量的顯著增加。從2003年早期的0個增長截止2004年9月底的幾乎900獨立的執行個體。MapReduce如此成功是因為,使用MapReduce庫能夠在半個小時過程内寫出一個能夠在上千台機器上高效運作的簡單程式,這極大地加快了開發和原形設計的周期。另外,它允許沒有分布式和/或并行系統經驗的程式員很容易地開發大量資源。
在每個作業結束的時候,MapReduce庫對作業使用的計算資源的相關統計資料進行記錄。在表1中,我們列出了2004年8月份運作在Google上的MapReduce任務的子集的統計資料。
6.1 大規模索引
目前為止我們對MapReduce最重要的使用之一就是重寫了産生Google網絡搜尋服務所使用的資料結構的生産索引系統。索引系統将通過爬蟲系統檢索的海量文檔作為輸入,這些文檔存儲為一組GFS檔案。這些文檔的原始内容的大小超過了20TB。索引程式作為一系列五到十個MapReduce操作運作。使用MapReduce(而不是索引系統的優先版本中自組分布式傳送)提供了一些好處:
·索引代碼更簡單、小巧、容易了解,因為處理容錯、分布式和并行的代碼隐藏在MapReduce庫内部。比如,當使用MapReduce表達時,一個計算過程的大小從大約3800行C++代碼減少到大約700行代碼。
·MapReduce庫的性能足夠好,是以我們可以把概念上不相關的計算分開,而不是混在一起以期避免資料傳送。這使修改索引程式相當簡單。比如,在舊的索引系統耗費幾個月的一次改變在新系統中隻需幾天實作。
·索引程式變得更易操作。因為由機器失效、慢速機器以及網絡臨時阻塞引起的大部分問題自動地由MapReduce庫解決,不需要操作人員幹預。另外,通過在索引叢集中增加機器可以容易地提高索引程式的性能。
7.相關工作
許多系統提供了嚴格的程式設計模型,并且使用這些限制來自動地并行化計算。例如,一個associative函數可以在logN的時間内在N個處理器上使用并行字首計算[6,9,13]來計算N個元素的數組的所有字首。MapReduce可以看作是基于我們在真實世界大型計算的經驗,對其中一些模型的簡化和淨化。更重要的是,我們提供了可擴充為上千台處理器的容錯實作。相比而言,大部并行處理系統隻是在小規模上實作,并且将處理機器故障的細節留給了程式員。
大型同步程式設計[17]和一些MPI原語[11]提供了更進階别的抽象,可以使程式員容易地編寫并行程式。MapReduce和這些系統的關鍵差別在于,MapReduce開發了一個受限的程式設計模型自動地并行化使用者程式,并提供透明的容錯處理。
我們本地優化的靈感來源于活躍硬碟[12,15],其中計算被推送到靠近本地磁盤的計算元素,繼而減少了通過網絡或IO子系統傳輸的資料量。我們運作在直接連接配接少量硬碟的日常機器上,而不是直接運作在在磁盤處理器上,但是大體的方法是類似的。
我們的備用任務機制類似于Charlotte System[3]應用的迫切排程機制。簡單迫切排程機制的其中一個缺點是如果一個給定任務反複失效,整個計算無法完成。我們通過跳過壞記錄的方式在我們機制中修正了這個問題的一些執行個體。
MapReduce的實作依賴于一個内部的叢集管理系統,該系統負責在一大群共享機器上分布和運作使用者任務。盡管不是本文的側重點,這個叢集管理系統在理念上和其它系統是一樣的,如Condor[16]。
MapReduce庫的一部分-排序附加功能和類似于對NOW-Sort[1]的操作。源機器(map workers)把待排序的資料分區後,發送到R個reduce worker中的一個。每個reduce worker在本地對資料進行排序(盡可能在記憶體中)。當然,NOW-Sort沒有使用者自定義的Map和Reduce函數,這倆函數使我們的庫具有廣泛的适用性。
River[2]提供了一個程式設計模型,其中程式通過分布式隊列發送資料進行互相通信。和MapReduce類似,River系統試圖即使在由異構硬體和系統擾動引入的不均勻面前也提供良好的平均情況性能。River通過精心排程硬碟和網絡傳輸來均衡完成時間以達到這個目的。通過限制程式設計模型,MapReduce架構能夠把問題分成大量精細任務。這些任務在可用的worker上動态排程,是以速度快的worker執行更多的任務。受限的程式設計模型也使我們可以排程接近作業末尾的任務備援執行,減少在不均勻面前的完成時間(比如有較慢或者受阻的worker)。
BAD-FS[5]有一個和MapReduce不同的程式設計模型,不像MapReduce那樣,它是針對廣域網上的作業執行。然而,有兩個基本相似點。(1)兩個系統都使用備援執行的方式從失效導緻的資料丢失中恢複。(2)兩個都使用本地化排程減少通過擁堵網絡連接配接發送的資料量。
TACC[7] 是一個設計用來高可用性網絡服務的構造的系統。與MapReduce類似,它也依賴備援執行作為實作容錯的機制。
8.總結
MapReduce程式設計模型在Google内部成功用于多個目的。我們把這種成功歸因為幾個方面:首先,由于它隐藏了并行、容錯、本地優化、負載均衡的細節,即便對于沒有并行和分布式系統經驗的程式員而言,模型也易于使用;其次,大量不同種類的問題都可以作為MapReduce計算簡單的表達。比如,MapReduce用于生成Google的網絡搜尋服務産品所需要的資料、用來排序的資料、用來資料挖掘的資料、用于機器學習的資料,以及許多其它系統;第三,我們開發了一個擴充到由數千台機器組成的大型叢集上的MapReduce實作。這個實作使得使用這些機器資源很有效,是以也适合用在Google内部遇到的許多大型計算問題中。
我們也從這項工作中學到了一些事。首先,限制這個程式設計模型使得并行和分布式計算非常容易,對計算進行容錯也非常容易;其次,網絡帶寬是稀缺資源。大量的系統優化是以是旨在減少通過網絡傳輸的資料量:本地優化允許我們從本地磁盤讀取資料,将中間資料的單獨一份拷貝寫入本地磁盤節約了網絡帶寬;第三,備援執行可以用來降低慢速機器的影響(alex注:即硬體配置的不平衡),并且可以用來處理機器失效和資料丢失。
附錄:詞頻程式
本節包含這樣一個程式:程式統計在一組由指令行指定的輸入檔案中每個唯一的詞出現的次數。
#include "mapreduce/mapreduce.h"
//使用者的map函數
Class WordCounter : public Mapper{
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for(int I = 0;I < n;) {
//跳過導緻空格的i
while((i < n) & & isspace(text[i]))
i++;
//找到詞尾
int start = i;
while((i < n) && !isspace(text[i]))
i++;
if(start < i)
Emit(text.substr(start,i-start),"1");
}
}
};
REGISTER_MAPPER(WordCounter);
//使用者的reduce函數
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
//周遊所有key值相同的條目并累加value值
int64 value = 0;
while(!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
//Emit sum for input->key()
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc,char** argv){
ParseCommandLineFlags(argc,argv);
MapReduceSpecification spec;
//将輸入檔案清單存入"spec"
for(int i = 1;I < argc;i++) {
MapReduceInput* input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}
//指定輸出檔案:
///gfs/test/freq-00000-of-00100
///gfs/test/freq-00001-of-00100
//...
MapReduceOutput* out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text");
out->set_reducer_class("Adder");
//可選:在map任務内部進行并行加和以節省網絡帶寬
out->set_combiner_class("Adder");
//調節參數:每個任務使用至多2000台機器和100M的記憶體
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
//運作
MapReduceResult result;
if(!MapReduce(spec,&result))
abort();
//Done:result結構包含計數器,花費的時間以及用到的機器數目等
Return 0;
}