Map函數使用一個key和一個value作為參數。我們這裡說的函數是由普通程式設計語言編寫,例如C++,Java等,是以這裡的函數任何人都可以寫出來。入參中,key是輸入檔案的名字,通常會被忽略,因為我們不太關心檔案名是什麼,value是輸入檔案的内容。是以,對于一個單詞計數器來說,value包含了要統計的文本,我們會将這個文本拆分成單詞。之後對于每一個單詞,我們都會調用emit。emit由MapReduce架構提供,并且這裡的emit屬于Map函數。emit會接收兩個參數,其中一個是key,另一個是value。在單詞計數器的例子中,emit入參的key是單詞,value是字元串“1”。這就是一個Map函數。在一個單詞計數器的MapReduce Job中,Map函數實際就可以這麼簡單。而這個Map函數不需要知道任何分布式相關的資訊,不需要知道有多台計算機,不需要知道實際會通過網絡來移動資料。這裡非常直覺。
Reduce函數的入參是某個特定key的所有執行個體(Map輸出中的key-value對中,出現了一次特定的key就可以算作一個執行個體)。是以Reduce函數也是使用一個key和一個value作為參數,其中value是一個數組,裡面每一個元素是Map函數輸出的key的一個執行個體的value。對于單詞計數器來說,key就是單詞,value就是由字元串“1”組成的數組,是以,我們不需要關心value的内容是什麼,我們隻需要關心value數組的長度。Reduce函數也有一個屬于自己的emit函數。這裡的emit函數隻會接受一個參數value,這個value會作為Reduce函數入參的key的最終輸出。是以,對于單詞計數器,我們會給emit傳入數組的長度。這就是一個最簡單的Reduce函數。并且Reduce也不需要知道任何有關容錯或者其他有關分布式相關的資訊。
對于MapReduce的基本架構有什麼問題嗎?
學生提問:可以将Reduce函數的輸出再傳遞給Map函數嗎?
Robert教授:在現實中,這是很常見的。MapReduce使用者定義了一個MapReduce Job,接收一些輸入,生成一些輸出。之後可能會有第二個MapReduce Job來消費前一個Job的輸出。對于一些非常複雜的多階段分析或者疊代算法,比如說Google用來評價網頁的重要性和影響力的PageRank算法,這些算法是逐漸向答案收斂的。我認為Google最初就是這麼使用MapReduce的,他們運作MapReduce Job多次,每一次的輸出都是一個網頁的清單,其中包含了網頁的價值,權重或者重要性。是以将MapReduce的輸出作為另一個MapReduce Job的輸入這很正常。
學生提問:如果可以将Reduce的輸出作為Map的輸入,在生成Reduce函數的輸出時需要有什麼注意嗎?
Robert教授:是的,你需要設定一些内容。比如你需要這麼寫Reduce函數,使其在某種程度上知道應該按照下一個MapReduce Job需要的格式生成資料。這裡實際上帶出了一些MapReduce架構的缺點。如果你的算法可以很簡單的由Map函數、Map函數的中間輸出以及Reduce函數來表達,那是極好的。MapReduce對于能夠套用這種形式的算法是極好的。并且,Map函數必須是完全獨立的,它們是一些隻關心入參的函數。這裡就有一些限制了。事實上,很多人想要的更長的運算流程,這涉及到不同的處理。使用MapReduce的話,你不得不将多個MapReduce Job拼裝在一起。而在本課程後面會介紹的一些更進階的系統中,會讓你指定完整的計算流程,然後這些系統會做優化。這些系統會發現所有你想完成的工作,然後有效的組織更複雜的計算。
學生提問:MapReduce架構更重要還是Map/Reduce函數更重要?
Robert教授:從程式員的角度來看,隻需要關心Map函數和Reduce函數。從我們的角度來看,我們需要關心的是worker程序和worker伺服器。這些是MapReduce架構的一部分,它們與其它很多元件一起調用了Map函數和Reduce函數。是以是的,從我們的角度來看,我們更關心架構是如何組成的。從程式員的角度來看,所有的分布式的内容都被剝離了。
學生提問:當你調用emit時,資料會發生什麼變化?emit函數在哪運作?
Robert教授:首先看,這些函數在哪運作。這裡可以看MapReduce論文的圖1。現實中,MapReduce運作在大量的伺服器之上,我們稱之為worker伺服器或者worker。同時,也會有一個Master節點來組織整個計算過程。這裡實際發生的是,Master伺服器知道有多少輸入檔案,例如5000個輸入檔案,之後它将Map函數分發到不同的worker。是以,它會向worker伺服器發送一條消息說,請對這個輸入檔案執行Map函數吧。之後,MapReduce架構中的worker程序會讀取檔案的内容,調用Map函數并将檔案名和檔案内容作為參數傳給Map函數。worker程序還需要實作emit,這樣,每次Map函數調用emit,worker程序就會将資料寫入到本地磁盤的檔案中。是以,Map函數中調用emit的效果是在worker的本地磁盤上建立檔案,這些檔案包含了目前worker的Map函數生成的所有的key和value。
是以,Map階段結束時,我們看到的就是Map函數在worker上生成的一些檔案。之後,MapReduce的worker會将這些資料移動到Reduce所需要的位置。對于一個典型的大型運算,Reduce的入參包含了所有Map函數對于特定key的輸出。通常來說,每個Map函數都可能生成大量key。是以通常來說,在運作Reduce函數之前。運作在MapReduce的worker伺服器上的程序需要與叢集中每一個其他伺服器互動來詢問說,看,我需要對key=a運作Reduce,請看一下你本地磁盤中存儲的Map函數的中間輸出,找出所有key=a,并通過網絡将它們發給我。是以,Reduce worker需要從每一個worker擷取特定key的執行個體。這是通過由Master通知到Reduce worker的一條指令來觸發。一旦worker收集完所有的資料,它會調用Reduce函數,Reduce函數運算完了會調用自己的emit,這個emit與Map函數中的emit不一樣,它會将輸出寫入到一個Google使用的共享檔案服務中。
有關輸入和輸出檔案的存放位置,這是我之前沒有提到的,它們都存放在檔案中,但是因為我們想要靈活的在任意的worker上讀取任意的資料,這意味着我們需要某種網絡檔案系統(network file system)來存放輸入資料。是以實際上,MapReduce論文談到了GFS(Google File System)。GFS是一個共享檔案服務,并且它也運作在MapReduce的worker叢集的實體伺服器上。GFS會自動拆分你存儲的任何大檔案,并且以64MB的塊存儲在多個伺服器之上。是以,如果你有了10TB的網頁資料,你隻需要将它們寫入到GFS,甚至你寫入的時候是作為一個大檔案寫入的,GFS會自動将這個大檔案拆分成64MB的塊,并将這些塊平均的分布在所有的GFS伺服器之上,而這是極好的,這正是我們所需要的。如果我們接下來想要對剛剛那10TB的網頁資料運作MapReduce Job,資料已經均勻的分割存儲在所有的伺服器上了。如果我們有1000台伺服器,我們會啟動1000個Map worker,每個Map worker會讀取1/1000輸入資料。這些Map worker可以并行的從1000個GFS檔案伺服器讀取資料,并擷取巨大的讀取吞吐量,也就是1000台伺服器能提供的吞吐量。
學生提問:這裡的箭頭代表什麼意思?
Robert教授:随着Google這些年對MapReduce系統的改進,答案也略有不同。通常情況下,如果我們在一個例如GFS的檔案系統中存儲大的檔案,你的資料分散在大量伺服器之上,你需要通過網絡與這些伺服器通信以擷取你的資料。在這種情況下,這個箭頭表示MapReduce的worker需要通過網絡與存儲了輸入檔案的GFS伺服器通信,并通過網絡将資料讀取到MapReduce的worker節點,進而将資料傳遞給Map函數。這是最常見的情況。并且這是MapReduce論文中介紹的工作方式。但是如果你這麼做了,這裡就有很多網絡通信。如果資料總共是10TB,那麼相應的就需要在資料中心網絡上移動10TB的資料。而資料中心網絡通常是GB級别的帶寬,是以移動10TB的資料需要大量的時間。在論文發表的2004年,MapReduce系統最大的限制瓶頸是網絡吞吐。如果你讀到了論文的評估部分,你會發現,當時運作在一個有數千台機器的網絡上,每台計算機都接入到一個機架,機架上有以太網交換機,機架之間通過root交換機連接配接(最上面那個交換機)。
如果随機的選擇MapReduce的worker伺服器和GFS伺服器,那麼至少有一半的機會,它們之間的通信需要經過root交換機,而這個root交換機的吞吐量總是固定的。如果做一個除法,root交換機的總吞吐除以2000,那麼每台機器隻能分到50Mb/S的網絡容量。這個網絡容量相比磁盤或者CPU的速度來說,要小得多。是以,50Mb/S是一個巨大的限制。
在MapReduce論文中,讨論了大量的避免使用網絡的技巧。其中一個是将GFS和MapReduce混合運作在一組伺服器上。是以如果有1000台伺服器,那麼GFS和MapReduce都運作在那1000台伺服器之上。當MapReduce的Master節點拆分Map任務并分包到不同的worker伺服器上時,Master節點會找出輸入檔案具體存在哪台GFS伺服器上,并把對應于那個輸入檔案的Map Task排程到同一台伺服器上。是以,預設情況下,這裡的箭頭是指讀取本地檔案,而不會涉及網絡。雖然由于故障,負載或者其他原因,不能總是讓Map函數都讀取本地檔案,但是幾乎所有的Map函數都會運作在存儲了資料的相同機器上,并是以節省了大量的時間,否則通過網絡來讀取輸入資料将會耗費大量的時間。
我之前提過,Map函數會将輸出存儲到機器的本地磁盤,是以存儲Map函數的輸出不需要網絡通信,至少不需要實時的網絡通信。但是,我們可以确定的是,為了收集所有特定key的輸出,并将它們傳遞給某個機器的Reduce函數,還是需要網絡通信。假設現在我們想要讀取所有的相關資料,并通過網絡将這些資料傳遞給單台機器,資料最開始在運作Map Task的機器上按照行存儲(例如第一行代表第一個Map函數輸出a=1,b=1),
而我們最終需要這些資料在運作Reduce函數的機器上按照列存儲(例如,Reduce函數需要的是第一個Map函數的a=1和第三個Map函數的a=1)。
論文裡稱這種資料轉換之為洗牌(shuffle)。是以,這裡确實需要将每一份資料都通過網絡從建立它的Map節點傳輸到需要它的Reduce節點。是以,這也是MapReduce中代價較大的一部分。
學生提問:是否可以通過Streaming的方式加速Reduce的讀取?
Robert教授:你是對的。你可以設想一個不同的定義,其中Reduce通過streaming方式讀取資料。我沒有仔細想過這個方法,我也不知道這是否可行。作為一個程式接口,MapReduce的第一目标就是讓人們能夠簡單的程式設計,人們不需要知道MapReduce裡面發生了什麼。對于一個streaming方式的Reduce函數,或許就沒有之前的定義那麼簡單了。
不過或許可以這麼做。實際上,很多現代的系統中,會按照streaming的方式處理資料,而不是像MapReduce那樣通過批量的方式處理Reduce函數。在MapReduce中,需要一直要等到所有的資料都擷取到了才會進行Reduce處理,是以這是一種批量處理。現代系統通常會使用streaming并且效率會高一些。
是以這裡的shuffle的重點是,這裡實際上可能會有大量的網絡通信。假設你在進行排序,排序的輸入輸出會有相同的大小。這樣,如果你的輸入是10TB,為了能排序,你需要将10TB的資料在網絡上移動,并且輸出也會是10TB,是以這裡有大量的資料。這可能發生在任何MapReduce job中,盡管有一些MapReduce job在不同階段的資料沒有那麼大。
之前有人提過,想将Reduce的輸出傳給另一個MapReduce job,而這也是人們常做的事情。在一些場景中,Reduce的輸出可能會非常巨大,比如排序,比如網頁索引器。10TB的輸入對應的是10TB的輸出。是以,Reduce的輸出也會存儲在GFS上。但是Reduce隻會生成key-value對,MapReduce架構會收集這些資料,并将它們寫入到GFS的大檔案中。是以,這裡有需要一大輪的網絡通信,将每個Reduce的輸出傳輸到相應的GFS伺服器上。你或許會認為,這裡會使用相同的技巧,就将Reduce的輸出存儲在運作了Reduce Task的同一個GFS伺服器上(因為是混部的)。或許Google這麼做了,但是因為GFS會将資料做拆分,并且為了提高性能并保留容錯性,資料會有2-3份副本。這意味着,不論你寫什麼,你總是需要通過網絡将一份資料拷貝寫到2-3台伺服器上。是以,這裡會有大量的網絡通信。這裡的網絡通信,是2004年限制MapReduce的瓶頸。在2020年,因為之前的網絡架構成為了人們想在資料中心中做的很多事情的限制因素,現代資料中心中,root交換機比過去快了很多。并且,你或許已經見過,一個典型的現代資料中心網絡,會有很多的root交換機而不是一個交換機(spine-leaf架構)。每個機架交換機都與每個root交換機相連,網絡流量在多個root交換機之間做負載分擔。是以,現代資料中心網絡的吞吐大多了。
我認為Google幾年前就不再使用MapReduce了,不過在那之前,現代的MapReduce已經不再嘗試在GFS資料存儲的伺服器上運作Map函數了,它樂意從任何地方加載資料,因為網絡已經足夠快了。
好的,我們沒有時間聊MapReduce了,下周有一個lab,你會在lab中實作一個你自己的簡單版本的MapReduce。