天天看點

MapReduce Shuffle原理 與 Spark Shuffle原理MapReduce Shuffle原理 與 Spark Shuffle原理

MapReduce Shuffle原理 與 Spark Shuffle原理

MapReduce的Shuffle過程介紹

Shuffle的本義是洗牌、混洗,把一組有一定規則的資料盡量轉換成一組無規則的資料,越随機越好。MapReduce中的Shuffle更像是洗牌的逆過程,把一組無規則的資料盡量轉換成一組具有一定規則的資料。

為什麼MapReduce計算模型需要Shuffle過程?我們都知道MapReduce計算模型一般包括兩個重要的階段:Map是映射,負責資料的過濾分發;Reduce是規約,負責資料的計算歸并。Reduce的資料來源于Map,Map的輸出即是Reduce的輸入,Reduce需要通過Shuffle來擷取資料。

從Map輸出到Reduce輸入的整個過程可以廣義地稱為Shuffle。Shuffle橫跨Map端和Reduce端,在Map端包括Spill過程,在Reduce端包括copy和sort過程,如圖所示:

MapReduce Shuffle原理 與 Spark Shuffle原理MapReduce Shuffle原理 與 Spark Shuffle原理

Spill過程

Spill過程包括輸出、排序、溢寫、合并等步驟,如圖所示:

MapReduce Shuffle原理 與 Spark Shuffle原理MapReduce Shuffle原理 與 Spark Shuffle原理

Collect

每個Map任務不斷地以<key, value>對的形式把資料輸出到在記憶體中構造的一個環形資料結構中。使用環形資料結構是為了更有效地使用記憶體空間,在記憶體中放置盡可能多的資料。

這個資料結構其實就是個位元組數組,叫Kvbuffer,名如其義,但是這裡面不光放置了<key, value>資料,還放置了一些索引資料,給放置索引資料的區域起了一個Kvmeta的别名,在Kvbuffer的一塊區域上穿了一個IntBuffer(位元組序采用的是平台自身的位元組序)的馬甲。<key, value>資料區域和索引資料區域在Kvbuffer中是相鄰不重疊的兩個區域,用一個分界點來劃分兩者,分界點不是亘古不變的,而是每次Spill之後都會更新一次。初始的分界點是0,<key, value>資料的存儲方向是向上增長,索引資料的存儲方向是向下增長,如圖所示:

MapReduce Shuffle原理 與 Spark Shuffle原理MapReduce Shuffle原理 與 Spark Shuffle原理

Kvbuffer的存放指針bufindex是一直悶着頭地向上增長,比如bufindex初始值為0,一個Int型的key寫完之後,bufindex增長為4,一個Int型的value寫完之後,bufindex增長為8。

索引是對<key, value>在kvbuffer中的索引,是個四元組,包括:value的起始位置、key的起始位置、partition值、value的長度,占用四個Int長度,Kvmeta的存放指針Kvindex每次都是向下跳四個“格子”,然後再向上一個格子一個格子地填充四元組的資料。比如Kvindex初始位置是-4,當第一個<key, value>寫完之後,(Kvindex+0)的位置存放value的起始位置、(Kvindex+1)的位置存放key的起始位置、(Kvindex+2)的位置存放partition的值、(Kvindex+3)的位置存放value的長度,然後Kvindex跳到-8位置,等第二個<key, value>和索引寫完之後,Kvindex跳到-32位置。

Kvbuffer的大小雖然可以通過參數設定,但是總共就那麼大,<key, value>和索引不斷地增加,加着加着,Kvbuffer總有不夠用的那天,那怎麼辦?把資料從記憶體刷到磁盤上再接着往記憶體寫資料,把Kvbuffer中的資料刷到磁盤上的過程就叫Spill,多麼明了的叫法,記憶體中的資料滿了就自動地spill到具有更大空間的磁盤。

關于Spill觸發的條件,也就是Kvbuffer用到什麼程度開始Spill,還是要講究一下的。如果把Kvbuffer用得死死得,一點縫都不剩的時候再開始Spill,那Map任務就需要等Spill完成騰出空間之後才能繼續寫資料;如果Kvbuffer隻是滿到一定程度,比如80%的時候就開始Spill,那在Spill的同時,Map任務還能繼續寫資料,如果Spill夠快,Map可能都不需要為空閑空間而發愁。兩利相衡取其大,一般選擇後者。

Spill這個重要的過程是由Spill線程承擔,Spill線程從Map任務接到“指令”之後就開始正式幹活,幹的活叫SortAndSpill,原來不僅僅是Spill,在Spill之前還有個頗具争議性的Sort。

Sort

先把Kvbuffer中的資料按照partition值和key兩個關鍵字升序排序,移動的隻是索引資料,排序結果是Kvmeta中資料按照partition為機關聚集在一起,同一partition内的按照key有序。

Spill

Spill線程為這次Spill過程建立一個磁盤檔案:從所有的本地目錄中輪訓查找能存儲這麼大空間的目錄,找到之後在其中建立一個類似于“spill12.out”的檔案。Spill線程根據排過序的Kvmeta挨個partition的把<key, value>資料吐到這個檔案中,一個partition對應的資料吐完之後順序地吐下個partition,直到把所有的partition周遊完。一個partition在檔案中對應的資料也叫段(segment)。

所有的partition對應的資料都放在這個檔案裡,雖然是順序存放的,但是怎麼直接知道某個partition在這個檔案中存放的起始位置呢?強大的索引又出場了。有一個三元組記錄某個partition對應的資料在這個檔案中的索引:起始位置、原始資料長度、壓縮之後的資料長度,一個partition對應一個三元組。然後把這些索引資訊存放在記憶體中,如果記憶體中放不下了,後續的索引資訊就需要寫到磁盤檔案中了:從所有的本地目錄中輪訓查找能存儲這麼大空間的目錄,找到之後在其中建立一個類似于“spill12.out.index”的檔案,檔案中不光存儲了索引資料,還存儲了crc32的校驗資料。(spill12.out.index不一定在磁盤上建立,如果記憶體(預設1M空間)中能放得下就放在記憶體中,即使在磁盤上建立了,和spill12.out檔案也不一定在同一個目錄下。)

每一次Spill過程就會最少生成一個out檔案,有時還會生成index檔案,Spill的次數也烙印在檔案名中。索引檔案和資料檔案的對應關系如下圖所示:

MapReduce Shuffle原理 與 Spark Shuffle原理MapReduce Shuffle原理 與 Spark Shuffle原理

話分兩端,在Spill線程如火如荼的進行SortAndSpill工作的同時,Map任務不會是以而停歇,而是一無既往地進行着資料輸出。Map還是把資料寫到kvbuffer中,那問題就來了:<key, value>隻顧着悶頭按照bufindex指針向上增長,kvmeta隻顧着按照Kvindex向下增長,是保持指針起始位置不變繼續跑呢,還是另謀它路?如果保持指針起始位置不變,很快bufindex和Kvindex就碰頭了,碰頭之後再重新開始或者移動記憶體都比較麻煩,不可取。Map取kvbuffer中剩餘空間的中間位置,用這個位置設定為新的分界點,bufindex指針移動到這個分界點,Kvindex移動到這個分界點的-16位置,然後兩者就可以和諧地按照自己既定的軌迹放置資料了,當Spill完成,空間騰出之後,不需要做任何改動繼續前進。分界點的轉換如下圖所示:

MapReduce Shuffle原理 與 Spark Shuffle原理MapReduce Shuffle原理 與 Spark Shuffle原理

Map任務總要把輸出的資料寫到磁盤上,即使輸出資料量很小在記憶體中全部能裝得下,在最後也會把資料刷到磁盤上。

Merge

Map任務如果輸出資料量很大,可能會進行好幾次Spill,out檔案和Index檔案會産生很多,分布在不同的磁盤上。最後把這些檔案進行合并的merge過程閃亮登場。

Merge過程怎麼知道産生的Spill檔案都在哪了呢?從所有的本地目錄上掃描得到産生的Spill檔案,然後把路徑存儲在一個數組裡。Merge過程又怎麼知道Spill的索引資訊呢?沒錯,也是從所有的本地目錄上掃描得到Index檔案,然後把索引資訊存儲在一個清單裡。到這裡,又遇到了一個值得納悶的地方。在之前Spill過程中的時候為什麼不直接把這些資訊存儲在記憶體中呢,何必又多了這步掃描的操作?特别是Spill的索引資料,之前當記憶體超限之後就把資料寫到磁盤,現在又要從磁盤把這些資料讀出來,還是需要裝到更多的記憶體中。之是以多此一舉,是因為這時kvbuffer這個記憶體大戶已經不再使用可以回收,有記憶體空間來裝這些資料了。(對于記憶體空間較大的土豪來說,用記憶體來省卻這兩個io步驟還是值得考慮的。)

然後為merge過程建立一個叫file.out的檔案和一個叫file.out.Index的檔案用來存儲最終的輸出和索引。

一個partition一個partition的進行合并輸出。對于某個partition來說,從索引清單中查詢這個partition對應的所有索引資訊,每個對應一個段插入到段清單中。也就是這個partition對應一個段清單,記錄所有的Spill檔案中對應的這個partition那段資料的檔案名、起始位置、長度等等。

然後對這個partition對應的所有的segment進行合并,目标是合并成一個segment。當這個partition對應很多個segment時,會分批地進行合并:先從segment清單中把第一批取出來,以key為關鍵字放置成最小堆,然後從最小堆中每次取出最小的<key, value>輸出到一個臨時檔案中,這樣就把這一批段合并成一個臨時的段,把它加回到segment清單中;再從segment清單中把第二批取出來合并輸出到一個臨時segment,把其加入到清單中;這樣往複執行,直到剩下的段是一批,輸出到最終的檔案中。

最終的索引資料仍然輸出到Index檔案中。

MapReduce Shuffle原理 與 Spark Shuffle原理MapReduce Shuffle原理 與 Spark Shuffle原理

Map端的Shuffle過程到此結束。

Copy

Reduce任務通過HTTP向各個Map任務拖取它所需要的資料。每個節點都會啟動一個常駐的HTTP server,其中一項服務就是響應Reduce拖取Map資料。當有MapOutput的HTTP請求過來的時候,HTTP server就讀取相應的Map輸出檔案中對應這個Reduce部分的資料通過網絡流輸出給Reduce。

Reduce任務拖取某個Map對應的資料,如果在記憶體中能放得下這次資料的話就直接把資料寫到記憶體中。Reduce要向每個Map去拖取資料,在記憶體中每個Map對應一塊資料,當記憶體中存儲的Map資料占用空間達到一定程度的時候,開始啟動記憶體中merge,把記憶體中的資料merge輸出到磁盤上一個檔案中。

如果在記憶體中不能放得下這個Map的資料的話,直接把Map資料寫到磁盤上,在本地目錄建立一個檔案,從HTTP流中讀取資料然後寫到磁盤,使用的緩存區大小是64K。拖一個Map資料過來就會建立一個檔案,當檔案數量達到一定門檻值時,開始啟動磁盤檔案merge,把這些檔案合并輸出到一個檔案。

有些Map的資料較小是可以放在記憶體中的,有些Map的資料較大需要放在磁盤上,這樣最後Reduce任務拖過來的資料有些放在記憶體中了有些放在磁盤上,最後會對這些來一個全局合并。

Merge Sort

這裡使用的Merge和Map端使用的Merge過程一樣。Map的輸出資料已經是有序的,Merge進行一次合并排序,所謂Reduce端的sort過程就是這個合并的過程。一般Reduce是一邊copy一邊sort,即copy和sort兩個階段是重疊而不是完全分開的。

Reduce端的Shuffle過程至此結束。

Spark的Shuffle過程介紹

Shuffle Writer

Spark豐富了任務類型,有些任務之間資料流轉不需要通過Shuffle,但是有些任務之間還是需要通過Shuffle來傳遞資料,比如wide dependency的group by key。

Spark中需要Shuffle輸出的Map任務會為每個Reduce建立對應的bucket,Map産生的結果會根據設定的partitioner得到對應的bucketId,然後填充到相應的bucket中去。每個Map的輸出結果可能包含所有的Reduce所需要的資料,是以每個Map會建立R個bucket(R是reduce的個數),M個Map總共會建立M*R個bucket。

Map建立的bucket其實對應磁盤上的一個檔案,Map的結果寫到每個bucket中其實就是寫到那個磁盤檔案中,這個檔案也被稱為blockFile,是Disk Block Manager管理器通過檔案名的Hash值對應到本地目錄的子目錄中建立的。每個Map要在節點上建立R個磁盤檔案用于結果輸出,Map的結果是直接輸出到磁盤檔案上的,100KB的記憶體緩沖是用來建立Fast Buffered OutputStream輸出流。這種方式一個問題就是Shuffle檔案過多。

MapReduce Shuffle原理 與 Spark Shuffle原理MapReduce Shuffle原理 與 Spark Shuffle原理

針對上述Shuffle過程産生的檔案過多問題,Spark有另外一種改進的Shuffle過程:consolidation Shuffle,以期顯著減少Shuffle檔案的數量。在consolidation Shuffle中每個bucket并非對應一個檔案,而是對應檔案中的一個segment部分。Job的map在某個節點上第一次執行,為每個reduce建立bucket對應的輸出檔案,把這些檔案組織成ShuffleFileGroup,當這次map執行完之後,這個ShuffleFileGroup可以釋放為下次循環利用;當又有map在這個節點上執行時,不需要建立新的bucket檔案,而是在上次的ShuffleFileGroup中取得已經建立的檔案繼續追加寫一個segment;目前次map還沒執行完,ShuffleFileGroup還沒有釋放,這時如果有新的map在這個節點上執行,無法循環利用這個ShuffleFileGroup,而是隻能建立新的bucket檔案組成新的ShuffleFileGroup來寫輸出。

MapReduce Shuffle原理 與 Spark Shuffle原理MapReduce Shuffle原理 與 Spark Shuffle原理

比如一個Job有3個Map和2個reduce:(1) 如果此時叢集有3個節點有空槽,每個節點空閑了一個core,則3個Map會排程到這3個節點上執行,每個Map都會建立2個Shuffle檔案,總共建立6個Shuffle檔案;(2) 如果此時叢集有2個節點有空槽,每個節點空閑了一個core,則2個Map先排程到這2個節點上執行,每個Map都會建立2個Shuffle檔案,然後其中一個節點執行完Map之後又排程執行另一個Map,則這個Map不會建立新的Shuffle檔案,而是把結果輸出追加到之前Map建立的Shuffle檔案中;總共建立4個Shuffle檔案;(3) 如果此時叢集有2個節點有空槽,一個節點有2個空core一個節點有1個空core,則一個節點排程2個Map一個節點排程1個Map,排程2個Map的節點上,一個Map建立了Shuffle檔案,後面的Map還是會建立新的Shuffle檔案,因為上一個Map還正在寫,它建立的ShuffleFileGroup還沒有釋放;總共建立6個Shuffle檔案。

Shuffle Fetcher

Reduce去拖Map的輸出資料,Spark提供了兩套不同的拉取資料架構:通過socket連接配接去取資料;使用netty架構去取資料。

每個節點的Executor會建立一個BlockManager,其中會建立一個BlockManagerWorker用于響應請求。當Reduce的GET_BLOCK的請求過來時,讀取本地檔案将這個blockId的資料傳回給Reduce。如果使用的是Netty架構,BlockManager會建立ShuffleSender用于發送Shuffle資料。

并不是所有的資料都是通過網絡讀取,對于在本節點的Map資料,Reduce直接去磁盤上讀取而不再通過網絡架構。

Reduce拖過來資料之後以什麼方式存儲呢?Spark Map輸出的資料沒有經過排序,Spark Shuffle過來的資料也不會進行排序,Spark認為Shuffle過程中的排序不是必須的,并不是所有類型的Reduce需要的資料都需要排序,強制地進行排序隻會增加Shuffle的負擔。Reduce拖過來的資料會放在一個HashMap中,HashMap中存儲的也是<key, value>對,key是Map輸出的key,Map輸出對應這個key的所有value組成HashMap的value。Spark将Shuffle取過來的每一個<key, value>對插入或者更新到HashMap中,來一個處理一個。HashMap全部放在記憶體中。

Shuffle取過來的資料全部存放在記憶體中,對于資料量比較小或者已經在Map端做過合并處理的Shuffle資料,占用記憶體空間不會太大,但是對于比如group by key這樣的操作,Reduce需要得到key對應的所有value,并将這些value組一個數組放在記憶體中,這樣當資料量較大時,就需要較多記憶體。

當記憶體不夠時,要不就失敗,要不就用老辦法把記憶體中的資料移到磁盤上放着。Spark意識到在處理資料規模遠遠大于記憶體空間時所帶來的不足,引入了一個具有外部排序的方案。Shuffle過來的資料先放在記憶體中,當記憶體中存儲的<key, value>對超過1000并且記憶體使用超過70%時,判斷節點上可用記憶體如果還足夠,則把記憶體緩沖區大小翻倍,如果可用記憶體不再夠了,則把記憶體中的<key, value>對排序然後寫到磁盤檔案中。最後把記憶體緩沖區中的資料排序之後和那些磁盤檔案組成一個最小堆,每次從最小堆中讀取最小的資料,這個和MapReduce中的merge過程類似。

MapReduce和Spark的Shuffle過程對比

MapReduce Spark
collect 在記憶體中構造了一塊資料結構用于map輸出的緩沖 沒有在記憶體中構造一塊資料結構用于map輸出的緩沖,而是直接把輸出寫到磁盤檔案
sort map輸出的資料有排序 map輸出的資料沒有排序
merge 對磁盤上的多個spill檔案最後進行合并成一個輸出檔案 在map端沒有merge過程,在輸出時直接是對應一個reduce的資料寫到一個檔案中,這些檔案同時存在并發寫,最後不需要合并成一個
copy架構 jetty netty或者直接socket流
對于本節點上的檔案 仍然是通過網絡架構拖取資料 不通過網絡架構,對于在本節點上的map輸出檔案,采用本地讀取的方式
copy過來的資料存放位置 先放在記憶體,記憶體放不下時寫到磁盤

一種方式全部放在記憶體;

另一種方式先放在記憶體

merge sort 最後會對磁盤檔案和記憶體中的資料進行合并排序 對于采用另一種方式時也會有合并排序的過程

Shuffle後續優化方向

通過上面的介紹,我們了解到,Shuffle過程的主要存儲媒體是磁盤,盡量的減少IO是Shuffle的主要優化方向。我們腦海中都有那個經典的存儲金字塔體系,Shuffle過程為什麼把結果都放在磁盤上,那是因為現在記憶體再大也大不過磁盤,記憶體就那麼大,還這麼多張嘴吃,當然是配置設定給最需要的了。如果具有“土豪”記憶體節點,減少Shuffle IO的最有效方式無疑是盡量把資料放在記憶體中。下面列舉一些現在看可以優化的方面,期待經過我們不斷的努力,TDW計算引擎運作地更好。

MapReduce Shuffle後續優化方向

  • 壓縮:對資料進行壓縮,減少寫讀資料量;
  • 減少不必要的排序:并不是所有類型的Reduce需要的資料都是需要排序的,排序這個nb的過程如果不需要最好還是不要的好;
  • 記憶體化:Shuffle的資料不放在磁盤而是盡量放在記憶體中,除非逼不得已往磁盤上放;當然了如果有性能和記憶體相當的第三方存儲系統,那放在第三方存儲系統上也是很好的;這個是個大招;
  • 網絡架構:netty的性能據說要占優了;
  • 本節點上的資料不走網絡架構:對于本節點上的Map輸出,Reduce直接去讀吧,不需要繞道網絡架構。

Spark Shuffle後續優化方向

Spark作為MapReduce的進階架構,對于Shuffle過程已經是優化了的,特别是對于那些具有争議的步驟已經做了優化,但是Spark的Shuffle對于我們來說在一些方面還是需要優化的。

  • 壓縮:對資料進行壓縮,減少寫讀資料量;
  • 記憶體化:Spark曆史版本中是有這樣設計的:Map寫資料先把資料全部寫到記憶體中,寫完之後再把資料刷到磁盤上;考慮記憶體是緊缺資源,後來修改成把資料直接寫到磁盤了;對于具有較大記憶體的叢集來講,還是盡量地往記憶體上寫吧,記憶體放不下了再放磁盤。

轉載:http://www.csdn.net/article/2014-05-19/2819831-TDW-Shuffle/1

繼續閱讀