天天看點

8.1 Shuffle 工作機制

任務目的

  • 了解 Shuffle 的概念和作用
  • 了解 Map 端 Shuffle 的詳細過程
  • 了解 Reduce 端 Shuffle 的詳細過程

任務清單

  • 任務1:Shuffle 簡介
  • 任務2:Shuffle 主要流程
  • 任務3:Map 端的 Shuffle 過程
  • 任務4:Reduce 端的 Shuffle 過程

詳細任務步驟

任務1:Shuffle 簡介

  在 Hadoop 中資料從 Map 階段傳遞給 Reduce 階段的過程就叫 Shuffle,Shuffle 機制是整個 MapReduce 架構中最核心的部分。

  Shuffle的本義是洗牌、混洗,把一組有一定規則的資料盡量轉換成一組無規則的資料,越随機越好。MapReduce中的Shuffle更像是洗牌的逆過程,把一組無規則的資料盡量轉換成一組具有一定規則的資料。

  MapReduce計算模型為什麼需要 Shuffle 過程?

  我們都知道MapReduce計算模型一般包括兩個重要的階段:Map是映射,負責資料的過濾分發;Reduce是規約,負責資料的計算歸并。Reduce的資料來源于Map,Map的輸出即是Reduce的輸入,Reduce需要通過Shuffle來擷取資料。

分區和排序。

任務2:Shuffle 主要流程

  Shuffle 描述的是資料從 Map 端到 Reduce 端的過程,大緻分為排序(sort)、溢寫(spill)、合并(merge)、拉取拷貝(Copy)、合并排序(merge sort)這幾個過程,大體流程如下:

8.1 Shuffle 工作機制

圖1

 

  Shuffle 是 MapReduce 處理流程中的一個過程,它的每一個處理步驟是分散在各個 MapTask 和 ReduceTask 節點上完成的,整體來看,分為 3個操作:

  • Partition(分區,必要)
  • Sort (根據 key 排序,必要)
  • Combiner (進行局部 value 的合并,非必要)

任務3:Map 端的 Shuffle 過程

  Map 端做了下圖所示的操作:

8.1 Shuffle 工作機制

圖2

 

  1. Collect (收集)階段

  MapTask 收集我們的 ​

​map()​

​ 方法輸出的 kv 對,放到記憶體緩沖區Kvbuffer中。每一個 MapTask 都有一個環形記憶體緩沖區,用于存儲任務的輸出,預設大小100MB( ​

​mapreduce.task.io.sort.mb​

​屬性)。

  提問:什麼是環形緩沖區?

首尾相連的資料結構,專門用來存儲 Key-Value 格式的資料,可以叫做Kvbuffer:

8.1 Shuffle 工作機制

圖4

 

  舉例來說,下圖中總共涉及三個變量:kvstart、kvindex和kvend。kvstart表示目前已寫的資料的開始位置,kvindex表示寫一個下一個可寫的位置,是以,從kvstart到(kvindex-1)這部分資料就是已經寫的資料,另外一個線程來Spill的時候,讀取的資料就是這一部分。而寫線程仍然從kvindex位置開始,并不沖突(如果寫得太快而讀得太慢,追了一圈後可以通過變量值判斷,也無需加鎖,隻是等待)。

  最初往環形緩沖區中添加資料的時候kvend=kvstart,但kvindex遞增;當觸發Spill的時候,kvend=kvindex,Spill的值涵蓋從kvstart到kvend-1區間的資料,kvindex不影響,繼續按照進入的資料遞增;當進行完Spill的時候,kvindex增加,kvstart移動到kvend處,在Spill這段時間,kvindex可能已經往前移動了,但并不影響資料的讀取,是以,kvend實際上一般情況下不變,隻有在要讀取環形緩沖區中的資料時發生一次改變(即設定kvend=kvindex):

8.1 Shuffle 工作機制

圖5

 

  提問:為什麼要用環形緩沖區?

  使用環形緩沖區,便于寫入緩沖區和寫出緩沖區同時進行。

  提問:為什麼不等緩沖區滿了再spill?

  如果寫滿了才溢出到磁盤,那麼在溢出磁盤的過程中不能寫入,寫就被阻塞了,但是如果到了一定程度就溢出磁盤,那麼緩沖區就一直有剩餘空間可以寫,這樣就可以設計成讀寫不沖突,提高吞吐量。

  2. sort (排序)階段

門檻值80%,即80MB(可通過 ​

​mapreduce.map.sort.spill.percent​

​ 配置),一個背景線程就會不斷地将資料溢出(spill)到本地磁盤檔案中,可能會溢出多個檔案;但溢寫之前會有一個 sort 操作,這個 sort 操作先把 Kvbuffer 中的資料按照 partition 值和 key 兩個關鍵字來排序,移動的隻是索引資料,排序結果是 Kvmeta 中資料按照 partition 為機關聚集在一起,同一 partition 内的按照 key 有序。如果有 Combiner,還要對排序後的資料進行 Combiner。

  3. spill(溢寫)階段

分區為機關,一個分區寫完,寫下一個分區,分區内資料有序,最終實際上會多次溢寫,然後生成多個溢出檔案。

8.1 Shuffle 工作機制

圖8

 

  4. merge(合并)階段

  每次溢寫會在磁盤上生成一個溢寫檔案,如果 Map 的輸出結果很大,有多次這樣的溢寫發生,磁盤上相應的就會有多個溢寫檔案存在。因為最終的檔案隻有一個,是以需要将這些溢寫檔案歸并到一起,這個過程就叫做 Merge。Merge 的過程也是相同分區的合并成一個片段(segment),最終所有的 segment 組裝成一個最終檔案,那麼合并過程就完成了。如下圖所示:

8.1 Shuffle 工作機制

圖3

 

  至此,Map 的操作就已經完成,Reduce 端操作即将登場。

任務4:Reduce 端的 Shuffle 過程

8.1 Shuffle 工作機制

圖6

 

  1. fetch copy(拉取拷貝) 階段

  Reduce 端預設有5個資料複制線程從 Map 端複制資料,其通過 Http 方式得到 Map 對應分區的輸出檔案。每個 MapTask 的完成時間可能不同,是以隻要有一個任務完成,ReduceTask 就開始複制(copy)其輸出。這些資料預設會儲存在記憶體的緩沖區中,當記憶體的緩沖區達到一定的門檻值的時候,就會将資料寫到磁盤之上。

  2. merge sort(合并排序)階段

  接下來就是sort階段,也稱為merge階段。因為這個階段的主要工作是執行了歸并排序。

  在 ReduceTask 遠端複制資料的同時,會在背景開啟2個線程(一個是記憶體到磁盤的合并,一個是磁盤到磁盤的合并)對記憶體中和本地磁盤中的資料檔案進行合并操作,以防止記憶體使用過多或磁盤上檔案過多。

  在對資料進行合并的同時,會進行排序操作,由于MapTask 階段已經對資料進行了局部的排序,是以,ReduceTask 隻需對所有資料進行一次歸并排序即可。

繼續閱讀