任務目的
- 了解 Shuffle 的概念和作用
- 了解 Map 端 Shuffle 的詳細過程
- 了解 Reduce 端 Shuffle 的詳細過程
任務清單
- 任務1:Shuffle 簡介
- 任務2:Shuffle 主要流程
- 任務3:Map 端的 Shuffle 過程
- 任務4:Reduce 端的 Shuffle 過程
詳細任務步驟
任務1:Shuffle 簡介
在 Hadoop 中資料從 Map 階段傳遞給 Reduce 階段的過程就叫 Shuffle,Shuffle 機制是整個 MapReduce 架構中最核心的部分。
Shuffle的本義是洗牌、混洗,把一組有一定規則的資料盡量轉換成一組無規則的資料,越随機越好。MapReduce中的Shuffle更像是洗牌的逆過程,把一組無規則的資料盡量轉換成一組具有一定規則的資料。
MapReduce計算模型為什麼需要 Shuffle 過程?
我們都知道MapReduce計算模型一般包括兩個重要的階段:Map是映射,負責資料的過濾分發;Reduce是規約,負責資料的計算歸并。Reduce的資料來源于Map,Map的輸出即是Reduce的輸入,Reduce需要通過Shuffle來擷取資料。
分區和排序。
任務2:Shuffle 主要流程
Shuffle 描述的是資料從 Map 端到 Reduce 端的過程,大緻分為排序(sort)、溢寫(spill)、合并(merge)、拉取拷貝(Copy)、合并排序(merge sort)這幾個過程,大體流程如下:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwFciV2dsQXYtJ3bm9CX9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5CN2czN4MTOmJzMwEWMjhTZyYzX4ADOxQTMyAzLchDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
圖1
Shuffle 是 MapReduce 處理流程中的一個過程,它的每一個處理步驟是分散在各個 MapTask 和 ReduceTask 節點上完成的,整體來看,分為 3個操作:
- Partition(分區,必要)
- Sort (根據 key 排序,必要)
- Combiner (進行局部 value 的合并,非必要)
任務3:Map 端的 Shuffle 過程
Map 端做了下圖所示的操作:
圖2
1. Collect (收集)階段
MapTask 收集我們的
map()
方法輸出的 kv 對,放到記憶體緩沖區Kvbuffer中。每一個 MapTask 都有一個環形記憶體緩沖區,用于存儲任務的輸出,預設大小100MB(
mapreduce.task.io.sort.mb
屬性)。
提問:什麼是環形緩沖區?
首尾相連的資料結構,專門用來存儲 Key-Value 格式的資料,可以叫做Kvbuffer:
圖4
舉例來說,下圖中總共涉及三個變量:kvstart、kvindex和kvend。kvstart表示目前已寫的資料的開始位置,kvindex表示寫一個下一個可寫的位置,是以,從kvstart到(kvindex-1)這部分資料就是已經寫的資料,另外一個線程來Spill的時候,讀取的資料就是這一部分。而寫線程仍然從kvindex位置開始,并不沖突(如果寫得太快而讀得太慢,追了一圈後可以通過變量值判斷,也無需加鎖,隻是等待)。
最初往環形緩沖區中添加資料的時候kvend=kvstart,但kvindex遞增;當觸發Spill的時候,kvend=kvindex,Spill的值涵蓋從kvstart到kvend-1區間的資料,kvindex不影響,繼續按照進入的資料遞增;當進行完Spill的時候,kvindex增加,kvstart移動到kvend處,在Spill這段時間,kvindex可能已經往前移動了,但并不影響資料的讀取,是以,kvend實際上一般情況下不變,隻有在要讀取環形緩沖區中的資料時發生一次改變(即設定kvend=kvindex):
圖5
提問:為什麼要用環形緩沖區?
使用環形緩沖區,便于寫入緩沖區和寫出緩沖區同時進行。
提問:為什麼不等緩沖區滿了再spill?
如果寫滿了才溢出到磁盤,那麼在溢出磁盤的過程中不能寫入,寫就被阻塞了,但是如果到了一定程度就溢出磁盤,那麼緩沖區就一直有剩餘空間可以寫,這樣就可以設計成讀寫不沖突,提高吞吐量。
2. sort (排序)階段
門檻值80%,即80MB(可通過
mapreduce.map.sort.spill.percent
配置),一個背景線程就會不斷地将資料溢出(spill)到本地磁盤檔案中,可能會溢出多個檔案;但溢寫之前會有一個 sort 操作,這個 sort 操作先把 Kvbuffer 中的資料按照 partition 值和 key 兩個關鍵字來排序,移動的隻是索引資料,排序結果是 Kvmeta 中資料按照 partition 為機關聚集在一起,同一 partition 内的按照 key 有序。如果有 Combiner,還要對排序後的資料進行 Combiner。
3. spill(溢寫)階段
分區為機關,一個分區寫完,寫下一個分區,分區内資料有序,最終實際上會多次溢寫,然後生成多個溢出檔案。
圖8
4. merge(合并)階段
每次溢寫會在磁盤上生成一個溢寫檔案,如果 Map 的輸出結果很大,有多次這樣的溢寫發生,磁盤上相應的就會有多個溢寫檔案存在。因為最終的檔案隻有一個,是以需要将這些溢寫檔案歸并到一起,這個過程就叫做 Merge。Merge 的過程也是相同分區的合并成一個片段(segment),最終所有的 segment 組裝成一個最終檔案,那麼合并過程就完成了。如下圖所示:
圖3
至此,Map 的操作就已經完成,Reduce 端操作即将登場。
任務4:Reduce 端的 Shuffle 過程
圖6
1. fetch copy(拉取拷貝) 階段
Reduce 端預設有5個資料複制線程從 Map 端複制資料,其通過 Http 方式得到 Map 對應分區的輸出檔案。每個 MapTask 的完成時間可能不同,是以隻要有一個任務完成,ReduceTask 就開始複制(copy)其輸出。這些資料預設會儲存在記憶體的緩沖區中,當記憶體的緩沖區達到一定的門檻值的時候,就會将資料寫到磁盤之上。
2. merge sort(合并排序)階段
接下來就是sort階段,也稱為merge階段。因為這個階段的主要工作是執行了歸并排序。
在 ReduceTask 遠端複制資料的同時,會在背景開啟2個線程(一個是記憶體到磁盤的合并,一個是磁盤到磁盤的合并)對記憶體中和本地磁盤中的資料檔案進行合并操作,以防止記憶體使用過多或磁盤上檔案過多。
在對資料進行合并的同時,會進行排序操作,由于MapTask 階段已經對資料進行了局部的排序,是以,ReduceTask 隻需對所有資料進行一次歸并排序即可。