新公司遇到的第一個spark的坑,尋找原因的過程其實還挺有意思,最終在源碼和spark ui上的統計資料的幫助下找到根源,具體如下。
先說下問題
由于嚴重的資料傾斜,大量資料集中在單個task中,導緻shuffle過程中發生異常
完整的exeception是這樣的
但奇怪的是,經過嘗試減小executor數量後任務反而成功,增大反而失敗,經過多次測試,問題穩定複現。
成功的executor數量是7,失敗的則是15,叢集的active node是7
這結果直接改變了認知,也沒爆記憶體,cpu也夠,怎麼會這樣,executor數量應該越多越快啊,居然還失敗了。
解決過程
這個數在幾個失敗裡不一樣,但是都超過了integer.maxvalue。在spark源碼中,這條異常發生在transportframedecoder這個類中
檢查發現是frame的大小超過了max_frame_size,而max_frame_size的大小定義如下
這個transportframedecoder繼承自channelinboundhandleradapter,這是netty裡的類,好了,看到這就明白了,原來錯誤發生在網絡傳輸過程中,是資料量超大了。
但是對比了成功與失敗的任務,都是單個task嚴重傾斜啊。再看下兩個任務的executor配置設定。
失敗的任務
成功的任務
失敗的任務裡,配置設定到的節點上都有多個executor;成功的任務裡則每個節點隻有一個executor。
再看下stage,失敗的任務失敗在stage26,這個stage依賴于stage24。看圖說話
兩個任務的stage24都是成功的,看下24的executor的資料量情況
可以看到,兩個任務在這個stage上由于資料傾斜,所有資料輸入輸出都在一個executor中完成。但在stage26中,差別來了
為了提升性能,在hadoop和spark中都會盡量選擇資料本地性,盡量讓資料local,不行再選擇rack等其他方案。而24的輸出會作為26的輸入。是以24之後自然會選擇相同節點上的executor,看下stage26的情況
在成功的任務裡,stage26與24的executor完全是同一個,這樣資料是完全本地化的,甚至是同一個程序,因而經過優化不再需要通過網絡傳輸
而在失敗的任務裡,stage26在執行時發現這個node上有3個executor,為了性能的提升,将資料配置設定給3個executor執行計算。可見其中也成功了一半,32686這個端口的executor是24中執行的那個,因而雖然它要處理3.3g的資料,但是因為不需要網絡傳輸,也仍然可以成功。可是對于另外兩個,即使是同一個節點,但是由于是不同程序,仍然需要通過netty的server拉取資料,而這一次的拉取最大不能超過int最大值,因而失敗了一個,導緻整個stage失敗,也就導緻了整個job的失敗。
總結
由此可見在資料極度傾斜的情況下,增大executor的數量未見得是好事,還是要根據具體情況而來。減小了數量解決了問題,但是這其實并不是最好的解決方案,因為這種情況下,可見資料基本等同于本地執行,完全浪費了叢集的并發性,更好的解決方案還需要再繼續深入了解。