天天看點

Spark調優Spark調優:

Spark調優:

總共分為四點:

1.開發調優 2.資源調優 3.資料傾斜 4.shuffle

1.開發調優

1)避免建立重複的RDD(不包含資料,抽象描述)

如果是需要對一個檔案進行多次計算,那麼注意,最好就隻讀一次。RDD:不可變可分區的彈性分布式資料集。

2)盡可能複用同一個RDD

3)對多次使用的RDD進行持久化(cache persist)(記憶體或磁盤)

rdd1.map.reduce.sort.map()

rdd1.map.reduce.groupByKey()

改進:

rdd2.cache

rdd2.sort.map()

rdd2.groupByKey()

程式運作過程中的資料放置在記憶體。如果程式執行完畢,那麼肯定中間産生的資料會被垃圾回收。

4)盡量避免使用shuffle類算子

shuffle的操作有一個特點:

上一個階段必須全部執行完畢之後,下一個階段才能執行

任務不同步(有的任務運作的時間快,有的慢)

分布式計算,決定了一定會有shuffle

shuffle算子: reduceByKey,groupBy,sortBy,distinct,

集合類的操作基本都有shuffle算子

在mapreduce當中,我們知道如果定義mapreduce的map join實作程式

在spark中,如何實作?

BroadCast 使用廣播變量

rdd1.join(rdd2)

val bc=sparkContext.broadCast(rdd.tolist)
 rdd1.foreachPartition(data =>{
   val data2=bc.value  //小表資料
     val data1=data     //大表資料的其中一個分區
   data1.join(data2)  //這句代碼,是在每個節點上獨立運作的
})
           

5)使用map-side預聚合的shuffle操作

shuffle類的算子有第三分缺點:資料傾斜

目的:在使用shuffle操作的算子的時候,如果有map-side預聚合的話,那麼shuffle的代價還是很小的

哪些shuffle算子沒有map-side:

groupByKey 有shuffle 沒有聚合

coGroup :隻做分組,不做聚合

再看看有聚合的shuffle算子:

執行流程上: reduceByKey =groupByKey+reduce

最終的效率上 reduceByKey> groupByKey +reduce

reduceBuyKey ,combineByKey,aggregateByKey

6)使用高性能的算子

  1. 使用reduceByKey/aggregateByKey替代groupByKey

    2.使用mapPartitions替代普通map(粒度越大越好)

    原理:如果一個操作能針對partition完成,就不要針對單個元素。

    3.使用foreachPartitions替代foreach

    4.使用filter之後進行coalesce(重新分區)操作

    5.使用repartitionAndSortWithinPartitions替代repartition與sort類操作

7)廣播大變量

目的:讓多個task都要使用的在driver中聲明的變量都要維持一個獨立副本,程式設計讓這個executor中所有task都公用一個副本

效果;executor的記憶體占用量就減少了,網絡傳輸少了

原則:要廣播的資料越大,進行廣播這個操作之後得到的收益越好,如果executor的并發度越高,其實執行廣播操作,收益越好。

8)使用Kryo優化序列化性能

序列化:簡單的說,就是把一個對象變成一個010101數組

反序列化:把0101010這種格式的數還原成一個對象

java 中建立對象的方式:

java:實作序列化;

讓參與序列化的類型implements Serializable

有個缺點:

除了把目前這個對象的屬性的值給存儲/攜帶之外

還會把目前這個對象的類型的資訊都攜帶

mapreduce:

序列化:自定義了規則:

對于類型資訊,隻會攜帶一次

spark的序列化:

1.預設情況下,是支援java的原生序列化的機制

2.使用kyriserializer

9)優化資料結構

java中,有三種類型比較耗費記憶體:

字元串

對象

集合

優化措施:

使用原始類型(Int,Long)(基本類型)替代字元串String char 一串字元 char[]

盡量使用字元串替代對象

使用數組代替集合

10)融彙貫通

2.資源調優

1.概述:

在開發完Spark作業之後,就該為作業配置合适的資源了,Spark的資源參數基本都可以在spark-submit指令中作為參數設定。是以,我們必須對Spark中的資源使用原理有一個清晰的認識,并知道,有哪些資源參數可以設定的,以及如何設定合适的參數值。

2.Spark作業基本運作原理

Spark調優Spark調優:

Executor的記憶體主要分為三塊:

第一塊是讓task執行我們自己編寫的代碼時使用,預設是占Executor總記憶體的20%;

第二塊是讓task通過shuffle過程拉取了上一個stage的task的輸出後,進行聚合等操作時使用,預設也是占Executor總記憶體的20%;

第三塊是讓RDD持久化時使用,預設占Executor總記憶體的60%。

3.資源參數調優

1)num-executor

說明:該參數用于設定Spark作業總共要用多少個Executor進行來執行。這個參數非常重要,如果不設定的話,預設隻會給你啟動少量的Executor程序,此時你的spark作業運作的速度非常慢。

建議:根據任務大小,和叢集大小取設定。

2)executor-memeory

說明:該參數用于設定每個Executor程序的記憶體。

建議:,具體的設定還是得根據不同部門的資源隊列來定。可以看看自己團隊的資源隊列的最大記憶體限制是多少,num-executors乘以executor-memory,是不能超過隊列的最大記憶體量的。此外,如果你是跟團隊裡其他人共享這個資源隊列,那麼申請的記憶體量最好不要超過資源隊列最大總記憶體的1/3~1/2,避免你自己的Spark作業占用了隊列所有的資源,導緻别的同學的作業無法運作。

3)executor-cores

說明:該參數用于設定每個Executor程序的CPU core數量,這個參數決定了每個Executor程序并行執行task線程的能力。

建議:如果是跟他人共享這個隊列,那麼num-executors * executor-cores不要超過隊列總CPU core的1/3~1/2左右比較合适,也是避免影響其他同學的作業運作。

4)dirver-memory

說明:該參數用于設定Driver程序的記憶體

建議:不設定或設定1G左右應該就夠了。注意:如果需要使用collect算子将RDD的資料全部拉取到Driver上進行處理,那麼必須確定Driver的記憶體足夠大,否則會出現OOM記憶體溢出的問題。

5)spark.default.parallelism

說明:該參數用于設定每個stage的預設task數量,這個參數極為重要,如果不設定可能直接影響你的Spark作業性能。

建議:該參數設定為num-executors*executor-cores的2-3倍較為合适。

6)spark.storage.memeoryFraction

說明:該參數用于設定RDD持久化資料在Executor記憶體中能占的比列,預設是0.6。

建議:如果Spark作業中,有較多的RDD持久化操作,該參數的值可以适當提高一些,保證持久化的資料能夠容納在記憶體中。避免記憶體不夠緩存所有的資料,導緻資料隻能寫入磁盤中,降低了性能

7)spark.shuffle.memeoryFraction

說明:該參數用于設定shuffle過程中一個task拉取上個stage的輸出後,進行聚合操作時能夠使用的Executor記憶體的比例,預設是0.2。

建議:如果Spark作業中的RDD持久化操作較少,shuffle操作較多時,建議降低持久化操作的記憶體占比,提高shuffle操作的記憶體占比比例

3.資料傾斜調優

1.概述:

資料傾斜是大資料技術中程序遇到的問題,會影響spark作業的性能。資料傾斜調優,就是使用各種技術方案解決不同類型的資料傾斜問題,以保證Spark作業的性能。

2.資料傾斜發生時的現象

1).有的task執行的很快,有的task執行的很慢,

2).原本正常執行的spark作業,某天突然報出OOM(記憶體溢出)異常,觀察異常棧,是我門寫業務代碼造成的。

3.資料傾斜發生的原理

在進行shuffle的時候,必須将各個節點上相同的key拉取到某個節點上的一個task來進行處理,比如按照key進行聚合或join等操作。此時如果某個key對應的資料量特别大的話,就會發生資料傾斜。

4.如何定位導出資料傾斜的代碼

資料傾斜隻會發生在shuffle過程中。

可能會觸發shuffle操作的算子:distinct、groupByKey、aggreateByKey、join、cogroup、repartition。

1)某個task執行特别慢的情況

首先要看的,就是資料傾斜發生在第幾個stage中。

如果是用yarn-client模式送出,那麼本地是直接可以看到log的,可以在log中找到目前運作到了第幾個stage;如果是用yarn-cluster模式送出,則可以通過Spark Web UI來檢視目前運作到了第幾個stage。此外,無論是使用yarn-client模式還是yarn-cluster模式,我們都可以在Spark Web UI上深入看一下目前這個stage各個task配置設定的資料量,進而進一步确定是不是task配置設定的資料不均勻導緻了資料傾斜。

2)某個task莫名其妙記憶體溢出的情況

這種情況下去定位出問題的代碼就比較容易了。我們建議直接看yarn-client模式下本地log的異常棧,或者是通過YARN檢視yarn-cluster模式下的log中的異常棧。一般來說,通過異常棧資訊就可以定位到你的代碼中哪一行發生了記憶體溢出。然後在那行代碼附近找找,一般也會有shuffle類算子,此時很可能就是這個算子導緻了資料傾斜。

5.檢視導緻資料傾斜的key的資料分布情況

目的:檢視一下其中key的分布情況。這主要是為之後選擇哪一種技術方案提供依據。

1)如果是Spark SQL中的group by、join語句導緻的資料傾斜,那麼就查詢一個SQL使用的表的Key分布情況。

2)如果對Spark RDD執行的shuffle算子導緻的資料傾斜,那麼 可以在Spark作業中加入檢視key分布的代碼,比如RDD.countByKey()。然後統計出來的各個key出現的次數,collect/take到用戶端列印一下。

6.解決方案

解決方案一:使用Hive ETL預處理資料

1)适用場景:導緻資料傾斜的是Hive表。

如果該Hive表中的資料本身很不均勻,而且業務場景需要頻繁使用Spark對Hive表執行某個分析操作。

2)實作原理

Hive ETL中進行group by或者join等shuffle操作時,還是會出現資料傾斜,導緻Hive ETL的速度很慢。我們隻是把資料傾斜的發生提前到了Hive ETL中,避免Spark程式發生資料傾斜而已。

3)實踐經驗

在Java系統與Spark結合使用的項目中,會出現Java代碼頻繁調用Spark作業的場景,而且Spark作業的執行性能要求很高。

方法是:将資料傾斜提前提到上遊的Hive ETL,每天僅執行一次,隻是那一次比較慢的,而之後每次Java調用Spark作業時,執行資料很快,能夠提供更好的使用者體驗。

解決方案二、過濾少數導緻資料傾斜的Key

1)使用場景

如果發現導緻資料傾斜的key就少數幾個,而且對計算本身的影響并不大的話,那就很适合使用這種方案。

2)實作思路

直接過濾掉那少數幾個key。比如,在Spark SQL中可以使用where子句過濾掉這些key或者在Spark Core中對RDD執行filter算子過濾掉這些key。

3)實際經驗

Hive表中的某一個key在那天資料異常,導緻資料量暴增。是以就采取每次執行前先進行采樣,計算出樣本中資料量最大的幾個key之後,直接在程式中将那些key給過濾掉。

解決方案三、提高shuffle操作的并行度

1)适用場景

這是處理資料傾斜最簡單的一種方案

2)實作思路

在對RDD執行shuffle算子時,給shuffle算子傳入一個參數,比如reduceByKey(1000),該參數就設定;1這個shuffle算子在執行是shuffle read task的數量。

3)實踐經驗

在發現資料傾斜時嘗試使用的第一種手段,嘗試去用最簡單的方法緩解資料傾斜而已,或者是和其他方案結合起來使用。

解決方案四:兩階段聚合(局部聚合+全局聚合)

1)适用場景

對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較适用這種方案。

2)實作思路

第一是局部聚合,先給每個 key打上一個随機數,接着對打上随機數的資料執行reduceByKey等聚合操作,進行局部聚合,再把各個key的字首去掉,再次進行全局聚合。

3)優缺點

對于聚合類的shuffle操作導緻的資料傾斜,效果是非常不錯的。通常都可以解決掉資料傾斜,或者至少是大幅度緩解資料傾斜,将Spark作業的性能提升數倍以上。

僅僅适用于聚合類的shuffle操作,适用範圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。

解決方案五、将reduce join 轉成map join

1)适用場景(大表join小表)

在對RDD使用join類操作,或者是在Spark SQL中使用join語句時,而且join操作中的一個RDD或表的資料量比較小,比較适合此方案。

2)實作思路

不使用join算子連接配接操作,而使用Broadcase變量與map類算子實作join操作,進而完全避免掉shuffle類的操作,

解決方案六:采樣傾斜key并拆分join操作

1)适用場景(大表 join 大表)and (少數幾個key出現資料傾斜)

如果資料量都比較大,無法采用‘五方案’,那麼可以看一下兩個RDD/Hive表中的key分布情況。如果出現資料傾斜,其中某一個RDD出現資料傾斜,另一個RDD正常,那麼采用這個比較合适的。

解決方案七:使用随機字首和擴容RDD進行join

1)适用場景(大量的key出現資料傾斜)

如果在進行join操作時,RDD中有大量的key導緻資料傾斜,那麼進行分拆key也沒什麼意義,此時就隻能使用最後一種方案來解決問題了。

2)實作思路

該方案的實作思路基本和“解決方案六”類似,首先檢視RDD/Hive表中的資料分布情況,找到那個造成資料傾斜的RDD/Hive表,比如有多個key都對應了超過1萬條資料。

然後将該RDD的每條資料都打上一個n以内的随機字首。

時對另外一個正常的RDD進行擴容,将每條資料都擴容成n條資料,擴容出來的每條資料都依次打上一個0~n的字首。

最後将兩個處理後的RDD進行join即可。

3)實踐經驗

曾經開發一個資料需求的時候,發現一個join導緻了資料傾斜。優化之前,作業的執行時間大約是60分鐘左右;使用該方案優化之後,執行時間縮短到10分鐘左右,性能提升了6倍

解決方案八:多種方案組合使用

我們針對出現了多個資料傾斜環節的Spark作業,可以先運用解決方案一和二,預處理一部分資料,并過濾一部分資料來緩解;其次可以對某些shuffle操作提升并行度,優化其性能;最後還可以針對不同的聚合或join操作,選擇一種方案來優化其性能。

4.shuffle調優

1.概述:

大多數Spark作業的性能主要就是消耗在了shuffle環節,因為該環節包含了大量的磁盤IO、序列化、網絡資料傳輸等操作。是以,如果要讓作業的性能更上一層樓,就有必要對shuffle過程進行調優。

2.shuffle相關參數調優

1)spark.shuffle.file.buffer

預設值 : 32K

參數說明:該參數用于設定shuffle write task的BufferedOutputStream的buffer緩沖大小。将資料寫到磁盤檔案之前,會先寫入buffer緩沖中,待緩沖寫滿之後,才會溢寫到磁盤。

建議:如果作業可用的記憶體資源較為充足的話,可以适當增加這個參數的大小(比如64k)

2)spark.reducer.maxSizeInFlight

預設值: 48m

說明:該參數用于設定shuffle read task的buffer緩沖大小,而這個buffer緩沖決定了每次能夠拉取多少資料。

建議:如果作業可用的記憶體資源較為充足的話,可以适當增加這個參數的大小(比如96m),進而減少拉取資料的次數,

3)spark.shuffle.io.maxRetries

預設值: 3

說明:該參數就代表了可以重試的最大次數。如果在指定次數之内拉取還是沒有成功,就可能會導緻作業執行失敗。

建議:對于那些包含了特别耗時的shuffle操作的作業,建議增加重試最大次數(比如60次)。

4)spark.shuffle.io.retryWait

預設值:5s

參數說明:具體解釋同上,該參數代表了每次重試拉取資料的等待間隔,預設是5s。

調優建議:建議加大間隔時長(比如60s),以增加shuffle操作的穩定性。

繼續閱讀