原文位址:
<a href="http://labs.google.com/papers/mapreduce.html">http://labs.google.com/papers/mapreduce.html</a>
譯者: alex
mapreduce是一個程式設計模型,也是一個處理和生成超大資料集的算法模型的相關實作。使用者首先建立一個map函數處理一個基于key/value pair的資料集合,輸出中間的基于key/value pair的資料集合;然後再建立一個reduce函數用來合并所有的具有相同中間key值的中間value值。現實世界中有很多滿足上述處理模型的例子,本論文将較長的描述這個模型。
mapreduce架構的程式能夠在大量的普通配置的計算機上實作并行化處理。這個系統在運作時隻關心:如何分割輸入資料,在大量計算機組成的叢集上的排程,叢集中計算機的錯誤處理,管理叢集中計算機之間必要的通信。采用mapreduce架構可以使那些沒有并行計算和分布式處理系統開發經驗的程式員有效利用分布式系統的豐富資源。
我們的mapreduce實作運作在規模可以靈活調整的由普通機器組成的叢集上:一個典型的mapreduce計算往往由幾千台機器組成、處理以tb計算的資料。程式員發現這個系統非常好用:已經實作了數以百計的mapreduce程式,在google的叢集上,每天都有1000多個mapreduce程式在執行。
在過去的5年裡,包括本文作者在内的google的很多程式員,為了處理海量的原始資料,已經實作了數以百計的、專用的計算方法。這些計算方法用來處理大量的原始資料,比如,文檔抓取(類似網絡爬蟲的程式)、web請求日志等等;也為了計算處理各種類型的衍生資料,比如反向索引、web文檔的圖結構的各種表示形勢、每台主機上網絡爬蟲抓取的頁面數量的彙總、每天被請求的最多的查詢的集合等等。大多數這樣的資料處理運算在概念上很容易了解。然而由于輸入的資料量巨大,是以要想在可接受的時間内完成運算,隻有将這些計算分布在成百上千的主機上。如何處理并行計算、如何分發資料、如何處理錯誤?所有這些問題綜合在一起,需要大量的代碼處理,是以也使得原本簡單的運算變得難以處理。
為了解決上述複雜的問題,我們設計一個新的抽象模型,使用這個抽象模型,我們隻要表述我們想要執行的簡單運算即可,而不必關心并行計算、容錯、資料分布、負載均衡等複雜的細節,這些問題都被封裝在了一個庫裡面。設計這個抽象模型的靈感來自lisp和許多其他函數式語言的map和reduce的原語。我們意識到我們大多數的運算都包含這樣的操作:在輸入資料的“邏輯”記錄上應用map操作得出一個中間key/value pair集合,然後在所有具有相同key值的value值上應用reduce操作,進而達到合并中間的資料,得到一個想要的結果的目的。使用mapreduce模型,再結合使用者實作的map和reduce函數,我們就可以非常容易的實作大規模并行化計算;通過mapreduce模型自帶的“再次執行”(re-execution)功能,也提供了初級的容災實作方案。
這個工作(實作一個mapreduce架構模型)的主要貢獻是通過簡單的接口來實作自動的并行化和大規模的分布式計算,通過使用mapreduce模型接口實作在大量普通的pc機上高性能計算。
第二部分描述基本的程式設計模型和一些使用案例。第三部分描述了一個經過裁剪的、适合我們的基于叢集的計算環境的mapreduce實作。第四部分描述我們認為在mapreduce程式設計模型中一些實用的技巧。第五部分對于各種不同的任務,測量我們mapreduce實作的性能。第六部分揭示了在google内部如何使用mapreduce作為基礎重寫我們的索引系統産品,包括其它一些使用mapreduce的經驗。第七部分讨論相關的和未來的工作。
mapreduce程式設計模型的原理是:利用一個輸入key/value pair集合來産生一個輸出的key/value pair集合。mapreduce庫的使用者用兩個函數表達這個計算:map和reduce。
使用者自定義的map函數接受一個輸入的key/value pair值,然後産生一個中間key/value pair值的集合。mapreduce庫把所有具有相同中間key值i的中間value值集合在一起後傳遞給reduce函數。
使用者自定義的reduce函數接受一個中間key的值i和相關的一個value值的集合。reduce函數合并這些value值,形成一個較小的value值的集合。一般的,每次reduce函數調用隻産生0或1個輸出value值。通常我們通過一個疊代器把中間value值提供給reduce函數,這樣我們就可以處理無法全部放入記憶體中的大量的value值的集合。
例如,計算一個大的文檔集合中每個單詞出現的次數,下面是僞代碼段:
map函數輸出文檔中的每個詞、以及這個詞的出現次數(在這個簡單的例子裡就是1)。reduce函數把map函數産生的每一個特定的詞的計數累加起來。
另外,使用者編寫代碼,使用輸入和輸出檔案的名字、可選的調節參數來完成一個符合mapreduce模型規範的對象,然後調用mapreduce函數,并把這個規範對象傳遞給它。使用者的代碼和mapreduce庫連結在一起(用c++實作)。附錄a包含了這個執行個體的全部程式代碼。
盡管在前面例子的僞代碼中使用了以字元串表示的輸入輸出值,但是在概念上,使用者定義的map和reduce函數都有相關聯的類型:
比如,輸入的key和value值與輸出的key和value值在類型上推導的域不同。此外,中間key和value值與輸出key和value值在類型上推導的域相同。
(alex注:原文中這個domain的含義不是很清楚,我參考hadoop、kfs等實作,map和reduce都使用了泛型,是以,我把domain翻譯成類型推導的域)。
我們的c++中使用字元串類型作為使用者自定義函數的輸入輸出,使用者在自己的代碼中對字元串進行适當的類型轉換。
這裡還有一些有趣的簡單例子,可以很容易的使用mapreduce模型來表示:
分布式的grep:map函數輸出比對某個模式的一行,reduce函數是一個恒等函數,即把中間資料複制到輸出。
計算url通路頻率:map函數處理日志中web頁面請求的記錄,然後輸出(url,1)。reduce函數把相同url的value值都累加起來,産生(url,記錄總數)結果。
倒轉網絡連結圖:map函數在源頁面(source)中搜尋所有的連結目标(target)并輸出為(target,source)。reduce函數把給定連結目标(target)的連結組合成一個清單,輸出(target,list(source))。
每個主機的檢索詞向量:檢索詞向量用一個(詞,頻率)清單來概述出現在文檔或文檔集中的最重要的一些詞。map函數為每一個輸入文檔輸出(主機名,檢索詞向量),其中主機名來自文檔的url。reduce函數接收給定主機的所有文檔的檢索詞向量,并把這些檢索詞向量加在一起,丢棄掉低頻的檢索詞,輸出一個最終的(主機名,檢索詞向量)。
反向索引:map函數分析每個文檔輸出一個(詞,文檔号)的清單,reduce函數的輸入是一個給定詞的所有(詞,文檔号),排序所有的文檔号,輸出(詞,list(文檔号))。所有的輸出集合形成一個簡單的反向索引,它以一種簡單的算法跟蹤詞在文檔中的位置。
分布式排序:map函數從每個記錄提取key,輸出(key,record)。reduce函數不改變任何的值。這個運算依賴分區機制(在4.1描述)和排序屬性(在4.2描述)。
mapreduce模型可以有多種不同的實作方式。如何正确選擇取決于具體的環境。例如,一種實作方式适用于小型的共享記憶體方式的機器,另外一種實作方式則适用于大型numa架構的多處理器的主機,而有的實作方式更适合大型的網絡連接配接叢集。
本章節描述一個适用于google内部廣泛使用的運算環境的實作:用以太網交換機連接配接、由普通pc機組成的大型叢集。在我們的環境裡包括:
x86架構、運作linux作業系統、雙處理器、2-4gb記憶體的機器。
普通的網絡硬體裝置,每個機器的帶寬為百兆或者千兆,但是遠小于網絡的平均帶寬的一半。 (alex注:這裡需要網絡專家解釋一下了)
叢集中包含成百上千的機器,是以,機器故障是常态。
存儲為廉價的内置ide硬碟。一個内部分布式檔案系統用來管理存儲在這些磁盤上的資料。檔案系統通過資料複制來在不可靠的硬體上保證資料的可靠性和有效性。
使用者送出工作(job)給排程系統。每個工作(job)都包含一系列的任務(task),排程系統将這些任務排程到叢集中多台可用的機器上。
通過将map調用的輸入資料自動分割為m個資料片段的集合,map調用被分布到多台機器上執行。輸入的資料片段能夠在不同的機器上并行處理。使用分區函數将map調用産生的中間key值分成r個不同分區(例如,hash(key) mod r),reduce調用也被分布到多台機器上執行。分區數量(r)和分區函數由使用者來指定。
圖1展示了我們的mapreduce實作中操作的全部流程。當使用者調用mapreduce函數時,将發生下面的一系列動作(下面的序号和圖1中的序号一一對應):
使用者程式首先調用的mapreduce庫将輸入檔案分成m個資料片度,每個資料片段的大小一般從 16mb到64mb(可以通過可選的參數來控制每個資料片段的大小)。然後使用者程式在機群中建立大量的程式副本。 (alex:copies of the program還真難翻譯)
這些程式副本中的有一個特殊的程式–master。副本中其它的程式都是worker程式,由master配置設定任務。有m個map任務和r個reduce任務将被配置設定,master将一個map任務或reduce任務配置設定給一個空閑的worker。
被配置設定了map任務的worker程式讀取相關的輸入資料片段,從輸入的資料片段中解析出key/value pair,然後把key/value pair傳遞給使用者自定義的map函數,由map函數生成并輸出的中間key/value pair,并緩存在記憶體中。
緩存中的key/value pair通過分區函數分成r個區域,之後周期性的寫入到本地磁盤上。緩存的key/value pair在本地磁盤上的存儲位置将被回傳給master,由master負責把這些存儲位置再傳送給reduce worker。
當reduce worker程式接收到master程式發來的資料存儲位置資訊後,使用rpc從map worker所在主機的磁盤上讀取這些緩存資料。當reduce worker讀取了所有的中間資料後,通過對key進行排序後使得具有相同key值的資料聚合在一起。由于許多不同的key值會映射到相同的reduce任務上,是以必須進行排序。如果中間資料太大無法在記憶體中完成排序,那麼就要在外部進行排序。
reduce worker程式周遊排序後的中間資料,對于每一個唯一的中間key值,reduce worker程式将這個key值和它相關的中間value值的集合傳遞給使用者自定義的reduce函數。reduce函數的輸出被追加到所屬分區的輸出檔案。
當所有的map和reduce任務都完成之後,master喚醒使用者程式。在這個時候,在使用者程式裡的對mapreduce調用才傳回。
在成功完成任務之後,mapreduce的輸出存放在r個輸出檔案中(對應每個reduce任務産生一個輸出檔案,檔案名由使用者指定)。一般情況下,使用者不需要将這r個輸出檔案合并成一個檔案–他們經常把這些檔案作為另外一個mapreduce的輸入,或者在另外一個可以處理多個分割檔案的分布式應用中使用。
master持有一些資料結構,它存儲每一個map和reduce任務的狀态(空閑、工作中或完成),以及worker機器(非空閑任務的機器)的辨別。
master就像一個資料管道,中間檔案存儲區域的位置資訊通過這個管道從map傳遞到reduce。是以,對于每個已經完成的map任務,master存儲了map任務産生的r個中間檔案存儲區域的大小和位置。當map任務完成時,master接收到位置和大小的更新資訊,這些資訊被逐漸遞增的推送給那些正在工作的reduce任務。
因為mapreduce庫的設計初衷是使用由成百上千的機器組成的叢集來處理超大規模的資料,是以,這個庫必須要能很好的處理機器故障。
master周期性的ping每個worker。如果在一個約定的時間範圍内沒有收到worker傳回的資訊,master将把這個worker标記為失效。所有由這個失效的worker完成的map任務被重設為初始的空閑狀态,之後這些任務就可以被安排給其他的worker。同樣的,worker失效時正在運作的map或reduce任務也将被重新置為空閑狀态,等待重新排程。
當worker故障時,由于已經完成的map任務的輸出存儲在這台機器上,map任務的輸出已不可通路了,是以必須重新執行。而已經完成的reduce任務的輸出存儲在全局檔案系統上,是以不需要再次執行。
當一個map任務首先被worker a執行,之後由于worker a失效了又被排程到worker b執行,這個“重新執行”的動作會被通知給所有執行reduce任務的worker。任何還沒有從worker a讀取資料的reduce任務将從worker b讀取資料。
mapreduce可以處理大規模worker失效的情況。比如,在一個mapreduce操作執行期間,在正在運作的叢集上進行網絡維護引起80台機器在幾分鐘内不可通路了,mapreduce master隻需要簡單的再次執行那些不可通路的worker完成的工作,之後繼續執行未完成的任務,直到最終完成這個mapreduce操作。
一個簡單的解決辦法是讓master周期性的将上面描述的資料結構(alex注:指3.2節)的寫入磁盤,即檢查點(checkpoint)。如果這個master任務失效了,可以從最後一個檢查點(checkpoint)開始啟動另一個master程序。然而,由于隻有一個master程序,master失效後再恢複是比較麻煩的,是以我們現在的實作是如果master失效,就中止mapreduce運算。客戶可以檢查到這個狀态,并且可以根據需要重新執行mapreduce操作。
(alex注:原文為”semantics in the presence of failures”)
當使用者提供的map和reduce操作是輸入确定性函數(即相同的輸入産生相同的輸出)時,我們的分布式實作在任何情況下的輸出都和所有程式沒有出現任何錯誤、順序的執行産生的輸出是一樣的。
我們依賴對map和reduce任務的輸出是原子送出的來完成這個特性。每個工作中的任務把它的輸出寫到私有的臨時檔案中。每個reduce任務生成一個這樣的檔案,而每個map任務則生成r個這樣的檔案(一個reduce任務對應一個檔案)。當一個map任務完成的時,worker發送一個包含r個臨時檔案名的完成消息給master。如果master從一個已經完成的map任務再次接收到到一個完成消息,master将忽略這個消息;否則,master将這r個檔案的名字記錄在資料結構裡。
當reduce任務完成時,reduce worker程序以原子的方式把臨時檔案重命名為最終的輸出檔案。如果同一個reduce任務在多台機器上執行,針對同一個最終的輸出檔案将有多個重命名操作執行。我們依賴底層檔案系統提供的重命名操作的原子性來保證最終的檔案系統狀态僅僅包含一個reduce任務産生的資料。
使用mapreduce模型的程式員可以很容易的了解他們程式的行為,因為我們絕大多數的map和reduce操作是确定性的,而且存在這樣的一個事實:我們的失效處理機制等價于一個順序的執行的操作。當map或/和reduce操作是不确定性的時候,我們提供雖然較弱但是依然合理的處理機制。當使用非确定操作的時候,一個reduce任務r1的輸出等價于一個非确定性程式順序執行産生時的輸出。但是,另一個reduce任務r2的輸出也許符合一個不同的非确定順序程式執行産生的r2的輸出。
考慮map任務m和reduce任務r1、r2的情況。我們設定e(ri)是ri已經送出的執行過程(有且僅有一個這樣的執行過程)。當e(r1)讀取了由m一次執行産生的輸出,而e(r2)讀取了由m的另一次執行産生的輸出,導緻了較弱的失效處理。
在我們的計算運作環境中,網絡帶寬是一個相當匮乏的資源。我們通過盡量把輸入資料(由gfs管理)存儲在叢集中機器的本地磁盤上來節省網絡帶寬。gfs把每個檔案按64mb一個block分隔,每個block儲存在多台機器上,環境中就存放了多份拷貝(一般是3個拷貝)。mapreduce的master在排程map任務時會考慮輸入檔案的位置資訊,盡量将一個map任務排程在包含相關輸入資料拷貝的機器上執行;如果上述努力失敗了,master将嘗試在儲存有輸入資料拷貝的機器附近的機器上執行map任務(例如,配置設定到一個和包含輸入資料的機器在一個switch裡的worker機器上執行)。當在一個足夠大的cluster叢集上運作大型mapreduce操作的時候,大部分的輸入資料都能從本地機器讀取,是以消耗非常少的網絡帶寬。
如前所述,我們把map拆分成了m個片段、把reduce拆分成r個片段執行。理想情況下,m和r應當比叢集中worker的機器數量要多得多。在每台worker機器都執行大量的不同任務能夠提高叢集的動态的負載均衡能力,并且能夠加快故障恢複的速度:失效機器上執行的大量map任務都可以分布到所有其他的worker機器上去執行。
但是實際上,在我們的具體實作中對m和r的取值都有一定的客觀限制,因為master必須執行o(m+r)次排程,并且在記憶體中儲存o(m*r)個狀态(對影響記憶體使用的因素還是比較小的:o(m*r)塊狀态,大概每對map任務/reduce任務1個位元組就可以了)。
更進一步,r值通常是由使用者指定的,因為每個reduce任務最終都會生成一個獨立的輸出檔案。實際使用時我們也傾向于選擇合适的m值,以使得每一個獨立任務都是處理大約16m到64m的輸入資料(這樣,上面描寫的輸入資料本地存儲優化政策才最有效),另外,我們把r值設定為我們想使用的worker機器數量的小的倍數。我們通常會用這樣的比例來執行mapreduce:m=200000,r=5000,使用2000台worker機器。
影響一個mapreduce的總執行時間最通常的因素是“落伍者”:在運算過程中,如果有一台機器花了很長的時間才完成最後幾個map或reduce任務,導緻mapreduce操作總的執行時間超過預期。出現“落伍者”的原因非常多。比如:如果一個機器的硬碟出了問題,在讀取的時候要經常的進行讀取糾錯操作,導緻讀取資料的速度從30m/s降低到1m/s。如果cluster的排程系統在這台機器上又排程了其他的任務,由于cpu、記憶體、本地硬碟和網絡帶寬等競争因素的存在,導緻執行mapreduce代碼的執行效率更加緩慢。我們最近遇到的一個問題是由于機器的初始化代碼有bug,導緻關閉了的處理器的緩存:在這些機器上執行任務的性能和正常情況相差上百倍。
我們有一個通用的機制來減少“落伍者”出現的情況。當一個mapreduce操作接近完成的時候,master排程備用(backup)任務程序來執行剩下的、處于進行中狀态(in-progress)的任務。無論是最初的執行程序、還是備用(backup)任務程序完成了任務,我們都把這個任務标記成為已經完成。我們調優了這個機制,通常隻會占用比正常操作多幾個百分點的計算資源。我們發現采用這樣的機制對于減少超大mapreduce操作的總處理時間效果顯著。例如,在5.3節描述的排序任務,在關閉掉備用任務的情況下要多花44%的時間完成排序任務。
雖然簡單的map和reduce函數提供的基本功能已經能夠滿足大部分的計算需要,我們還是發掘出了一些有價值的擴充功能。本節将描述這些擴充功能。
mapreduce的使用者通常會指定reduce任務和reduce任務輸出檔案的數量(r)。我們在中間key上使用分區函數來對資料進行分區,之後再輸入到後續任務執行程序。一個預設的分區函數是使用hash方法(比如,hash(key) mod r)進行分區。hash方法能産生非常平衡的分區。然而,有的時候,其它的一些分區函數對key值進行的分區将非常有用。比如,輸出的key值是urls,我們希望每個主機的所有條目保持在同一個輸出檔案中。為了支援類似的情況,mapreduce庫的使用者需要提供專門的分區函數。例如,使用“hash(hostname(urlkey)) mod r”作為分區函數就可以把所有來自同一個主機的urls儲存在同一個輸出檔案中。
我們確定在給定的分區中,中間key/value pair資料的處理順序是按照key值增量順序處理的。這樣的順序保證對每個分成生成一個有序的輸出檔案,這對于需要對輸出檔案按key值随機存取的應用非常有意義,對在排序輸出的資料集也很有幫助。
在某些情況下,map函數産生的中間key值的重複資料會占很大的比重,并且,使用者自定義的reduce函數滿足結合律和交換律。在2.1節的詞數統計程式是個很好的例子。由于詞頻率傾向于一個zipf分布(齊夫分布),每個map任務将産生成千上萬個這樣的記錄the,1.所有的這些記錄将通過網絡被發送到一個單獨的reduce任務,然後由這個reduce任務把所有這些記錄累加起來産生一個數字。我們允許使用者指定一個可選的combiner函數,combiner函數首先在本地将這些記錄進行一次合并,然後将合并的結果再通過網絡發送出去。
combiner函數在每台執行map任務的機器上都會被執行一次。一般情況下,combiner和reduce函數是一樣的。combiner函數和reduce函數之間唯一的差別是mapreduce庫怎樣控制函數的輸出。reduce函數的輸出被儲存在最終的輸出檔案裡,而combiner函數的輸出被寫到中間檔案裡,然後被發送給reduce任務。
部分的合并中間結果可以顯著的提高一些mapreduce操作的速度。附錄a包含一個使用combiner函數的例子。
mapreduce庫支援幾種不同的格式的輸入資料。比如,文本模式的輸入資料的每一行被視為是一個key/value pair。key是檔案的偏移量,value是那一行的内容。另外一種常見的格式是以key進行排序來存儲的key/value pair的序列。每種輸入類型的實作都必須能夠把輸入資料分割成資料片段,該資料片段能夠由單獨的map任務來進行後續處理(例如,文本模式的範圍分割必須確定僅僅在每行的邊界進行範圍分割)。雖然大多數mapreduce的使用者僅僅使用很少的預定義輸入類型就滿足要求了,但是使用者依然可以通過提供一個簡單的reader接口實作就能夠支援一個新的輸入類型。
reader并非一定要從檔案中讀取資料,比如,我們可以很容易的實作一個從資料庫裡讀記錄的reader,或者從記憶體中的資料結構讀取資料的reader。
類似的,我們提供了一些預定義的輸出資料的類型,通過這些預定義類型能夠産生不同格式的資料。使用者采用類似添加新的輸入資料類型的方式增加新的輸出類型。
在某些情況下,mapreduce的使用者發現,如果在map和/或reduce操作過程中增加輔助的輸出檔案會比較省事。我們依靠程式writer把這種“副作用”變成原子的和幂等的(alex注:幂等的指一個總是産生相同結果的數學運算)。通常應用程式首先把輸出結果寫到一個臨時檔案中,在輸出全部資料之後,在使用系統級的原子操作rename重新命名這個臨時檔案。
如果一個任務産生了多個輸出檔案,我們沒有提供類似兩階段送出的原子操作支援這種情況。是以,對于會産生多個輸出檔案、并且對于跨檔案有一緻性要求的任務,都必須是确定性的任務。但是在實際應用過程中,這個限制還沒有給我們帶來過麻煩。
有時候,使用者程式中的bug導緻map或者reduce函數在處理某些記錄的時候crash掉,mapreduce操作無法順利完成。慣常的做法是修複bug後再次執行mapreduce操作,但是,有時候找出這些bug并修複它們不是一件容易的事情;這些bug也許是在第三方庫裡邊,而我們手頭沒有這些庫的源代碼。而且在很多時候,忽略一些有問題的記錄也是可以接受的,比如在一個巨大的資料集上進行統計分析的時候。我們提供了一種執行模式,在這種模式下,為了保證保證整個處理能繼續進行,mapreduce會檢測哪些記錄導緻确定性的crash,并且跳過這些記錄不處理。
每個worker程序都設定了信号處理函數捕獲記憶體段異常(segmentation violation)和總線錯誤(bus error)。在執行map或者reduce操作之前,mapreduce庫通過全局變量儲存記錄序号。如果使用者程式觸發了一個系統信号,消息處理函數将用“最後一口氣”通過udp包向master發送處理的最後一條記錄的序号。當master看到在處理某條特定記錄不止失敗一次時,master就标志着條記錄需要被跳過,并且在下次重新執行相關的map或者reduce任務的時候跳過這條記錄。
調試map和reduce函數的bug是非常困難的,因為實際執行操作時不但是分布在系統中執行的,而且通常是在好幾千台計算機上執行,具體的執行位置是由master進行動态排程的,這又大大增加了調試的難度。為了簡化調試、profile和小規模測試,我們開發了一套mapreduce庫的本地實作版本,通過使用本地版本的mapreduce庫,mapreduce操作在本地計算機上順序的執行。使用者可以控制mapreduce操作的執行,可以把操作限制到特定的map任務上。使用者通過設定特别的标志來在本地執行他們的程式,之後就可以很容易的使用本地調試和測試工具(比如gdb)。
master使用嵌入式的http伺服器(如jetty)顯示一組狀态資訊頁面,使用者可以監控各種執行狀态。狀态資訊頁面顯示了包括計算執行的進度,比如已經完成了多少任務、有多少任務正在處理、輸入的位元組數、中間資料的位元組數、輸出的位元組數、處理百分比等等。頁面還包含了指向每個任務的stderr和stdout檔案的連結。使用者根據這些資料預測計算需要執行大約多長時間、是否需要增加額外的計算資源。這些頁面也可以用來分析什麼時候計算執行的比預期的要慢。
另外,處于最頂層的狀态頁面顯示了哪些worker失效了,以及他們失效的時候正在運作的map和reduce任務。這些資訊對于調試使用者代碼中的bug很有幫助。
mapreduce庫使用計數器統計不同僚件發生次數。比如,使用者可能想統計已經處理了多少個單詞、已經索引的多少篇german文檔等等。
為了使用這個特性,使用者在程式中建立一個命名的計數器對象,在map和reduce函數中相應的增加計數器的值。例如:
這些計數器的值周期性的從各個單獨的worker機器上傳遞給master(附加在ping的應答包中傳遞)。master把執行成功的map和reduce任務的計數器值進行累計,當mapreduce操作完成之後,傳回給使用者代碼。
計數器目前的值也會顯示在master的狀态頁面上,這樣使用者就可以看到目前計算的進度。當累加計數器的值的時候,master要檢查重複運作的map或者reduce任務,避免重複累加(之前提到的備用任務和失效後重新執行任務這兩種情況會導緻相同的任務被多次執行)。
有些計數器的值是由mapreduce庫自動維持的,比如已經處理的輸入的key/value pair的數量、輸出的key/value pair的數量等等。
計數器機制對于mapreduce操作的完整性檢查非常有用。比如,在某些mapreduce操作中,使用者需要確定輸出的key value pair精确的等于輸入的key value pair,或者處理的german文檔數量在處理的整個文檔數量中屬于合理範圍。
本節我們用在一個大型叢集上運作的兩個計算來衡量mapreduce的性能。一個計算在大約1tb的資料中進行特定的模式比對,另一個計算對大約1tb的資料進行排序。
這兩個程式在大量的使用mapreduce的實際應用中是非常典型的 — 一類是對資料格式進行轉換,從一種表現形式轉換為另外一種表現形式;另一類是從海量資料中抽取少部分的使用者感興趣的資料。
所有這些程式都運作在一個大約由1800台機器構成的叢集上。每台機器配置2個2g主頻、支援超線程的intel xeon處理器,4gb的實體記憶體,兩個160gb的ide硬碟和一個千兆以太網卡。這些機器部署在一個兩層的樹形交換網絡中,在root節點大概有100-200gbps的傳輸帶寬。所有這些機器都采用相同的部署(對等部署),是以任意兩點之間的網絡來回時間小于1毫秒。
在4gb記憶體裡,大概有1-1.5g用于運作在叢集上的其他任務。測試程式在周末下午開始執行,這時主機的cpu、磁盤和網絡基本上處于空閑狀态。
這個分布式的grep程式需要掃描大概10的10次方個由100個位元組組成的記錄,查找出現機率較小的3個字元的模式(這個模式在92337個記錄中出現)。輸入資料被拆分成大約64m的block(m=15000),整個輸出資料存放在一個檔案中(r=1)。
圖2顯示了這個運算随時間的處理過程。其中y軸表示輸入資料的處理速度。處理速度随着參與mapreduce計算的機器數量的增加而增加,當1764台worker參與計算的時,處理速度達到了30gb/s。當map任務結束的時候,即在計算開始後80秒,輸入的處理速度降到0。整個計算過程從開始到結束一共花了大概150秒。這包括了大約一分鐘的初始啟動階段。初始啟動階段消耗的時間包括了是把這個程式傳送到各個worker機器上的時間、等待gfs檔案系統打開1000個輸入檔案集合的時間、擷取相關的檔案本地位置優化資訊的時間。
排序程式處理10的10次方個100個位元組組成的記錄(大概1tb的資料)。這個程式模仿terasort benchmark[10]。
排序程式由不到50行代碼組成。隻有三行的map函數從文本行中解析出10個位元組的key值作為排序的key,并且把這個key和原始文本行作為中間的key/value pair值輸出。我們使用了一個内置的恒等函數作為reduce操作函數。這個函數把中間的key/value pair值不作任何改變輸出。最終排序結果輸出到兩路複制的gfs檔案系統(也就是說,程式輸出2tb的資料)。
如前所述,輸入資料被分成64mb的block(m=15000)。我們把排序後的輸出結果分區後存儲到4000個檔案(r=4000)。分區函數使用key的原始位元組來把資料分區到r個片段中。
在這個benchmark測試中,我們使用的分區函數知道key的分區情況。通常對于排序程式來說,我們會增加一個預處理的mapreduce操作用于采樣key值的分布情況,通過采樣的資料來計算對最終排序處理的分區點。
圖三(a)顯示了這個排序程式的正常執行過程。左上的圖顯示了輸入資料讀取的速度。資料讀取速度峰值會達到13gb/s,并且所有map任務完成之後,即大約200秒之後迅速滑落到0。值得注意的是,排序程式輸入資料讀取速度小于分布式grep程式。這是因為排序程式的map任務花了大約一半的處理時間和i/o帶寬把中間輸出結果寫到本地硬碟。相應的分布式grep程式的中間結果輸出幾乎可以忽略不計。
左邊中間的圖顯示了中間資料從map任務發送到reduce任務的網絡速度。這個過程從第一個map任務完成之後就開始緩慢啟動了。圖示的第一個高峰是啟動了第一批大概1700個reduce任務(整個mapreduce分布到大概1700台機器上,每台機器1次最多執行1個reduce任務)。排序程式運作大約300秒後,第一批啟動的reduce任務有些完成了,我們開始執行剩下的reduce任務。所有的處理在大約600秒後結束。
左下圖表示reduce任務把排序後的資料寫到最終的輸出檔案的速度。在第一個排序階段結束和資料開始寫入磁盤之間有一個小的延時,這是因為worker機器正在忙于排序中間資料。磁盤寫入速度在2-4gb/s持續一段時間。輸出資料寫入磁盤大約持續850秒。計入初始啟動部分的時間,整個運算消耗了891秒。這個速度和terasort benchmark[18]的最高紀錄1057秒相差不多。
還有一些值得注意的現象:輸入資料的讀取速度比排序速度和輸出資料寫入磁盤速度要高不少,這是因為我們的輸入資料本地化優化政策起了作用 — 絕大部分資料都是從本地硬碟讀取的,進而節省了網絡帶寬。排序速度比輸出資料寫入到磁盤的速度快,這是因為輸出資料寫了兩份(我們使用了2路的gfs檔案系統,寫入複制節點的原因是為了保證資料可靠性和可用性)。我們把輸出資料寫入到兩個複制節點的原因是因為這是底層檔案系統的保證資料可靠性和可用性的實作機制。如果底層檔案系統使用類似容錯編碼[14](erasure coding)的方式而不是複制的方式保證資料的可靠性和可用性,那麼在輸出資料寫入磁盤的時候,就可以降低網絡帶寬的使用。
圖三(b)顯示了關閉了備用任務後排序程式執行情況。執行的過程和圖3(a)很相似,除了輸出資料寫磁盤的動作在時間上拖了一個很長的尾巴,而且在這段時間裡,幾乎沒有什麼寫入動作。在960秒後,隻有5個reduce任務沒有完成。這些拖後腿的任務又執行了300秒才完成。整個計算消耗了1283秒,多了44%的執行時間。
在圖三(c)中示範的排序程式執行的過程中,我們在程式開始後幾分鐘有意的kill了1746個worker中的200個。叢集底層的排程立刻在這些機器上重新開始新的worker處理程序(因為隻是worker機器上的處理程序被kill了,機器本身還在工作)。
圖三(c)顯示出了一個“負”的輸入資料讀取速度,這是因為一些已經完成的map任務丢失了(由于相應的執行map任務的worker程序被kill了),需要重新執行這些任務。相關map任務很快就被重新執行了。整個運算在933秒内完成,包括了初始啟動時間(隻比正常執行多消耗了5%的時間)。
我們在2003年1月完成了第一個版本的mapreduce庫,在2003年8月的版本有了顯著的增強,這包括了輸入資料本地優化、worker機器之間的動态負載均衡等等。從那以後,我們驚喜的發現,mapreduce庫能廣泛應用于我們日常工作中遇到的各類問題。它現在在google内部各個領域得到廣泛應用,包括:
大規模機器學習問題
google news和froogle産品的叢集問題
從公衆查詢産品(比如google的zeitgeist)的報告中抽取資料。
從大量的新應用和新産品的網頁中提取有用資訊(比如,從大量的位置搜尋網頁中抽取地理位置資訊)。
大規模的圖形計算。
圖四顯示了在我們的源代碼管理系統中,随着時間推移,獨立的mapreduce程式數量的顯著增加。從2003年早些時候的0個增長到2004年9月份的差不多900個不同的程式。mapreduce的成功取決于采用mapreduce庫能夠在不到半個小時時間内寫出一個簡單的程式,這個簡單的程式能夠在上千台機器的組成的叢集上做大規模并發處理,這極大的加快了開發和原形設計的周期。另外,采用mapreduce庫,可以讓完全沒有分布式和/或并行系統開發經驗的程式員很容易的利用大量的資源,開發出分布式和/或并行處理的應用。
在每個任務結束的時候,mapreduce庫統計計算資源的使用狀況。在表1,我們列出了2004年8月份mapreduce運作的任務所占用的相關資源。
到目前為止,mapreduce最成功的應用就是重寫了google網絡搜尋服務所使用到的index系統。索引系統的輸入資料是網絡爬蟲抓取回來的海量的文檔,這些文檔資料都儲存在gfs檔案系統裡。這些文檔原始内容(alex注:raw contents,我認為就是網頁中的剔除html标記後的内容、pdf和word等有格式文檔中提取的文本内容等)的大小超過了20tb。索引程式是通過一系列的mapreduce操作(大約5到10次)來建立索引。使用mapreduce(替換上一個特别設計的、分布式處理的索引程式)帶來這些好處:
實作索引部分的代碼簡單、小巧、容易了解,因為對于容錯、分布式以及并行計算的處理都是mapreduce庫提供的。比如,使用mapreduce庫,計算的代碼行數從原來的3800行c++代碼減少到大概700行代碼。
mapreduce庫的性能已經足夠好了,是以我們可以把在概念上不相關的計算步驟分開處理,而不是混在一起以期減少資料傳遞的額外消耗。概念上不相關的計算步驟的隔離也使得我們可以很容易改變索引處理方式。比如,對之前的索引系統的一個小更改可能要耗費好幾個月的時間,但是在使用mapreduce的新系統上,這樣的更改隻需要花幾天時間就可以了。
索引系統的操作管理更容易了。因為由機器失效、機器處理速度緩慢、以及網絡的瞬間阻塞等引起的絕大部分問題都已經由mapreduce庫解決了,不再需要操作人員的介入了。另外,我們可以通過在索引系統叢集中增加機器的簡單方法提高整體處理性能。
很多系統都提供了嚴格的程式設計模式,并且通過對程式設計的嚴格限制來實作并行計算。例如,一個結合函數可以通過把n個元素的數組的字首在n個處理器上使用并行字首算法,在log n的時間内計算完[6,9,13](alex注:完全沒有明白作者在說啥,具體參考相關6、9、13文檔)。mapreduce可以看作是我們結合在真實環境下處理海量資料的經驗,對這些經典模型進行簡化和萃取的成果。更加值得驕傲的是,我們還實作了基于上千台處理器的叢集的容錯處理。相比而言,大部分并發處理系統都隻在小規模的叢集上實作,并且把容錯處理交給了程式員。
bulk synchronous programming[17]和一些mpi原語[11]提供了更進階别的并行處理抽象,可以更容易寫出并行處理的程式。mapreduce和這些系統的關鍵不同之處在于,mapreduce利用限制性程式設計模式實作了使用者程式的自動并發處理,并且提供了透明的容錯處理。
我們資料本地優化政策的靈感來源于active disks[12,15]等技術,在active disks中,計算任務是盡量推送到資料存儲的節點處理(alex注:即靠近資料源處理),這樣就減少了網絡和io子系統的吞吐量。我們在挂載幾個硬碟的普通機器上執行我們的運算,而不是在磁盤處理器上執行我們的工作,但是達到的目的一樣的。
我們的備用任務機制和charlotte system[3]提出的eager排程機制比較類似。eager排程機制的一個缺點是如果一個任務反複失效,那麼整個計算就不能完成。我們通過忽略引起故障的記錄的方式在某種程度上解決了這個問題。
mapreduce的實作依賴于一個内部的叢集管理系統,這個叢集管理系統負責在一個超大的、共享機器的叢集上分布和運作使用者任務。雖然這個不是本論文的重點,但是有必要提一下,這個叢集管理系統在理念上和其它系統,如condor[16]是一樣。
mapreduce庫的排序機制和now-sort[1]的操作上很類似。讀取輸入源的機器(map workers)把待排序的資料進行分區後,發送到r個reduce worker中的一個進行處理。每個reduce worker在本地對資料進行排序(盡可能在記憶體中排序)。當然,now-sort沒有給使用者自定義的map和reduce函數的機會,是以不具備mapreduce庫廣泛的實用性。
river[2]提供了一個程式設計模型:處理程序通過分布式隊列傳送資料的方式進行互相通訊。和mapreduce類似,river系統嘗試在不對等的硬體環境下,或者在系統颠簸的情況下也能提供近似平均的性能。river是通過精心排程硬碟和網絡的通訊來平衡任務的完成時間。mapreduce庫采用了其它的方法。通過對程式設計模型進行限制,mapreduce架構把問題分解成為大量的“小”任務。這些任務在可用的worker叢集上動态的排程,這樣快速的worker就可以執行更多的任務。通過對程式設計模型進行限制,我們可用在工作接近完成的時候排程備用任務,縮短在硬體配置不均衡的情況下縮小整個操作完成的時間(比如有的機器性能差、或者機器被某些操作阻塞了)。
bad-fs[5]采用了和mapreduce完全不同的程式設計模式,它是面向廣域網(alex注:wide-area network)的。不過,這兩個系統有兩個基礎功能很類似。(1)兩個系統采用重新執行的方式來防止由于失效導緻的資料丢失。(2)兩個都使用資料本地化排程政策,減少網絡通訊的資料量。
tacc[7]是一個用于簡化構造高可用性網絡服務的系統。和mapreduce一樣,它也依靠重新執行機制來實作的容錯處理。
mapreduce程式設計模型在google内部成功應用于多個領域。我們把這種成功歸結為幾個方面:首先,由于mapreduce封裝了并行處理、容錯處理、資料本地化優化、負載均衡等等技術難點的細節,這使得mapreduce庫易于使用。即便對于完全沒有并行或者分布式系統開發經驗的程式員而言;其次,大量不同類型的問題都可以通過mapreduce簡單的解決。比如,mapreduce用于生成google的網絡搜尋服務所需要的資料、用來排序、用來資料挖掘、用于機器學習,以及很多其它的系統;第三,我們實作了一個在數千台計算機組成的大型叢集上靈活部署運作的mapreduce。這個實作使得有效利用這些豐富的計算資源變得非常簡單,是以也适合用來解決google遇到的其他很多需要大量計算的問題。
我們也從mapreduce開發過程中學到了不少東西。首先,限制程式設計模式使得并行和分布式計算非常容易,也易于構造容錯的計算環境;其次,網絡帶寬是稀有資源。大量的系統優化是針對減少網絡傳輸量為目的的:本地優化政策使大量的資料從本地磁盤讀取,中間檔案寫入本地磁盤、并且隻寫一份中間檔案也節約了網絡帶寬;第三,多次執行相同的任務可以減少性能緩慢的機器帶來的負面影響(alex注:即硬體配置的不平衡),同時解決了由于機器失效導緻的資料丢失問題。
(alex注:還是原汁原味的感謝詞比較好,這個就不翻譯了)josh levenberg has been instrumental in revising and extending the user-level mapreduce api with a number of new features based on his experience with using mapreduce and other people’s suggestions for enhancements. mapreduce reads its input from and writes its output to the google file system [8]. we would like to thank mohit aron, howard gobioff, markus gutschke, david kramer, shun-tak leung, and josh redstone for their work in developing gfs. we would also like to thank percy liang and olcan sercinoglu for their work in developing the cluster management system used by mapreduce. mike burrows, wilson hsieh, josh levenberg, sharon perl, rob pike, and debby wallach provided helpful comments on earlier drafts of this paper.the anonymous osdi reviewers, and our shepherd, eric brewer, provided many useful suggestions of areas where the paper could be improved. finally, we thank all the users of mapreduce within google’s engineering organization for providing helpful feedback, suggestions, and bug reports.
10、參考資料
[1] andrea c. arpaci-dusseau, remzi h. arpaci-dusseau,david e. culler, joseph m. hellerstein, and david a. patterson.high-performance sorting on networks of workstations.in proceedings of the 1997 acm sigmod internationalconference on management of data, tucson,arizona, may 1997.
[2] remzi h. arpaci-dusseau, eric anderson, noahtreuhaft, david e. culler, joseph m. hellerstein, david patterson, and kathy yelick. cluster i/o with river:making the fast case common. in proceedings of the sixth workshop on input/output in parallel and distributed systems (iopads ’99), pages 10.22, atlanta, georgia, may 1999.
[3] arash baratloo, mehmet karaul, zvi kedem, and peter wyckoff. charlotte: metacomputing on the web. in proceedings of the 9th international conference on parallel and distributed computing systems, 1996. [4] luiz a. barroso, jeffrey dean, and urs h¨olzle. web search for a planet: the google cluster architecture. ieee micro, 23(2):22.28, april 2003.
[5] john bent, douglas thain, andrea c.arpaci-dusseau, remzi h. arpaci-dusseau, and miron livny. explicit control in a batch-aware distributed file system. in proceedings of the 1st usenix symposium on networked systems design and implementation nsdi, march 2004.
[6] guy e. blelloch. scans as primitive parallel operations.ieee transactions on computers, c-38(11), november 1989.
[7] armando fox, steven d. gribble, yatin chawathe, eric a. brewer, and paul gauthier. cluster-based scalable network services. in proceedings of the 16th acm symposium on operating system principles, pages 78. 91, saint-malo, france, 1997.
[8] sanjay ghemawat, howard gobioff, and shun-tak leung. the google file system. in 19th symposium on operating systems principles, pages 29.43, lake george, new york, 2003. to appear in osdi 2004 12
[9] s. gorlatch. systematic efficient parallelization of scan and other list homomorphisms. in l. bouge, p. fraigniaud, a. mignotte, and y. robert, editors, euro-par’96. parallel processing, lecture notes in computer science 1124, pages 401.408. springer-verlag, 1996.
[11] william gropp, ewing lusk, and anthony skjellum. using mpi: portable parallel programming with the message-passing interface. mit press, cambridge, ma, 1999.
[12] l. huston, r. sukthankar, r.wickremesinghe, m. satyanarayanan, g. r. ganger, e. riedel, and a. ailamaki. diamond: a storage architecture for early discard in interactive search. in proceedings of the 2004 usenix file and storage technologies fast conference, april 2004.
[13] richard e. ladner and michael j. fischer. parallel prefix computation. journal of the acm, 27(4):831.838, 1980.
[14] michael o. rabin. efficient dispersal of information for security, load balancing and fault tolerance. journal of the acm, 36(2):335.348, 1989.
[15] erik riedel, christos faloutsos, garth a. gibson, and david nagle. active disks for large-scale data processing. ieee computer, pages 68.74, june 2001.
[16] douglas thain, todd tannenbaum, and miron livny. distributed computing in practice: the condor experience. concurrency and computation: practice and experience, 2004.
[17] l. g. valiant. a bridging model for parallel computation. communications of the acm, 33(8):103.111, 1997.
本節包含了一個完整的程式,用于統計在一組指令行指定的輸入檔案中,每一個不同的單詞出現頻率。