天天看點

Spark性能調優

通常我們對一個系統進行性能優化無怪乎兩個步驟——性能監控和參數調整,本文主要分享的也是這兩方面内容。

性能監控工具

【Spark監控工具】

Spark提供了一些基本的Web監控頁面,對于日常監控十分有用。

1. Application Web UI

http://master:4040(預設端口是4040,可以通過spark.ui.port修改)可獲得這些資訊:(1)stages和tasks排程情況;(2)RDD大小及記憶體使用;(3)系統環境資訊;(4)正在執行的executor資訊。

2. history server

當Spark應用退出後,仍可以獲得曆史Spark應用的stages和tasks執行資訊,便于分析程式不明原因挂掉的情況。配置方法如下:

(1)$SPARK_HOME/conf/spark-env.sh

export SPARK_HISTORY_OPTS="-Dspark.history.retainedApplications=50

Dspark.history.fs.logDirectory=hdfs://hadoop000:8020/directory"

說明:spark.history.retainedApplica-tions僅顯示最近50個應用spark.history.fs.logDirectory:Spark History Server頁面隻展示該路徑下的資訊。

(2)$SPARK_HOME/conf/spark-defaults.conf

spark.eventLog.enabled true

spark.eventLog.dir hdfs://hadoop000:8020/directory #應用在運作過程中所有的資訊均記錄在該屬性指定的路徑下

3. spark.eventLog.compress true

(1)HistoryServer啟動

$SPARK_HOMR/bin/start-histrory-server.sh

(2)HistoryServer停止

$SPARK_HOMR/bin/stop-histrory-server.sh

4. ganglia

通過配置ganglia,可以分析叢集的使用狀況和資源瓶頸,但是預設情況下ganglia是未被打包的,需要在mvn編譯時添加-Pspark-ganglia-lgpl,并修改配置檔案$SPARK_HOME/conf/metrics.properties。

5. Executor logs

Standalone模式:$SPARK_HOME/logs

YARN模式:在yarn-site.xml檔案中配置了YARN日志的存放位置:yarn.nodemanager.log-dirs,或使用指令擷取yarn logs -applicationId。

【其他監控工具】

1. Nmon(http://www.ibm.com/developerworks/aix/library/au-analyze_aix/)

Nmon 輸入:c:CPU n:網絡 m:記憶體 d:磁盤

2. Jmeter(http://jmeter. apache.org/)

通常使用Jmeter做系統性能參數的實時展示,JMeter的安裝非常簡單,從官方網站上下載下傳,解壓之後即可使用。運作指令在%JMETER_HOME%/bin下,對于 Windows 使用者,直接使用jmeter.bat。

啟動jmeter:建立測試計劃,設定線程組設定循環次數。

添加監聽器:jp@gc - PerfMon Metrics Collector。

設定監聽器:監聽主機端口及監聽内容,例如CPU。

啟動監聽:可以實時獲得節點的CPU狀态資訊,從圖4可看出CPU已出現瓶頸。

3. Jprofiler(http://www.ej-technologies.com/products/jprofiler/overview.html)

JProfiler是一個全功能的Java剖析工具(profiler),專用于分析J2SE和J2EE應用程式。它把CPU、線程和記憶體的剖析組合在一個強大的應用中。JProfiler的GUI可以更友善地找到性能瓶頸、抓住記憶體洩漏(memory leaks),并解決多線程的問題。例如分析哪個對象占用的記憶體比較多;哪個方法占用較大的CPU資源等;我們通常使用Jprofiler來監控Spark應用在local模式下運作時的性能瓶頸和記憶體洩漏情況。

上述幾個工具可以直接通過提供的連結了解詳細的使用方法。

Spark調優

【Spark叢集并行度】

在Spark叢集環境下,隻有足夠高的并行度才能使系統資源得到充分的利用,可以通過修改spark-env.sh來調整Executor的數量和使用資源,Standalone和YARN方式資源的排程管理是不同的。

在Standalone模式下:

1. 每個節點使用的最大記憶體數:SPARK_WORKER_INSTANCES*SPARK_WORKER_MEMORY;

2. 每個節點的最大并發task數:SPARK_WORKER_INSTANCES*SPARK_WORKER_CORES。

在YARN模式下:

1. 叢集task并行度:SPARK_ EXECUTOR_INSTANCES* SPARK_EXECUTOR_CORES;

2. 叢集記憶體總量:(executor個數) * (SPARK_EXECUTOR_MEMORY+ spark.yarn.executor.memoryOverhead)+(SPARK_DRIVER_MEMORY+spark.yarn.driver.memoryOverhead)。

重點強調:Spark對Executor和Driver額外添加堆記憶體大小,Executor端:由spark.yarn.executor.memoryOverhead設定,預設值executorMemory * 0.07與384的最大值。Driver端:由spark.yarn.driver.memoryOverhead設定,預設值driverMemory * 0.07與384的最大值。

通過調整上述參數,可以提高叢集并行度,讓系統同時執行的任務更多,那麼對于相同的任務,并行度高了,可以減少輪詢次數。舉例說明:如果一個stage有100task,并行度為50,那麼執行完這次任務,需要輪詢兩次才能完成,如果并行度為100,那麼一次就可以了。

但是在資源相同的情況,并行度高了,相應的Executor記憶體就會減少,是以需要根據實際實況協調記憶體和core。此外,Spark能夠非常有效的支援短時間任務(例如:200ms),因為會對所有的任務複用JVM,這樣能減小任務啟動的消耗,Standalone模式下,core可以允許1-2倍于實體core的數量進行超配。

【Spark任務數量調整】

Spark的任務數由stage中的起始的所有RDD的partition之和數量決定,是以需要了解每個RDD的partition的計算方法。以Spark應用從HDFS讀取資料為例,HadoopRDD的partition切分方法完全繼承于MapReduce中的FileInputFormat,具體的partition數量由HDFS的塊大小、mapred.min.split.size的大小、檔案的壓縮方式等多個因素決定,詳情需要參見FileInputFormat的代碼。

【Spark記憶體調優】

記憶體優化有三個方面的考慮:對象所占用的記憶體,通路對象的消耗以及垃圾回收所占用的開銷。

1. 對象所占記憶體,優化資料結構

Spark 預設使用Java序列化對象,雖然Java對象的通路速度更快,但其占用的空間通常比其内部的屬性資料大2-5倍。為了減少記憶體的使用,減少Java序列化後的額外開銷,下面列舉一些Spark官網(http://spark.apache.org/docs/latest/tuning.html#tuning-data-structures)提供的方法。

(1)使用對象數組以及原始類型(primitive type)數組以替代Java或者Scala集合類(collection class)。fastutil 庫為原始資料類型提供了非常友善的集合類,且相容Java标準類庫。

(2)盡可能地避免采用含有指針的嵌套資料結構來儲存小對象。

(3)考慮采用數字ID或者枚舉類型以便替代String類型的主鍵。

(4)如果記憶體少于32GB,設定JVM參數-XX:+UseCom-pressedOops以便将8位元組指針修改成4位元組。與此同時,在Java 7或者更高版本,設定JVM參數-XX:+UseC-----ompressedStrings以便采用8比特來編碼每一個ASCII字元。

2. 記憶體回收

(1)擷取記憶體統計資訊:優化記憶體前需要了解叢集的記憶體回收頻率、記憶體回收耗費時間等資訊,可以在spark-env.sh中設定SPARK_JAVA_OPTS=“-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps $ SPARK_JAVA_OPTS”來擷取每一次記憶體回收的資訊。

(2)優化緩存大小:預設情況Spark采用運作記憶體(spark.executor.memory)的60%來進行RDD緩存。這表明在任務執行期間,有40%的記憶體可以用來進行對象建立。如果任務運作速度變慢且JVM頻繁進行記憶體回收,或者記憶體空間不足,那麼降低緩存大小設定可以減少記憶體消耗,可以降低spark.storage.memoryFraction的大小。

3. 頻繁GC或者OOM

針對這種情況,首先要确定現象是發生在Driver端還是在Executor端,然後在分别處理。

Driver端:通常由于計算過大的結果集被回收到Driver端導緻,需要調大Driver端的記憶體解決,或者進一步減少結果集的數量。

Executor端:

(1)以外部資料作為輸入的Stage:這類Stage中出現GC通常是因為在Map側進行map-side-combine時,由于group過多引起的。解決方法可以增加partition的數量(即task的數量)來減少每個task要處理的資料,來減少GC的可能性。

(2)以shuffle作為輸入的Stage:這類Stage中出現GC的通常原因也是和shuffle有關,常見原因是某一個或多個group的資料過多,也就是所謂的資料傾斜,最簡單的辦法就是增加shuffle的task數量,比如在SparkSQL中設定SET spark.sql.shuffle.partitions=400,如果調大shuffle的task無法解決問題,說明你的資料傾斜很嚴重,某一個group的資料遠遠大于其他的group,需要你在業務邏輯上進行調整,預先針對較大的group做單獨處理。

【修改序列化】

使用Kryo序列化,因為Kryo序列化結果比Java标準序列化更小,更快速。具體方法:spark-default.conf 裡設定spark.serializer為org.apache.spark.serializer.KryoSerializer 。

參考官方文檔(http://spark.apache.org/docs/latest/tuning.html#summary):對于大多數程式而言,采用Kryo架構以及序列化能夠解決性能相關的大部分問題。

【Spark 磁盤調優】

在叢集環境下,如果資料分布不均勻,造成節點間任務分布不均勻,也會導緻節點間源資料不必要的網絡傳輸,進而大大影響系統性能,那麼對于磁盤調優最好先将資料資源分布均勻。除此之外,還可以對源資料做一定的處理:

1. 在記憶體允許範圍内,将頻繁通路的檔案或資料置于記憶體中;

2. 如果磁盤充裕,可以适當增加源資料在HDFS上的備份數以減少網絡傳輸;

3. Spark支援多種檔案格式及壓縮方式,根據不同的應用環境進行合理的選擇。如果每次計算隻需要其中的某幾列,可以使用列式檔案格式,以減少磁盤I/O,常用的列式有parquet、rcfile。如果檔案過大,将原檔案壓縮可以減少磁盤I/O,例如:gzip、snappy、lzo。

【其他】

廣播變量(broadcast)

當task中需要通路一個Driver端較大的資料時,可以通過使用SparkContext的廣播變量來減小每一個任務的大小以及在叢集中啟動作業的消耗。參考官方文檔http://spark.apache.org/docs/latest/tuning.html#broadcasting-large-variables。

開啟推測機制

推測機制後,如果叢集中,某一台機器的幾個task特别慢,推測機制會将任務配置設定到其他機器執行,最後Spark會選取最快的作為最終結果。

在spark-default.conf 中添加:spark.speculation true

推測機制與以下幾個參數有關:

1. spark.speculation.interval 100:檢測周期,機關毫秒;

2. spark.speculation.quantile 0.75:完成task的百分比時啟動推測;

3. spark.speculation.multiplier 1.5:比其他的慢多少倍時啟動推測。

總結

Spark系統的性能調優是一個很複雜的過程,需要對Spark以及Hadoop有足夠的知識儲備。從業務應用平台(Spark)、存儲(HDFS)、作業系統、硬體等多個層面都會對性能産生很大的影響。借助于多種性能監控工具,我們可以很好地了解系統的性能表現,并根據上面介紹的經驗進行調整。

作者簡介:田毅,亞信科技大資料平台部門研發經理,Spark Contributor,北京Spark Meetup發起人,主要關注SparkSQL與Spark Streaming。

本文選自程式員電子版2015年3月A刊,該期更多文章請檢視這裡。2000年創刊至今所有文章目錄請檢視程式員封面秀。歡迎訂閱程式員電子版(含iPad版、Android版、PDF版)。

轉載自今日頭條 網址:http://toutiao.com/a4674690248/?tt_from=mobile_qq&iid=2512939059&app=news_article