你想數出一摞牌中有多少張黑桃。直覺方式是一張一張檢查并且數出有多少張是黑桃
mapreduce方法則是
給在座的所有玩家中配置設定這摞牌
讓每個玩家數自己手中的牌有幾張是黑桃然後把這個數目彙報給你
你把所有玩家告訴你的數字加起來得到最後的結論
mapreduce合并了兩種經典函數
映射mapping對集合裡的每個目标應用同一個操作。即如果你想把表單裡每個單元格乘以二那麼把這個函數單獨地應用在每個單元格上的操作就屬于mapping。
化簡reducing 周遊集合中的元素來傳回一個綜合的結果。即輸出表單裡一列數字的和這個任務屬于reducing。
重新審視我們原來那個分散紙牌的例子我們有mapreduce資料分析的基本方法。友情提示這不是個嚴謹的例子。在這個例子裡人代表計算機因為他們同時工作是以他們是個叢集。在大多數實際應用中我們假設資料已經在每台計算機上了 – 也就是說把牌分發出去并不是mapreduce的一步。事實上在計算機叢集中如何存儲檔案是hadoop的真正核心。
通過把牌分給多個玩家并且讓他們各自數數你就在并行執行運算因為每個玩家都在同時計數。這同時把這項工作變成了分布式的因為多個不同的人在解決同一個問題的過程中并不需要知道他們的鄰居在幹什麼。
通過告訴每個人去數數你對一項檢查每張牌的任務進行了映射。 你不會讓他們把黑桃牌遞給你而是讓他們把你想要的東西化簡為一個數字。
另外一個有意思的情況是牌配置設定得有多均勻。mapreduce假設資料是洗過的shuffled- 如果所有黑桃都分到了一個人手上那他數牌的過程可能比其他人要慢很多。
如果有足夠的人的話問一些更有趣的問題就相當簡單了 – 比如“一摞牌的平均值二十一點算法是什麼”。你可以通過合并“所有牌的值的和是什麼”及“我們有多少張牌”這兩個問題來得到答案。用這個和除以牌的張數就得到了平均值。
mapreduce算法的機制要遠比這複雜得多但是主體思想是一緻的 – 通過分散計算來分析大量資料。無論是facebook、nasa還是小創業公司mapreduce都是目前分析網際網路級别資料的主流方法。
大規模資料處理時mapreduce在三個層面上的基本構思
如何對付大資料處理分而治之
對互相間不具有計算依賴關系的大資料實作并行最自然的辦法就是采取分而治之的政策
上升到抽象模型mapper與reducer
mpi等并行計算方法缺少高層并行程式設計模型為了克服這一缺陷mapreduce借鑒了lisp函數式語言中的思想用map和reduce兩個函數提供了高層的并行程式設計抽象模型
上升到構架統一構架為程式員隐藏系統層細節
mpi等并行計算方法缺少統一的計算架構支援程式員需要考慮資料存儲、劃分、分發、結果收集、錯誤恢複等諸多細節為此mapreduce設計并提供了統一的計算架構為程式員隐藏了絕大多數系統層面的處理細節
什麼樣的計算任務可進行并行化計算
并行計算的第一個重要問題是如何劃分計算任務或者計算資料以便對劃分的子任務或資料塊同時進行計算。但一些計算問題恰恰無法進行這樣的劃分
nine women cannot have a baby in one month!
例如fibonacci函數: fk+2 = fk + fk+1
前後資料項之間存在很強的依賴關系隻能串行計算
結論不可分拆的計算任務或互相間有依賴關系的資料無法進行并行計算
大資料的并行化計算
一個大資料若可以分為具有同樣計算過程的資料塊并且這些資料塊之間不存在資料依賴關系則提高處理速度的最好辦法就是并行計算
例如假設有一個巨大的2維資料需要處理(比如求每個元素的開立方)其中對每個元素的處理是相同的,并且資料元素間不存在資料依賴關系,可以考慮不同的劃分方法将其劃分為子數組,由一組處理器并行處理
借鑒函數式設計語言lisp的設計思想
函數式程式設計(functional programming)語言lisp是一種清單處理 語言(list processing)是一種應用于人工智能處理的符号式語言由mit的人工智能專家、圖靈獎獲得者john mccarthy于1958年設計發明。
lisp定義了可對清單元素進行整體處理的各種操作如
如(add #(1 2 3 4) #(4 3 2 1)) 将産生結果 #(5 5 5 5)
lisp中也提供了類似于map和reduce的操作
如: (map ‘vector #+ #(1 2 3 4 5) #(10 11 12 13 14))
通過定義加法map運算将2個向量相加産生結果#(11 13 15 17 19)
(reduce #’+ #(11 13 15 17 19)) 通過加法歸并産生累加結果75
map: 對一組資料元素進行某種重複式的處理
reduce: 對map的中間結果進行某種進一步的結果整
關鍵思想為大資料處理過程中的兩個主要處理操作提供一種抽象機制
mapreduce中的map和reduce操作的抽象描述
mapreduce借鑒了函數式程式設計語言lisp中的思想定義了如下的map和reduce兩個抽象的程式設計接口由使用者去程式設計實作:
map: (k1; v1) → [(k2; v2)]
輸入鍵值對(k1; v1)表示的資料
處理文檔資料記錄(如文本檔案中的行或資料表格中的行)将以“鍵值對”形式傳入map函數map函數将處理這些鍵值對并以另一種鍵值對形式輸出處理的一組鍵值對中間結果 [(k2; v2)]
輸出鍵值對[(k2; v2)]表示的一組中間資料
reduce: (k2; [v2]) → [(k3; v3)]
輸入 由map輸出的一組鍵值對[(k2; v2)] 将被進行合并處理将同樣主鍵下的不同數值合并到一個清單[v2]中故reduce的輸入為(k2; [v2])
處理對傳入的中間結果清單資料進行某種整理或進一步的處理,并産生最終的某種形式的結果輸出[(k3; v3)] 。
輸出最終輸出結果[(k3; v3)]
map和reduce為程式員提供了一個清晰的操作接口抽象描述
各個map函數對所劃分的資料并行處理從不同的輸入資料産生不同的中間結果輸出
各個reduce也各自并行計算各自負責處理不同的中間結果資料集合進行reduce處理之前,必須等到所有的map函數做完是以,在進入reduce前需要有一個同步障(barrier);這個階段也負責對map的中間結果資料進行收集整理(aggregation & shuffle)處理,以便reduce更有效地計算最終結果最終彙總所有reduce的輸出結果即可獲得最終結果
基于mapreduce的處理過程示例—文檔詞頻統計wordcount
設有4組原始文本資料
text 1: the weather is good text 2: today is good
text 3: good weather is good text 4: today has good weather
傳統的串行處理方式(java)
java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
string[] text = new string[] { “hello world”, “hello every one”, “say hello to everyone in the world” ;
hashtable ht = new hashtable();
for(i = 0; i < 3; ++i) {
stringtokenizer st = new stringtokenizer(text[i]);
while (st.hasmoretokens()) {
string word = st.nexttoken();
if(!ht.containskey(word)) {
ht.put(word, new integer(1));
} else {
int wc = ((integer)ht.get(word)).intvalue() +1;// 計數加1
ht.put(word, new integer(wc));
}
}
}
for (iterator itr=ht.keyset().iterator(); itr.hasnext(); ) {
string word = (string)itr.next();
system.out.print(word+ “: ”+ (integer)ht.get(word)+“; ”);
輸出good: 5; has: 1; is: 3; the: 1; today: 2; weather: 3
mapreduce處理方式
使用4個map節點
map節點1:
輸入(text1, “the weather is good”)
輸出(the, 1), (weather, 1), (is, 1), (good, 1)
map節點2:
輸入(text2, “today is good”)
輸出(today, 1), (is, 1), (good, 1)
map節點3:
輸入(text3, “good weather is good”)
輸出(good, 1), (weather, 1), (is, 1), (good, 1)
map節點4:
輸入(text3, “today has good weather”)
輸出(today, 1), (has, 1), (good, 1), (weather, 1)
使用3個reduce節點
mapreduce僞代碼(實作map和reduce兩個函數)
c
class mapper method map(string input_key, string input_value):
// input_key: text document name
// input_value: document contents
for each word w in input_value:
emitintermediate(w, "1");
class reducer method reduce(string output_key, iterator intermediate_values):
// output_key: a word
// output_values: a list of counts
int result = 0;
for each v in intermediate_values:
result += parseint(v);
emit(output_key result);
如何提供統一的計算架構
mapreduce提供一個統一的計算架構可完成
計算任務的劃分和排程
資料的分布存儲和劃分
處理資料與計算任務的同步
結果資料的收集整理(sorting, combining, partitioning,…)
系統通信、負載平衡、計算性能優化處理
處理系統節點出錯檢測和失效恢複
mapreduce最大的亮點
通過抽象模型和計算架構把需要做什麼(what need to do)與具體怎麼做(how to do)分開了為程式員提供一個抽象和高層的程式設計接口和架構
程式員僅需要關心其應用層的具體計算問題僅需編寫少量的處理應用本身計算問題的程式代碼
如何具體完成這個并行計算任務所相關的諸多系統層細節被隐藏起來,交給計算架構去處理從分布代碼的執行到大到數千小到單個節點叢集的自動排程使用
mapreduce提供的主要功能
任務排程送出的一個計算作業(job)将被劃分為很多個計算任務(tasks), 任務排程功能主要負責為這些劃分後的計算任務配置設定和排程計算節點(map節點或reducer節點); 同時負責監控這些節點的執行狀态, 并負責map節點執行的同步控制(barrier); 也負責進行一些計算性能優化處理, 如對最慢的計算任務采用多備份執行、選最快完成者作為結果
資料/代碼互定位為了減少資料通信一個基本原則是本地化資料處理(locality)即一個計算節點盡可能處理其本地磁盤上所分布存儲的資料這實作了代碼向資料的遷移當無法進行這種本地化資料處理時再尋找其它可用節點并将資料從網絡上傳送給該節點(資料向代碼遷移)但将盡可能從資料所在的本地機架上尋找可用節點以減少通信延遲
出錯處理以低端商用伺服器構成的大規模mapreduce計算叢集中,節點硬體(主機、磁盤、記憶體等)出錯和軟體有bug是常态是以,mapreducer需要能檢測并隔離出錯節點并排程配置設定新的節點接管出錯節點的計算任務
分布式資料存儲與檔案管理海量資料處理需要一個良好的分布資料存儲和檔案管理系統支撐,該檔案系統能夠把海量資料分布存儲在各個節點的本地磁盤上,但保持整個資料在邏輯上成為一個完整的資料檔案為了提供資料存儲容錯機制,該檔案系統還要提供資料塊的多備份存儲管理能力
combiner和partitioner:為了減少資料通信開銷,中間結果資料進入reduce節點前需要進行合并(combine)處理,把具有同樣主鍵的資料合并到一起避免重複傳送; 一個reducer節點所處理的資料可能會來自多個map節點, 是以, map節點輸出的中間結果需使用一定的政策進行适當的劃分(partitioner)處理保證相關資料發送到同一個reducer節點
基于map和reduce的并行計算模型
1、向“外”橫向擴充而非向“上”縱向擴充scale “out”, not “up”
即mapreduce叢集的構築選用價格便宜、易于擴充的大量低端商用伺服器而非價格昂貴、不易擴充的高端伺服器smp低端伺服器市場與高容量desktop pc有重疊的市場是以由于互相間價格的競争、可互換的部件、和規模經濟效應使得低端伺服器保持較低的價格基于tpc-c在2007年底的性能評估結果,一個低端伺服器平台與高端的共享存儲器結構的伺服器平台相比,其成本效益大約要高4倍;如果把外存價格除外,低端伺服器成本效益大約提高12倍對于大規模資料處理由于有大量資料存儲需要顯而易見基于低端伺服器的叢集遠比基于高端伺服器的叢集優越這就是為什麼mapreduce并行計算叢集會基于低端伺服器實作
2、失效被認為是常态assume failures are common
mapreduce叢集中使用大量的低端伺服器(google目前在全球共使用百萬台以上的伺服器節點),是以節點硬體失效和軟體出錯是常态因而一個良好設計、具有容錯性的并行計算系統不能因為節點失效而影響計算服務的品質任何節點失效都不應當導緻結果的不一緻或不确定性任何一個節點失效時其它節點要能夠無縫接管失效節點的計算任務當失效節點恢複後應能自動無縫加入叢集而不需要管理者人工進行系統配置mapreduce并行計算軟體架構使用了多種有效的機制如節點自動重新開機技術使叢集和計算架構具有對付節點失效的健壯性能有效處理失效節點的檢測和恢複。
3、把處理向資料遷移moving processing to the data
傳統高性能計算系統通常有很多處理器節點與一些外存儲器節點相連如用區域存儲網絡(san,storage area network)連接配接的磁盤陣列是以大規模資料處理時外存檔案資料i/o通路會成為一個制約系統性能的瓶頸。為了減少大規模資料并行計算系統中的資料通信開銷代之以把資料傳送到處理節點(資料向處理器或代碼遷移)應當考慮将處理向資料靠攏和遷移。mapreduce采用了資料/代碼互定位的技術方法計算節點将首先将盡量負責計算其本地存儲的資料,以發揮資料本地化特點(locality),僅當節點無法處理本地資料時再采用就近原則尋找其它可用計算節點并把資料傳送到該可用計算節點。
4、順序處理資料、避免随機通路資料process data sequentially and avoid random access
大規模資料處理的特點決定了大量的資料記錄不可能存放在記憶體、而隻可能放在外存中進行處理。磁盤的順序通路和随即通路在性能上有巨大的差異
例100億(1010)個資料記錄(每記錄100b,共計1tb)的資料庫
更新1%的記錄(一定是随機通路)需要1個月時間而順序通路并重寫所有資料記錄僅需1天時間
mapreduce設計為面向大資料集批處理的并行計算系統所有計算都被組織成很長的流式操作以便能利用分布在叢集中大量節點上磁盤集合的高傳輸帶寬。
5、為應用開發者隐藏系統層細節hide system-level details from the application developer
軟體工程實踐指南中專業程式員認為之是以寫程式困難是因為程式員需要記住太多的程式設計細節(從變量名到複雜算法的邊界情況處理)這對大腦記憶是一個巨大的認知負擔,需要高度集中注意力而并行程式編寫有更多困難如需要考慮多線程中諸如同步等複雜繁瑣的細節由于并發執行中的不可預測性程式的調試查錯也十分困難大規模資料處理時程式員需要考慮諸如資料分布存儲管理、資料分發、資料通信和同步、計算結果收集等諸多細節問題mapreduce提供了一種抽象機制将程式員與系統層細節隔離開來程式員僅需描述需要計算什麼(what to compute), 而具體怎麼去做(how to compute)就交由系統的執行架構處理這樣程式員可從系統層細節中解放出來而緻力于其應用本身計算問題的算法設計
6、平滑無縫的可擴充性seamless scalability
主要包括兩層意義上的擴充性資料擴充和系統規模擴充。理想的軟體算法應當能随着資料規模的擴大而表現出持續的有效性性能上的下降程度應與資料規模擴大的倍數相當在叢集規模上要求算法的計算性能應能随着節點數的增加保持接近線性程度的增長絕大多數現有的單機算法都達不到以上理想的要求把中間結果資料維護在記憶體中的單機算法在大規模資料處理時很快失效從單機到基于大規模叢集的并行計算從根本上需要完全不同的算法設計奇妙的是mapreduce幾乎能實作以上理想的擴充性特征。 多項研究發現基于mapreduce的計算性能可随節點數目增長保持近似于線性的增長