Spark Shuffle 原理
文章目錄
- Spark Shuffle 原理
-
- Shuffle 中的兩個 stage
- Shuffle 中的任務數
- reduce 拉取資料的過程
- HashShuffle
-
- 未經優化的 HashShuffleManager
- 優化的 HashShuffleManager
- SortShuffle
-
- SortShuffleManager
- BypassSortShuffleManager
Shuffle 中的兩個 stage
在 stage 劃分時,最後一個 stage 為 ResultStage,其他的則都為 ShuffleMapStage;
ResultStage : 代表着由一個 action 算子的觸發,并代表 Spark 作業的結束;
ShuffleMapStage : 代表着一個引起 Shuffle 的算子的執行,為下遊的 stage 進行 shuffle write;
Shuffle 中的任務數
- MapTask : 最開始的 MapTask 如果是讀的 hdfs 上的資料,那麼分區的數量由 split 的數量決定;
- ReduceTask : reduce 端的分區預設取
的值; 如果沒有設定,那麼在沒有發生重分區的情況下,reduce 端的分區數等于 map 端最有一個 RDD 的分區數量; 如果發生重分區,以重分區數量為準;spark.default.parallelism
reduce 拉取資料的過程
map task 為下遊 stage 進行 shuffle write,reduce task 進行 shuffle read; 那麼 reduce task 就需要知道在哪些地方拉取 map task 産生的資料:
- map task 執行完畢後會将計算狀态以及磁盤小檔案位置等資訊封裝到
對象中,然後由本程序中的
MapStatus
對象将
MapOutPutTrackerWorker
對象發送給Driver程序的
MapStatus
對象;
MapOutPutTrackerMaster
- 在reduce task開始執行之前會先讓本程序中的
向Driver程序中的
MapOutputTrackerWorker
發動請求,請求磁盤小檔案位置資訊;
MapoutPutTrakcerMaster
- 當所有的Map task執行完畢後,Driver程序中的
就掌握了所有的磁盤小檔案的位置資訊。此時
MapOutPutTrackerMaster
會告訴
MapOutPutTrackerMaster
磁盤小檔案的位置資訊;
MapOutPutTrackerWorker
- 完成之前的操作之後,由
去Executor所在的節點拉資料,預設會啟動五個子線程。每次拉取的資料量不能超過48M(reduce task每次最多拉取48M資料,将拉來的資料存儲到Executor記憶體的執行記憶體中)。
BlockTransforService
HashShuffle
未經優化的 HashShuffleManager
shuffle write 為下遊的 stage 進行資料的準備和劃分; 每個 task 将 key 按照 hash 進行資料的劃分,将相同 key 的資料寫入同一個磁盤檔案,而每一個磁盤檔案隻屬于下遊的一個 task。
而在将資料寫入磁盤之前,會先将資料寫入記憶體緩沖中,當記憶體緩沖填滿之後,才會溢寫到磁盤檔案中去。
使用未經優化的 HashShuffleManager 會産生大量的磁盤檔案主要展現在: 下一個 stage 的 task 有多少個,目前 stage 的每個 task 就要建立多少份磁盤檔案。
是以總共生成的檔案數為:目前task的數量 * 下個 stage 的 task 數量;
未優化的HashShuffleManager工作原理如圖所示:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5CMyYTMmVjN0IjZllzYkFzY4ITO4EzNkNmN1ADZkNDZy8CX0JXZ252bj91Ztl2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
優化的 HashShuffleManager
為了優化 HashShuffleManager,可以通過設定
spark.shuffle.consolidateFiles
,預設為false,将其設定為 true 打開;
開啟了之後,和未經優化的 HashShuffleManager 的主要差別為 : 下一個 stage 的 task 有多少個,目前 stage 的每個CPU Core 就要建立多少份磁盤檔案;
每個Executor建立的磁盤檔案的數量的計算公式為 : CPU core的數量 * 下一個stage的task數量
SortShuffle
SortShuffleManager
SortShuffleManager 的工作模式為,先将資料寫入記憶體資料結構; 如果是reduceByKey這種聚合類的shuffle算子,那麼會選用Map資料結構,一邊通過Map進行聚合,一邊寫入記憶體;如果是join這種普通的shuffle算子,那麼會選用Array資料結構,直接寫入記憶體。
每寫入一條會判斷是否達到門檻值,達到門檻值了之後會進行溢寫磁盤;
在溢寫磁盤之前會進行對 key 的排序,寫入到記憶體緩沖區,然後按照每批次預設 10000 條的數量進行多次溢寫磁盤; 每次溢寫會産生一個臨時檔案;
而最後結束的時候,會将所有的臨時檔案進行 merge,并産生一個索引檔案,辨別了下遊各個task的資料在檔案中的start offset與end offset;
是以每個 executor 上,隻會産生目前 executor 上的 task 數的磁盤檔案數量
BypassSortShuffleManager
bypass運作機制的觸發條件如下:
- shuffle map task數量小于spark.shuffle.sort.bypassMergeThreshold參數的值
- 不是聚合類的shuffle算子
此時,每個 task 會為下遊的每一個 task 産生一份臨時檔案,按照 key 進行 hash 将 相同 key 放入同一個檔案; 在寫磁盤之前,會寫到記憶體的緩沖區,緩沖區滿了之後
才會進行資料的溢寫;
上面的步驟和 未經優化的 HashShuffleManager 相同,而 bypass 機制下,後續會進行臨時檔案的合并,并生成一份索引檔案;
和 SortShuffleManager的不同是
- 寫入磁盤的機制不同
- bypass 是記憶體緩沖區滿了才溢寫
- sort 是按照批次 10000 條進行溢寫
- 不需要排序
每個 executor 上,隻會産生目前 executor 上的 task 數的磁盤檔案數量