天天看點

Spark shuffle調優

Spark shuffle是什麼

Shuffle在Spark中即是把父RDD中的KV對按照Key重新分區,進而得到一個新的RDD。也就是說原本同屬于父RDD同一個分區的資料需要進入到子RDD的不同的分區。

現在的spark版本預設使用的是sortshuffle;

shuffle在哪裡産生

shuffle在spark的算子中産生,也就是運作task的時候才會産生shuffle.

sortShuffleManagerspark

shuffle的預設計算引擎叫sortshuffleManager,它負責shuffle過程的執行、計算群組件的處理,sortshuffleManager會将task進行shuffle操作時産生的臨時磁盤檔案合并成一個磁盤檔案,在下一個stage的shuffle read task拉取自己的資料時,隻要根據索引讀取每個磁盤檔案中的部分資料即可。

sortshuffle的内部機制

Spark shuffle調優

資料會根據不同的shuffle算子存儲到map資料結構(如reduceByKey)或者array資料結構(join);不過Map是一邊聚合,一邊寫入記憶體,array是直接寫入記憶體. 當記憶體達到一個門檻值,就會溢出寫到磁盤,是以在溢出這個環節會在磁盤上産生多個臨時檔案,磁盤上的這些檔案需要合并,于是spark就有了merge機制.

在溢寫到磁盤之前,在記憶體中會按照key來排序,排序過後會進入到一個buffer緩沖區,預設為32K,緩沖區的batch預設為1萬條key,也就是緩沖區以每次一萬條的量寫入到磁盤檔案中,該緩沖區減少IO,提高性能. 緩沖區和寫入磁盤使用的技術是java中的BufferedOutputStream.

merge會将之前産生的所有的臨時檔案進行合并,包括緩沖區讀寫到磁盤上的檔案,合并成一個大的檔案到磁盤,預設為48M,與這個檔案相對于的還有一個索引檔案,索引檔案裡面記錄的是這個檔案的元資訊,且這個磁盤檔案也是下遊stage的Task的輸入資訊!    注: 一個下遊的task對應一個磁盤檔案和這個磁盤檔案的元資訊. 于是就有了血統,繼承之類的!

shuffle當中可能會遇到的問題

資料量非常大,從其他各台機器收集資料占用大量網絡。

資料如何分類,即如何Partition,Hash、Sort等;

負載均衡(資料傾斜),因為采用不同的Shuffle方式對資料不同的分類,而分類之後又要跑到具體的節點上計算,如果不恰當的話,很容易産生資料傾斜;

網絡傳輸效率,需要在壓縮和解壓縮之間做出權衡,序列化和反序列也是要考慮的問題;

說明:具體的Task進行計算的時候盡一切最大可能使得資料具備Process Locality的特性;退而求次是增加資料分片,減少每個Task處理的資料量。

shuffle調優

shuffle調優分為兩種,一種是shuffle參數根據實際情況調優,一種是代碼開發調優,代碼開發調優我在spark性能調優裡面去寫!

spark.shuffle.file.buffer(預設值為32K,每次出貨1萬條)該參數是緩沖區的緩沖記憶體,如果可用的記憶體資源較為充足的話,可以将緩沖區的值設定大點,這樣會較少磁盤IO次數.,如果合理調節該參數,性能會提升1%~5%...  可以設定為64K.

spark.reducer.maxSizeInFlight(預設為48M)該參數是stage的每一個task就需要将上一個stage的計算結果中的所有相同key,從各個節點上通過網絡都拉取到自己所在的節點上,然後進行key的聚合或連接配接等操作,如果合理調節該參數(增大),性能會提升1%~5%...

spark.shuffle.io.maxRetries(預設3次)該參數是stage的task向上一個stage的task計算結果拉取資料,也就是上面那個操作,有時候會因為網絡異常原因,導緻拉取失敗,失敗時候預設重新拉取三次,三次過還是失敗的話作業就執行失敗了,根據具體的業務可以考慮将預設值增大,這樣可以避免由于JVM的一些原因或者網絡不穩定等因素導緻的資料拉取失敗.也有助于提高spark作業的穩定性. 可以适當的提升重新拉取的次數,最大為60次.

spark.shuffle.io.retryWait(預設為5s)該參數和上面一樣,是每次拉取資料的間隔時間...  調優建議:建議加大間隔時長(比如20s),以增加shuffle操作的穩定性

spark.shuffle.memoryFraction(預設0.2,也就是20%)該參數是資料根據不同的shuffle算子将資料寫入記憶體結構中,記憶體結構達到門檻值會溢出臨時檔案,這個參數就是則是記憶體結構的門檻值百分比的,不是記憶體結構的記憶體大小.  如果記憶體充足,而且很少使用持久化操作,建議調高這個比例,可以減少頻繁對磁盤進行IO操作,合理調節該參數可以将性能提升10%左右.

spark.shuffle.manager(預設sort)該參數是設定shuffle的類型,預設是sort,也就是sortshuffleManager, hash參數對應HashShuffleManager, tungsten-sort參數對應tungsten(這個很少用),HashShuffleManager是以前的版本,這個預設就行,

spark.shuffle.sort.bypassMergeThreshold(預設200個)該參數是如果shuffle read task的數量小于等于200個的時候,在sortshufflemanager模式下,就會啟動ByPass sortshufflemanager...這個調優就這樣把 ,預設200挺好的.

spark.shuffle.consolidateFiles(預設為false)該參數隻對HashshuffleManager有效,而HashshuffleManager是spark1.2之前預設使用的版本...

繼續閱讀