天天看點

Spark Shuffle 原理Spark Shuffle 原理

Spark Shuffle 原理

文章目錄

  • Spark Shuffle 原理
    • Shuffle 中的兩個 stage
    • Shuffle 中的任務數
    • reduce 拉取資料的過程
    • HashShuffle
      • 未經優化的 HashShuffleManager
      • 優化的 HashShuffleManager
    • SortShuffle
      • SortShuffleManager
      • BypassSortShuffleManager

Shuffle 中的兩個 stage

在 stage 劃分時,最後一個 stage 為 ResultStage,其他的則都為 ShuffleMapStage;

ResultStage : 代表着由一個 action 算子的觸發,并代表 Spark 作業的結束;

ShuffleMapStage : 代表着一個引起 Shuffle 的算子的執行,為下遊的 stage 進行 shuffle write;

Shuffle 中的任務數

  • MapTask : 最開始的 MapTask 如果是讀的 hdfs 上的資料,那麼分區的數量由 split 的數量決定;
  • ReduceTask : reduce 端的分區預設取

    spark.default.parallelism

    的值; 如果沒有設定,那麼在沒有發生重分區的情況下,reduce 端的分區數等于 map 端最有一個 RDD 的分區數量; 如果發生重分區,以重分區數量為準;

reduce 拉取資料的過程

map task 為下遊 stage 進行 shuffle write,reduce task 進行 shuffle read; 那麼 reduce task 就需要知道在哪些地方拉取 map task 産生的資料:

  1. map task 執行完畢後會将計算狀态以及磁盤小檔案位置等資訊封裝到

    MapStatus

    對象中,然後由本程序中的

    MapOutPutTrackerWorker

    對象将

    MapStatus

    對象發送給Driver程序的

    MapOutPutTrackerMaster

    對象;
  2. 在reduce task開始執行之前會先讓本程序中的

    MapOutputTrackerWorker

    向Driver程序中的

    MapoutPutTrakcerMaster

    發動請求,請求磁盤小檔案位置資訊;
  3. 當所有的Map task執行完畢後,Driver程序中的

    MapOutPutTrackerMaster

    就掌握了所有的磁盤小檔案的位置資訊。此時

    MapOutPutTrackerMaster

    會告訴

    MapOutPutTrackerWorker

    磁盤小檔案的位置資訊;
  4. 完成之前的操作之後,由

    BlockTransforService

    去Executor所在的節點拉資料,預設會啟動五個子線程。每次拉取的資料量不能超過48M(reduce task每次最多拉取48M資料,将拉來的資料存儲到Executor記憶體的執行記憶體中)。

HashShuffle

未經優化的 HashShuffleManager

shuffle write 為下遊的 stage 進行資料的準備和劃分; 每個 task 将 key 按照 hash 進行資料的劃分,将相同 key 的資料寫入同一個磁盤檔案,而每一個磁盤檔案隻屬于下遊的一個 task。

而在将資料寫入磁盤之前,會先将資料寫入記憶體緩沖中,當記憶體緩沖填滿之後,才會溢寫到磁盤檔案中去。

使用未經優化的 HashShuffleManager 會産生大量的磁盤檔案主要展現在: 下一個 stage 的 task 有多少個,目前 stage 的每個 task 就要建立多少份磁盤檔案。

是以總共生成的檔案數為:目前task的數量 * 下個 stage 的 task 數量;

未優化的HashShuffleManager工作原理如圖所示:

Spark Shuffle 原理Spark Shuffle 原理

優化的 HashShuffleManager

為了優化 HashShuffleManager,可以通過設定

spark.shuffle.consolidateFiles

,預設為false,将其設定為 true 打開;

開啟了之後,和未經優化的 HashShuffleManager 的主要差別為 : 下一個 stage 的 task 有多少個,目前 stage 的每個CPU Core 就要建立多少份磁盤檔案;

每個Executor建立的磁盤檔案的數量的計算公式為 : CPU core的數量 * 下一個stage的task數量

Spark Shuffle 原理Spark Shuffle 原理

SortShuffle

SortShuffleManager

SortShuffleManager 的工作模式為,先将資料寫入記憶體資料結構; 如果是reduceByKey這種聚合類的shuffle算子,那麼會選用Map資料結構,一邊通過Map進行聚合,一邊寫入記憶體;如果是join這種普通的shuffle算子,那麼會選用Array資料結構,直接寫入記憶體。

每寫入一條會判斷是否達到門檻值,達到門檻值了之後會進行溢寫磁盤;

在溢寫磁盤之前會進行對 key 的排序,寫入到記憶體緩沖區,然後按照每批次預設 10000 條的數量進行多次溢寫磁盤; 每次溢寫會産生一個臨時檔案;

而最後結束的時候,會将所有的臨時檔案進行 merge,并産生一個索引檔案,辨別了下遊各個task的資料在檔案中的start offset與end offset;

是以每個 executor 上,隻會産生目前 executor 上的 task 數的磁盤檔案數量

Spark Shuffle 原理Spark Shuffle 原理

BypassSortShuffleManager

bypass運作機制的觸發條件如下:

  • shuffle map task數量小于spark.shuffle.sort.bypassMergeThreshold參數的值
  • 不是聚合類的shuffle算子

此時,每個 task 會為下遊的每一個 task 産生一份臨時檔案,按照 key 進行 hash 将 相同 key 放入同一個檔案; 在寫磁盤之前,會寫到記憶體的緩沖區,緩沖區滿了之後

才會進行資料的溢寫;

上面的步驟和 未經優化的 HashShuffleManager 相同,而 bypass 機制下,後續會進行臨時檔案的合并,并生成一份索引檔案;

和 SortShuffleManager的不同是

  1. 寫入磁盤的機制不同
    • bypass 是記憶體緩沖區滿了才溢寫
    • sort 是按照批次 10000 條進行溢寫
  2. 不需要排序

每個 executor 上,隻會産生目前 executor 上的 task 數的磁盤檔案數量

Spark Shuffle 原理Spark Shuffle 原理