Spark調優
由于大部分Spark計算都是在記憶體中完成的,是以Spark程式的瓶頸可能由叢集中任意一種資源導緻,如:CPU、網絡帶寬、或者記憶體等。最常見的情況是,資料能裝進記憶體,而瓶頸是網絡帶寬;當然,有時候我們也需要做一些優化調整來減少記憶體占用,例如将RDD以序列化格式儲存(storing RDDs in serialized form)。本文将主要涵蓋兩個主題:1.資料序列化(這對于優化網絡性能極為重要);2.減少記憶體占用以及記憶體調優。同時,我們也會提及其他幾個比較小的主題。
資料序列化
序列化在任何一種分布式應用性能優化時都扮演幾位重要的角色。如果序列化格式序列化過程緩慢,或者需要占用位元組很多,都會大大拖慢整體的計算效率。通常,序列化都是Spark應用優化時首先需要關注的地方。Spark着眼于要達到便利性(允許你在計算過程中使用任何Java類型)和性能的一個平衡。Spark主要提供了兩個序列化庫:
- Java serialization: 預設情況,Spark使用Java自帶的ObjectOutputStream 架構來序列化對象,這樣任何實作了
java.io.Serializable 接口的對象,都能被序列化。同時,你還可以通過擴充
java.io.Externalizable 來控制序列化性能。Java序列化很靈活但性能較差,同時序列化後占用的位元組數也較多。
- Kryo serialization: Spark還可以使用Kryo 庫(版本2)提供更高效的序列化格式。Kryo的序列化速度和位元組占用都比Java序列化好很多(通常是10倍左右),但Kryo不支援所有實作了
接口的類型,它需要你在程式中 register 需要序列化的類型,以得到最佳性能。Serializable
要切換到使用 Kryo,你可以在 SparkConf 初始化的時候調用 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)。這個設定不僅控制各個worker節點之間的混洗資料序列化格式,同時還控制RDD存到磁盤上的序列化格式。目前,Kryo不是預設的序列化格式,因為它需要你在使用前注冊需要序列化的類型,不過我們還是建議在對網絡敏感的應用場景下使用Kryo。
Spark對一些常用的Scala核心類型(包括在Twitter chill 庫的AllScalaRegistrar中)自動使用Kryo序列化格式。
如果你的自定義類型需要使用Kryo序列化,可以用 registerKryoClasses 方法先注冊:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
Kryo的文檔(Kryo documentation )中有較長的描述了更多的進階選項,如:自定義序列化代碼等。
如果你的對象很大,你可能需要增大 spark.kryoserializer.buffer 配置項(config)。其值至少需要大于最大對象的序列化長度。
最後,如果你不注冊需要序列化的自定義類型,Kryo也能工作,不過每一個對象執行個體的序列化結果都會包含一份完整的類名,這有點浪費空間。
記憶體調優
記憶體占用調優主要需要考慮3點:1.資料占用的總記憶體(你多半會希望整個資料集都能裝進記憶體吧);2.通路資料集中每個對象的開銷;3.垃圾回收的開銷(如果你的資料集中對象周轉速度很快的話)。
一般,Java對象的通路時很快的,但同時Java對象會比原始資料(僅包含各個字段值)占用的空間多2~5倍。主要原因有:
- 每個Java對象都有一個對象頭(object header),對象頭大約占用16位元組,其中包含像其對應class的指針這樣的資訊。對于一些包含較少資料的對象(比如隻包含一個Int字段),這個對象頭可能比對象資料本身還大。
- Java字元串(String)有大約40子節點額外開銷(Java String以Char資料的形式儲存原始資料,是以需要一些額外的字段,如數組長度等),并且每個字元都以兩位元組的UTF-16編碼在内部儲存。是以,10個字元的String很容易就占了60位元組。
- 一些常見的集合類,如 HashMap、LinkedList,使用的是連結清單類資料結構,是以它們對每項資料都有一個包裝器。這些包裝器對象不僅其自身就有“對象頭”,同時還有指向下一個包裝器對象的連結清單指針(通常為8位元組)。
- 原始類型的集合通常也是以“裝箱”的形式包裝成對象(如:java.lang.Integer)。
本節隻是Spark記憶體管理的一個概要,下面我們會更詳細地讨論各種Spark記憶體調優的具體政策。特别地,我們會讨論如何評估資料的記憶體使用量,以及如何改進 – 要麼改變你的資料結構,要麼以某種序列化格式存儲資料。最後,我們還會讨論如何調整Spark的緩存大小,以及如何調優Java的垃圾回收器。
記憶體管理概覽
Spark中記憶體主要用于兩類目的:執行計算和資料存儲。執行計算的記憶體主要用于混洗(Shuffle)、關聯(join)、排序(sort)以及聚合(aggregation),而資料存儲的記憶體主要用于緩存和叢集内部資料傳播。Spark中執行計算和資料存儲都是共享同一個記憶體區域(M)。如果執行計算沒有占用記憶體,那麼資料存儲可以申請占用所有可用的記憶體,反之亦然。執行計算可能會搶占資料存儲使用的記憶體,并将存儲于記憶體的資料逐出記憶體,直到資料存儲占用的記憶體比例降低到一個指定的比例(R)。換句話說,R是M基礎上的一個子區域,這個區域的記憶體資料永遠不會被逐出記憶體。然而,資料存儲不會搶占執行計算的記憶體(否則實作太複雜了)。
這樣設計主要有這麼幾個需要考慮的點。首先,不需要緩存資料的應用可以把整個空間用來執行計算,進而避免頻繁地把資料吐到磁盤上。其次,需要緩存資料的應用能夠有一個資料存儲比例(R)的最低保證,也避免這部分緩存資料被全部逐出記憶體。最後,這個實作方式能夠在預設情況下,為大多數使用場景提供合理的性能,而不需要專家級使用者來設定記憶體使用如何劃分。
雖然有兩個記憶體劃分相關的配置參數,但一般來說,使用者不需要設定,因為預設值已經能夠适用于絕大部分的使用場景:
- spark.memory.fraction 表示上面M的大小,其值為相對于JVM堆記憶體的比例(預設0.75)。剩餘的25%是為其他使用者資料結構、Spark内部中繼資料以及避免OOM錯誤的安全預留白間(大量稀疏資料和異常大的資料記錄)。
- spark.memory.storageFraction 表示上面R的大小,其值為相對于M的一個比例(預設0.5)。R是M中專門用于緩存資料塊,且這部分資料塊永遠不會因執行計算任務而逐出記憶體。
評估記憶體消耗
确定一個資料集占用記憶體總量最好的辦法就是,建立一個RDD,并緩存到記憶體中,然後再到web UI上”Storage”頁面檢視。頁面上會展示這個RDD總共占用了多少記憶體。
要評估一個特定對象的記憶體占用量,可以用 SizeEstimator.estimate 方法。這個方法對試驗哪種資料結構能夠裁剪記憶體占用量比較有用,同時,也可以幫助使用者了解廣播變量在每個執行器堆上占用的記憶體量。
資料結構調優
減少記憶體消耗的首要方法就是避免過多的Java封裝(減少對象頭和額外輔助字段),比如基于指針的資料結構和包裝對象等。以下有幾條建議:
- 設計資料結構的時候,優先使用對象數組和原生類型,減少對複雜集合類型(如:HashMap)的使用。fastutil 提供了一些很友善的原聲類型集合,同時相容Java标準庫。
- 盡可能避免嵌套大量的小對象和指針。
- 對應鍵值應盡量使用數值型或枚舉型,而不是字元串型。
- 如果記憶體小于32GB,可以設定JVM标志參數 -XX:+UseCompressdOops 将指針設為4位元組而不是8位元組。你可以在
spark-env.sh 中設定這個參數。
序列化RDD存儲
如果經過上面的調整後,存儲的資料對象還是太大,那麼你可以試試将這些對象以序列化格式存儲,所需要做的隻是通過 RDD persistence API 設定好存儲級别,如:MEMORY_ONLY_SER。Spark會将RDD的每個分區以一個巨大的位元組數組形式存儲起來。以序列化格式存儲的唯一缺點就是通路資料會變慢一點,因為Spark需要反序列化每個被通路的對象。如果你需要序列化緩存資料,我們強烈建議你使用Kryo(using Kryo),和Java序列化相比,Kryo能大大減少序列化對象占用的空間(當然也比原始Java對象小很多)。
垃圾回收調優
JVM的垃圾回收在某些情況下可能會造成瓶頸,比如,你的RDD存儲經常需要“換入換出”(新RDD搶占了老RDD記憶體,不過如果你的程式沒有這種情況的話那JVM垃圾回收一般不是問題,比如,你的RDD隻是載入一次,後續隻是在這一個RDD上做操作)。當Java需要把老對象逐出記憶體的時候,JVM需要跟蹤所有的Java對象,并找出那些對象已經沒有用了。概括起來就是,垃圾回收的開銷和對象個數成正比,是以減少對象的個數(比如用 Int數組取代 LinkedList),就能大大減少垃圾回收的開銷。當然,一個更好的方法就如前面所說的,以序列化形式存儲資料,這時每個RDD分區都隻包含有一個對象了(一個巨大的位元組數組)。在嘗試其他技術方案前,首先可以試試用序列化RDD的方式(serialized caching)評估一下GC是不是一個瓶頸。
如果你的作業中各個任務需要的工作記憶體和節點上存儲的RDD緩存占用的記憶體産生沖突,那麼GC很可能會出現問題。下面我們将讨論一下如何控制好RDD緩存使用的記憶體空間,以減少這種沖突。
衡量GC的影響
GC調優的第一步是統計一下,垃圾回收啟動的頻率以及GC所使用的總時間。給JVM設定一下這幾個參數(參考Spark配置指南 – configuration guide,檢視Spark作業中的Java選項參數):-verbose:gc -XX:+PrintGCDetails,就可以在後續Spark作業的worker日志中看到每次GC花費的時間。注意,這些日志是在叢集worker節點上(在各節點的工作目錄下stdout檔案中),而不是你的驅動器所在節點。
進階GC調優
為了進一步調優GC,我們就需要對JVM記憶體管理有一個基本的了解:
- Java堆記憶體可配置設定的空間有兩個區域:新生代(Young generation)和老生代(Old generation)。新生代用以儲存生存周期短的對象,而老生代則是儲存生存周期長的對象。
- 新生代區域被進一步劃分為三個子區域:Eden,Survivor1,Survivor2。
- 簡要描述一下垃圾回收的過程:如果Eden區滿了,則啟動一輪minor GC回收Eden中的對象,生存下來(沒有被回收掉)的Eden中的對象和Survivor1區中的對象一并複制到Survivor2中。兩個Survivor區域是互相切換使用的(就是說,下次從Eden和Survivor2中複制到Survivor1中)。如果某個對象的年齡(每次GC所有生存下來的對象長一歲)超過某個門檻值,或者Survivor2(下次是Survivor1)區域滿了,則将對象移到老生代(Old區)。最終如果老生代也滿了,就會啟動full GC。
Spark GC調優的目标就是確定老生代(Old generation )隻儲存長生命周期RDD,而同時新生代(Young generation )的空間又能足夠儲存短生命周期的對象。這樣就能在任務執行期間,避免啟動full GC。以下是GC調優的主要步驟:
- 從GC的統計日志中觀察GC是否啟動太多。如果某個任務結束前,多次啟動了full GC,則意味着用以執行該任務的記憶體不夠。
- 如果GC統計資訊中顯示,老生代記憶體空間已經接近存滿,可以通過降低 spark.memory.storageFraction 來減少RDD緩存占用的記憶體;減少緩存對象總比任務執行緩慢要強!
- 如果major GC比較少,但minor GC很多的話,可以多配置設定一些Eden記憶體。你可以把Eden的大小設為高于各個任務執行所需的工作記憶體。如果要把Eden大小設為E,則可以這樣設定新生代區域大小:-Xmn=4/3*E。(放大4/3倍,主要是為了給Survivor區域保留白間)
- 舉例來說,如果你的任務會從HDFS上讀取資料,那麼單個任務的記憶體需求可以用其所讀取的HDFS資料塊的大小來評估。需要特别注意的是,解壓後的HDFS塊是解壓前的2~3倍大。是以如果我們希望保留3~4個任務并行的工作記憶體,并且HDFS塊大小為64MB,那麼可以評估Eden的大小應該設為 4*3*64MB。
- 最後,再觀察一下垃圾回收的啟動頻率和總耗時有沒有什麼變化。
我們的很多經驗表明,GC調優的效果和你的程式代碼以及可用的總記憶體相關。網上還有不少調優的選項說明(many more tuning options),但總體來說,就是控制好full GC的啟動頻率,就能有效減少垃圾回收開銷。
其他注意事項
并行度
一般來說叢集并不會滿負荷運轉,除非你吧每個操作的并行度都設得足夠大。Spark會自動根據對應的輸入檔案大小來設定“map”類算子的并行度(當然你可以通過一個SparkContext.textFile等函數的可選參數來控制并行度),而對于想 groupByKey 或reduceByKey這類 “reduce” 算子,會使用其各父RDD分區數的最大值。你可以将并行度作為建構RDD第二個參數(參考
spark.PairRDDFunctions
),或者設定 spark.default.parallelism 這個預設值。一般來說,評估并行度的時候,我們建議2~3個任務共享一個CPU。
Reduce任務的記憶體占用
如果RDD比記憶體要大,有時候你可能收到一個OutOfMemoryError,但其實這是因為你的任務集中的某個任務太大了,如reduce任務groupByKey。Spark的混洗(Shuffle)算子(sortByKey,groupByKey,reduceByKey,join等)會在每個任務中建構一個哈希表,以便在任務中對資料分組,這個哈希表有時會很大。最簡單的修複辦法就是增大并行度,以減小單個任務的輸入集。Spark對于200ms以内的短任務支援非常好,因為Spark可以跨任務複用執行器JVM,任務的啟動開銷很小,是以把并行度增加到比叢集中總CPU核數還多是沒有任何問題的。
廣播大變量
使用SparkContext中的廣播變量相關功能(broadcast functionality)能大大減少每個任務本身序列化的大小,以及叢集中啟動作業的開銷。如果你的Spark任務正在使用驅動器(driver)程式中定義的巨大對象(比如:靜态查詢表),請考慮使用廣播變量替代之。Spark會在master上将各個任務的序列化後大小列印出來,是以你可以檢查一下各個任務是否過大;通常來說,大于20KB的任務就值得優化一下。
資料本地性
資料本地性對Spark作業往往會有較大的影響。如果代碼和其所操作的資料在統一節點上,那麼計算速度肯定會更快一些。但如果二者不在一起,那必然需要挪動其中之一。一般來說,挪動序列化好的代碼肯定比挪動一大堆資料要快。Spark就是基于這個一般性原則來建構資料本地性的排程。
資料本地性是指代碼和其所處理的資料的距離。基于資料目前的位置,資料本地性可以劃分成以下幾個層次(按從近到遠排序):
- PROCESS_LOCAL 資料和運作的代碼處于同一個JVM程序内。
- NODE_LOCAL 資料和代碼處于同一節點。例如,資料處于HDFS上某個節點,而對應的執行器(executor)也在同一個機器節點上。這會比PROCESS_LOCAL稍微慢一些,因為資料需要跨程序傳遞。
- NO_PREF 資料在任何地方處理都一樣,沒有本地性偏好。
- RACK_LOCAL 資料和代碼處于同一個機架上的不同機器。這時,資料和代碼處于不同機器上,需要通過網絡傳遞,但還是在同一個機架上,一般也就通過一個交換機傳輸即可。
- ANY 資料在網絡中其他未知,即資料和代碼不在同一個機架上。
Spark傾向于讓所有任務都具有最佳的資料本地性,但這并非總是可行的。某些情況下,可能會出現一些空閑的執行器(executor)沒有待處理的資料,那麼Spark可能就會犧牲一些資料本地性。有兩種可能的選項:a)等待已經有任務的CPU,待其釋放後立即在同一台機器上啟動一個任務;b)立即在其他節點上啟動新任務,并把所需要的資料複制過去。
而通常,Spark會等待一小會,看看是否有CPU會被釋放出來。一旦等待逾時,則立即在其他節點上啟動并将所需的資料複制過去。資料本地性各個級别之間的回落逾時可以單獨配置,也可以在統一參數内一起設定;詳細請參考 configuration page 中的 spark.locality 相關參數。如果你的任務執行時間比較長并且資料本地性很差,你就應該試試調大這幾個參數,不過預設值一般都能适用于大多數場景了。
總結
本文是一個簡短的Spark調優指南,列舉了Spark應用調優一些比較重要的考慮點 – 最重要的就是,資料序列化和記憶體調優。對于絕大多數應用來說,用Kryo格式序列化資料能夠解決大多數的性能問題。如果您有其他關于性能調優最佳實踐的問題,歡迎郵件咨詢(Spark mailing list )。
該文轉自 http://ifeve.com/spark-tuning/
官方英文位址 http://spark.apache.org/docs/latest/tuning.html
作者:石山園 出處:http://www.cnblogs.com/shishanyuan/
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,否則保留追究法律責任的權利。如果覺得還有幫助的話,可以點一下右下角的【推薦】,希望能夠持續的為大家帶來好的技術文章!想跟我一起進步麼?那就【關注】我吧。