天天看點

Spark 的Shuffle過程詳解

一、Shuffle的作用是什麼?

Shuffle的中文解釋為“洗牌操作”,可以了解成将叢集中所有節點上的資料進行重新整合分類的過程。其思想來源于hadoop的mapReduce,Shuffle是連接配接map階段和reduce階段的橋梁。由于分布式計算中,每個階段的各個計算節點隻處理任務的一部分資料,若下一個階段需要依賴前面階段的所有計算結果時,則需要對前面階段的所有計算結果進行重新整合和分類,這就需要經曆shuffle過程。

在spark中,RDD之間的關系包含窄依賴和寬依賴,其中寬依賴涉及shuffle操作。是以在spark程式的每個job中,都是根據是否有shuffle操作進行階段(stage)劃分,每個stage都是一系列的RDD map操作。

二、shuffle操作為什麼耗時?

shuffle操作需要将資料進行重新聚合和劃分,然後配置設定到叢集的各個節點上進行下一個stage操作,這裡會涉及叢集不同節點間的大量資料交換。由于不同節點間的資料通過網絡進行傳輸時需要先将資料寫入磁盤,是以叢集中每個節點均有大量的檔案讀寫操作,進而導緻shuffle操作十分耗時(相對于map操作)。

三、Spark目前的ShuffleManage模式及處理機制

Spark程式中的Shuffle操作是通過shuffleManage對象進行管理。Spark目前支援的ShuffleMange模式主要有兩種:HashShuffleMagnage 和SortShuffleManage

Shuffle操作包含目前階段的Shuffle Write(存盤)和下一階段的Shuffle Read(fetch),兩種模式的主要差異是在Shuffle Write階段,下面将着重介紹。

1、HashShuffleMagnage

HashShuffleMagnage是Spark1.2之前版本的預設模式,在叢集中的每個executor上,其具體流程如下圖所示:

Spark 的Shuffle過程詳解

從圖中可知,在executor中處理每個task後的結果均會通過buffler緩存的方式寫入到多個磁盤檔案中,其中檔案的個數由shuffle算子的numPartition參數指定(圖中partition為3)。是以Shuffle Write 階段會産生大量的磁盤檔案,整個Shuffle Write 階段的檔案總數為: Write階段的task數目* Read階段的task數目。

由于HashShuffleManage方式會産生很多的磁盤檔案,Spark對其進行了優化,具體優化點為:

(1)executor處理多個task的時候隻會根據Read階段的task數目(設為m)生成對應的檔案數,具體做法是:處理第一個task時生成m個檔案,後續task的結果追加到對應的m個檔案中。

(2)考慮到executor的并行計算能力(core數量),處理任務的每個core均會生成m個檔案。

是以,優化後的HashShuffleManage最終的總檔案數:Write階段的core數量* Read階段的task數目。

2、SortShuffleManage

SortShuffleManage是Spark1.2及以上版本預設的ShuffleManage模式,具體包含普通模式和bypass模式。

1、普通模式

在叢集中的每個executor上,其普通模式的具體流程如下圖所示:

Spark 的Shuffle過程詳解

從圖中可知,SortShuffleManage在資料寫入磁盤檔案前有兩個重要操作:

(1)資料聚合,針對可聚合的shuffle操作(比如reduceBykey()),會基于key值進行資料的聚合操作,以此減少資料量。

(2)資料聚合之後會對資料進行排序操作。

(問題:基于key排序?排序的目的是什麼?),

最後對每個task生成的檔案進行合并,通過索引檔案标注key值在檔案中的位置。

是以,SortShuffleManage産生的總檔案數為:Writer 階段的task數*2

2、bypass模式

bypass模式與HashShuffleMagnage基本一緻,隻是Shuffle Write 階段在最後有一個檔案合并的過程,最終輸出的檔案個數為:Writer階段的task數目*2

spark.shuffle.sort.bypassMergeThreshold預設值為200,即Read階段的task數目小于等于該門檻值時以及Write端是非聚合操作(比如join),會啟用bypass模式,其他情況下采用普通機制。

四、Spark 程式的shuffle調優

Shuffle階段需要将資料寫入磁盤,這其中涉及大量的讀寫檔案操作和檔案傳輸操作,是以對節點的系統IO有比較大的影響,是以可通過調整參數,減少shuffle階段的檔案數和IO讀寫次數來提高性能,具體參數主要有以下幾個:

1)spark.shuffle.manager

設定Spark任務的shuffleManage模式,1.2以上版本的預設方式是sort,即shuffle write階段會進行排序,每個executor上生成的檔案會合并成兩個檔案(包含一個索引檔案)。

2)spark.shuffle.sort.bypassMergeThreshold

設定啟用bypass機制的門檻值(預設為200),若Shuffle Read階段的task數小于等于該值,則Shuffle Write階段啟用bypass機制。

3)spark.shuffle.file.buffer (預設32M)

設定Shuffle Write階段寫檔案時buffer的大小,若記憶體比較充足的話,可以将其值調大一些(比如64M),這樣能減少executor的IO讀寫次數。

4)spark.shuffle.io.maxRetries (預設3次)

設定Shuffle Read階段fetches資料時的重試次數,若shuffle階段的資料量很大,可以适當調大一些。

繼續閱讀