天天看點

Spark資料傾斜解決方案

1、原理以及現象分析

1.1、資料傾斜怎麼出現的

在執行shuffle操作的時候,是按照key,來進行values的資料的輸出、拉取和聚合的。

同一個key的values,一定是配置設定到一個reduce task進行處理的。

多個key對應的values,比如一共是90萬。可能某個key對應了88萬資料,被配置設定到一個task上去面去執行。

另外兩個task,可能各配置設定到了1萬資料,可能是數百個key,對應的1萬條資料。

這樣就會出現資料傾斜問題。

想象一下,出現資料傾斜以後的運作的情況。很糟糕!

其中兩個task,各配置設定到了1萬資料,可能同時在10分鐘内都運作完了。另外一個task有88萬條,88 * 10 =  880分鐘 = 14.5個小時。

大家看,本來另外兩個task很快就運作完畢了(10分鐘),但是由于一個拖後腿的家夥,第三個task,要14.5個小時才能運作完,就導緻整個spark作業,也得14.5個小時才能運作完。

資料傾斜,一旦出現,是不是性能殺手?!

1.2、發生資料傾斜以後的現象

spark資料傾斜,有兩種表現:

1、你的大部分的task,都執行的特别特别快,(你要用client模式,standalone client,yarn client,本地機器一執行spark-submit腳本,就會開始列印log),task175 finished,剩下幾個task,執行的特别特别慢,前面的task,一般1s可以執行完5個,最後發現1000個task,998,999 task,要執行1個小時,2個小時才能執行完一個task。

出現以上loginfo,就表明出現資料傾斜了。

這樣還算好的,因為雖然老牛拉破車一樣非常慢,但是至少還能跑。

2、另一種情況是,運作的時候,其他task都執行完了,也沒什麼特别的問題,但是有的task,就是會突然間報了一個oom,jvm out of memory,記憶體溢出了,task failed,task lost,resubmitting task。反複執行幾次都到了某個task就是跑不通,最後就挂掉。

某個task就直接oom,那麼基本上也是因為資料傾斜了,task配置設定的數量實在是太大了!是以記憶體放不下,然後你的task每處理一條資料,還要建立大量的對象,記憶體爆掉了。

這樣也表明出現資料傾斜了。

這種就不太好了,因為你的程式如果不去解決資料傾斜的問題,壓根兒就跑不出來。

作業都跑不完,還談什麼性能調優這些東西?!

1.3、定位資料傾斜出現的原因與出現問題的位置

根據log去定位

出現資料傾斜的原因,基本隻可能是因為發生了shuffle操作,在shuffle的過程中,出現了資料傾斜的問題。因為某個或者某些key對應的資料,遠遠的高于其他的key。

1、你在自己的程式裡面找找,哪些地方用了會産生shuffle的算子,groupbykey、countbykey、reducebykey、join

2、看log

log一般會報是在你的哪一行代碼,導緻了oom異常。或者看log,看看是執行到了第幾個stage。spark代碼,是怎麼劃分成一個一個的stage的。哪一個stage生成的task特别慢,就能夠自己用肉眼去對你的spark代碼進行stage的劃分,就能夠通過stage定位到你的代碼,到底哪裡發生了資料傾斜。

2、聚合源資料以及過濾導緻傾斜的key

資料傾斜解決方案,第一個方案和第二個方案,一起來講。這兩個方案是最直接、最有效、最簡單的解決資料傾斜問題的方案。

第一個方案:聚合源資料。

第二個方案:過濾導緻傾斜的key。

後面的五個方案,尤其是最後4個方案,都是那種特别狂拽炫酷吊炸天的方案。但沒有第一二個方案簡單直接。如果碰到了資料傾斜的問題。上來就先考慮第一個和第二個方案看能不能做,如果能做的話,後面的5個方案,都不用去搞了。

有效、簡單、直接才是最好的,徹底根除了資料傾斜的問題。

2.1、方案一:聚合源資料

一些聚合的操作,比如groupbykey、reducebykey,groupbykey說白了就是拿到每個key對應的values。reducebykey說白了就是對每個key對應的values執行一定的計算。

這些操作,比如groupbykey和reducebykey,包括之前說的join。都是在spark作業中執行的。

spark作業的資料來源,通常是哪裡呢?90%的情況下,資料來源都是hive表(hdfs,大資料分布式存儲系統)。hdfs上存儲的大資料。hive表中的資料通常是怎麼出來的呢?有了spark以後,hive比較适合做什麼事情?hive就是适合做離線的,晚上淩晨跑的,etl(extract transform load,資料的采集、清洗、導入),hive sql,去做這些事情,進而去形成一個完整的hive中的資料倉庫。說白了,資料倉庫,就是一堆表。

spark作業的源表,hive表,通常情況下來說,也是通過某些hive etl生成的。hive etl可能是晚上淩晨在那兒跑。今天跑昨天的資料。

資料傾斜,某個key對應的80萬資料,某些key對應幾百條,某些key對應幾十條。現在咱們直接在生成hive表的hive etl中對資料進行聚合。比如按key來分組,将key對應的所有的values全部用一種特殊的格式拼接到一個字元串裡面去,比如“key=sessionid, value: action_seq=1|user_id=1|search_keyword=火鍋|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001”。

對key進行group,在spark中,拿到key=sessionid,values<iterable>。hive etl中,直接對key進行了聚合。那麼也就意味着,每個key就隻對應一條資料。在spark中,就不需要再去執行groupbykey+map這種操作了。直接對每個key對應的values字元串進行map操作,進行你需要的操作即可。

spark中,可能對這個操作,就不需要執行shffule操作了,也就根本不可能導緻資料傾斜。

或者是對每個key在hive etl中進行聚合,對所有values聚合一下,不一定是拼接起來,可能是直接進行計算。reducebykey計算函數應用在hive etl中,進而得到每個key的values。

聚合源資料方案第二種做法是,你可能沒有辦法對每個key聚合出來一條資料。那麼也可以做一個妥協,對每個key對應的資料,10萬條。有好幾個粒度,比如10萬條裡面包含了幾個城市、幾天、幾個地區的資料,現在放粗粒度。直接就按照城市粒度,做一下聚合,幾個城市,幾天、幾個地區粒度的資料,都給聚合起來。比如說

city_id date area_id

select ... from ... group by city_id

盡量去聚合,減少每個key對應的數量,也許聚合到比較粗的粒度之後,原先有10萬資料量的key,現在隻有1萬資料量。減輕資料傾斜的現象和問題。

2.2、方案二:過濾導緻傾斜的key

如果你能夠接受某些資料在spark作業中直接就摒棄掉不使用。比如說,總共有100萬個key。隻有2個key是資料量達到10萬的。其他所有的key,對應的數量都是幾十萬。

這個時候,你自己可以去取舍,如果業務和需求可以了解和接受的話,在你從hive表查詢源資料的時候,直接在sql中用where條件,過濾掉某幾個key。

那麼這幾個原先有大量資料,會導緻資料傾斜的key,被過濾掉之後,那麼在你的spark作業中,自然就不會發生資料傾斜了。

3、提高shuffle操作reduce并行度

3.1、問題描述

第一個和第二個方案,都不适合做,然後再考慮這個方案。

将reduce task的數量變多,就可以讓每個reduce task配置設定到更少的資料量。這樣的話也許就可以緩解甚至是基本解決掉資料傾斜的問題。

3.2、提升shuffle reduce端并行度的操作方法

很簡單,主要給我們所有的shuffle算子,比如groupbykey、countbykey、reducebykey。在調用的時候,傳入進去一個參數。那個數字,就代表了那個shuffle操作的reduce端的并行度。那麼在進行shuffle操作的時候,就會對應着建立指定數量的reduce task。

這樣的話,就可以讓每個reduce task配置設定到更少的資料。基本可以緩解資料傾斜的問題。

比如說,原本某個task配置設定資料特别多,直接oom,記憶體溢出了,程式沒法運作,直接挂掉。按照log,找到發生資料傾斜的shuffle操作,給它傳入一個并行度數字,這樣的話,原先那個task配置設定到的資料,肯定會變少。就至少可以避免oom的情況,程式至少是可以跑的。

3.3、提升shuffle reduce并行度的缺陷

治标不治本的意思,因為它沒有從根本上改變資料傾斜的本質和問題。不像第一個和第二個方案(直接避免了資料傾斜的發生)。原理沒有改變,隻是說,盡可能地去緩解和減輕shuffle reduce task的資料壓力,以及資料傾斜的問題。

實際生産環境中的經驗:

1、如果最理想的情況下,提升并行度以後,減輕了資料傾斜的問題,或者甚至可以讓資料傾斜的現象忽略不計,那麼就最好。就不用做其他的資料傾斜解決方案了。

2、不太理想的情況下,比如之前某個task運作特别慢,要5個小時,現在稍微快了一點,變成了4個小時。或者是原先運作到某個task,直接oom,現在至少不會oom了,但是那個task運作特别慢,要5個小時才能跑完。

那麼,如果出現第二種情況的話,各位,就立即放棄第三種方案,開始去嘗試和選擇後面的四種方案。

4、使用随機key實作雙重聚合

4.1、使用場景

groupbykey、reducebykey比較适合使用這種方式。join咱們通常不會這樣來做,後面會講三種針對不同的join造成的資料傾斜的問題的解決方案。

4.2、解決方案

第一輪聚合的時候,對key進行打散,将原先一樣的key,變成不一樣的key,相當于是将每個key分為多組。

先針對多個組,進行key的局部聚合。接着,再去除掉每個key的字首,然後對所有的key進行全局的聚合。

對groupbykey、reducebykey造成的資料傾斜,有比較好的效果。

如果說,之前的第一、第二、第三種方案,都沒法解決資料傾斜的問題,那麼就隻能依靠這一種方式了。

5、将reduce join轉換為map join

5.1、使用方式

普通的join,那麼肯定是要走shuffle。既然是走shuffle,那麼普通的join就肯定是走的是reduce join。那怎麼将reduce join 轉換為mapjoin呢?先将所有相同的key,對應的value彙聚到一個task中,然後再進行join。

5.2、使用場景

這種方式适合在什麼樣的情況下來使用?

如果兩個rdd要進行join,其中一個rdd是比較小的。比如一個rdd是100萬資料,一個rdd是1萬資料。(一個rdd是1億資料,一個rdd是100萬資料)。

其中一個rdd必須是比較小的,broadcast出去那個小rdd的資料以後,就會在每個executor的block manager中都儲存一份。要確定你的記憶體足夠存放那個小rdd中的資料。

這種方式下,根本不會發生shuffle操作,肯定也不會發生資料傾斜。從根本上杜絕了join操作可能導緻的資料傾斜的問題。

對于join中有資料傾斜的情況,大家盡量第一時間先考慮這種方式,效果非常好。

不适合的情況

兩個rdd都比較大,那麼這個時候,你去将其中一個rdd做成broadcast,就很笨拙了。很可能導緻記憶體不足。最終導緻記憶體溢出,程式挂掉。

而且其中某些key(或者是某個key),還發生了資料傾斜。此時可以采用最後兩種方式。

對于join這種操作,不光是考慮資料傾斜的問題。即使是沒有資料傾斜問題,也完全可以優先考慮,用我們講的這種進階的reduce join轉map join的技術,不要用普通的join,去通過shuffle,進行資料的join。完全可以通過簡單的map,使用map join的方式,犧牲一點記憶體資源。在可行的情況下,優先這麼使用。

不走shuffle,直接走map,是不是性能也會高很多?這是肯定的。

6、sample采樣傾斜key單獨進行join

6.1、方案實作思路

将發生資料傾斜的key,單獨拉出來,放到一個rdd中去。就用這個原本會傾斜的key rdd跟其他rdd單獨去join一下,這個時候key對應的資料可能就會分散到多個task中去進行join操作。

就不至于說是,這個key跟之前其他的key混合在一個rdd中時,肯定是會導緻一個key對應的所有資料都到一個task中去,就會導緻資料傾斜。

6.2、使用場景

這種方案什麼時候适合使用?

優先對于join,肯定是希望能夠采用上一個方案,即reduce join轉換map join。兩個rdd資料都比較大,那麼就不要那麼搞了。

針對你的rdd的資料,你可以自己把它轉換成一個中間表,或者是直接用countbykey()的方式,你可以看一下這個rdd各個key對應的資料量。此時如果你發現整個rdd就一個,或者少數幾個key對應的資料量特别多。盡量建議,比如就是一個key對應的資料量特别多。

此時可以采用這種方案,單拉出來那個最多的key,單獨進行join,盡可能地将key分散到各個task上去進行join操作。

什麼時候不适用呢?

如果一個rdd中,導緻資料傾斜的key特别多。那麼此時,最好還是不要這樣了。還是使用我們最後一個方案,終極的join資料傾斜的解決方案。

就是說,咱們單拉出來了一個或者少數幾個可能會産生資料傾斜的key,然後還可以進行更加優化的一個操作。

對于那個key,從另外一個要join的表中,也過濾出來一份資料,比如可能就隻有一條資料。userid2infordd,一個userid key,就對應一條資料。

然後呢,采取對那個隻有一條資料的rdd,進行flatmap操作,打上100個随機數,作為字首,傳回100條資料。

單獨拉出來的可能産生資料傾斜的rdd,給每一條資料,都打上一個100以内的随機數,作為字首。

再去進行join,是不是性能就更好了。肯定可以将資料進行打散,去進行join。join完以後,可以執行map操作,去将之前打上的随機數給去掉,然後再和另外一個普通rdd join以後的結果進行union操作。

7、使用随機數以及擴容表進行join

7.1、使用場景及步驟

當采用随機數和擴容表進行join解決資料傾斜的時候,就代表着,你的之前的資料傾斜的解決方案,都沒法使用。

這個方案是沒辦法徹底解決資料傾斜的,更多的,是一種對資料傾斜的緩解。

步驟:

1、選擇一個rdd,要用flatmap,進行擴容,将每條資料,映射為多條資料,每個映射出來的資料,都帶了一個n以内的随機數,通常來說會選擇10。

2、将另外一個rdd,做普通的map映射操作,每條資料都打上一個10以内的随機數。

3、最後将兩個處理後的rdd進行join操作。

7.2、局限性

1、因為你的兩個rdd都很大,是以你沒有辦法去将某一個rdd擴的特别大,一般咱們就是10倍。

2、如果就是10倍的話,那麼資料傾斜問題的确是隻能說是緩解和減輕,不能說徹底解決。

sample采樣傾斜key并單獨進行join

将key,從另外一個rdd中過濾出的資料,可能隻有一條或者幾條,此時,咱們可以任意進行擴容,擴成1000倍。

将從第一個rdd中拆分出來的那個傾斜key rdd,打上1000以内的一個随機數。

這種情況下,還可以配合上,提升shuffle reduce并行度,join(rdd, 1000)。通常情況下,效果還是非常不錯的。

打散成100份,甚至1000份,2000份,去進行join,那麼就肯定沒有資料傾斜的問題了吧。

繼續閱讀