天天看點

Spark : spark 原理簡述與 shuffle 過程介紹 & PerfectSpark學習: 簡述總結

Spark學習: 簡述總結

Spark 是使用 scala 實作的基于記憶體計算的大資料開源叢集計算環境.提供了 java,scala, python,R 等語言的調用接口.
  • Spark學習 簡述總結
    • 引言
      • 1 Hadoop 和 Spark 的關系
    • Spark 系統架構
      • 1 spark 運作原理
    • RDD 初識
    • shuffle 和 stage
    • 性能優化
      • 1 緩存機制和 cache 的意義
      • 2 shuffle 的優化
      • 3 資源參數調優
      • 4 小結
    • 本地搭建 Spark 開發環境
      • 1 Spark-Scala-IntelliJ
      • 2 Spark-Notebook 開發環境
    • 參考文獻

1 引言

1.1 Hadoop 和 Spark 的關系

   Google 在 2003 年和 2004 年先後發表了 Google 檔案系統 GFS 和 MapReduce 程式設計模型兩篇文章,. 基于這兩篇開源文檔,06 年 Nutch 項目子項目之一的 Hadoop 實作了兩個強有力的開源産品:HDFS 和 MapReduce. Hadoop 成為了典型的大資料批量處理架構,由 HDFS 負責靜态資料的存儲,并通過 MapReduce 将計算邏輯配置設定到各資料節點進行資料計算和價值發現.之後以 HDFS 和 MapReduce 為基礎建立了很多項目,形成了 Hadoop 生态圈.

  而 Spark 則是UC Berkeley AMP lab (加州大學伯克利分校AMP實驗室)所開源的類Hadoop MapReduce的通用并行架構, 專門用于大資料量下的疊代式計算.是為了跟 Hadoop 配合而開發出來的,不是為了取代 Hadoop, Spark 運算比 Hadoop 的 MapReduce 架構快的原因是因為 Hadoop 在一次 MapReduce 運算之後,會将資料的運算結果從記憶體寫入到磁盤中,第二次 Mapredue 運算時在從磁盤中讀取資料,是以其瓶頸在2次運算間的多餘 IO 消耗. Spark 則是将資料一直緩存在記憶體中,直到計算得到最後的結果,再将結果寫入到磁盤,是以多次運算的情況下, Spark 是比較快的. 其優化了疊代式工作負載1.

具體差別如下:

Spark : spark 原理簡述與 shuffle 過程介紹 & PerfectSpark學習: 簡述總結

  伯克利大學将 Spark 的整個生态系統成為 伯克利資料分析棧(BDAS),在核心架構 Spark 的基礎上,主要提供四個範疇的計算架構:

Spark : spark 原理簡述與 shuffle 過程介紹 & PerfectSpark學習: 簡述總結
  • Spark SQL: 提供了類 SQL 的查詢,傳回 Spark-DataFrame 的資料結構
  • Spark Streaming: 流式計算,主要用于處理線上實時時序資料
  • MLlib: 提供機器學習的各種模型和調優
  • GraphX: 提供基于圖的算法,如 PageRank

關于四個子產品更詳細的可以參見2這篇博文. 後面介紹的内容主要是關于 MLlib 子產品方面的.

  

Spark 的主要特點還包括:

  • (1)提供 Cache 機制來支援需要反複疊代計算或者多次資料共享,減少資料讀取的 IO 開銷;
  • (2)提供了一套支援 DAG 圖的分布式并行計算的程式設計架構,減少多次計算之間中間結果寫到 Hdfs 的開銷;
  • (3)使用多線程池模型減少 Task 啟動開稍, shuffle 過程中避免不必要的 sort 操作并減少磁盤 IO 操作。(Hadoop 的 Map 和 reduce 之間的 shuffle 需要 sort)

2 Spark 系統架構

首先明确相關術語3:

  • 應用程式(Application): 基于Spark的使用者程式,包含了一個Driver Program 和叢集中多個的Executor;
  • 驅動(Driver): 運作Application的main()函數并且建立SparkContext;
  • 執行單元(Executor): 是為某Application運作在Worker Node上的一個程序,該程序負責運作Task,并且負責将資料存在記憶體或者磁盤上,每個Application都有各自獨立的Executors;
  • 叢集管理程式(Cluster Manager): 在叢集上擷取資源的外部服務(例如:Local、Standalone、Mesos或Yarn等叢集管理系統);
  • 操作(Operation): 作用于RDD的各種操作分為Transformation和Action.

整個 Spark 叢集中,分為 Master 節點與 worker 節點,,其中 Master 節點上常駐 Master 守護程序和 Driver 程序, Master 負責将串行任務變成可并行執行的任務集Tasks, 同時還負責出錯問題處理等,而 Worker 節點上常駐 Worker 守護程序, Master 節點與 Worker 節點分工不同, Master 負載管理全部的 Worker 節點,而 Worker 節點負責執行任務.

  Driver 的功能是建立 SparkContext, 負責執行使用者寫的 Application 的 main 函數程序,Application 就是使用者寫的程式.

Spark 支援不同的運作模式,包括Local, Standalone,Mesoses,Yarn 模式.不同的模式可能會将 Driver 排程到不同的節點上執行.叢集管理模式裡, local 一般用于本地調試.

  每個 Worker 上存在一個或多個 Executor 程序,該對象擁有一個線程池,每個線程負責一個 Task 任務的執行.根據 Executor 上 CPU-core 的數量,其每個時間可以并行多個 跟 core 一樣數量的 Task4.Task 任務即為具體執行的 Spark 程式的任務.

Spark : spark 原理簡述與 shuffle 過程介紹 & PerfectSpark學習: 簡述總結

5

2.1 spark 運作原理

一開始看不懂的話可以看完第三和第四章再回來看.

底層詳細細節介紹6:

  我們使用spark-submit送出一個Spark作業之後,這個作業就會啟動一個對應的Driver程序。根據你使用的部署模式(deploy-mode)不同,Driver程序可能在本地啟動,也可能在叢集中某個工作節點上啟動。而Driver程序要做的第一件事情,就是向叢集管理器(可以是Spark Standalone叢集,也可以是其他的資源管理叢集,美團•大衆點評使用的是YARN作為資源管理叢集)申請運作Spark作業需要使用的資源,這裡的資源指的就是Executor程序。YARN叢集管理器會根據我們為Spark作業設定的資源參數,在各個工作節點上,啟動一定數量的Executor程序,每個Executor程序都占有一定數量的記憶體和CPU core。

  在申請到了作業執行所需的資源之後,Driver程序就會開始排程和執行我們編寫的作業代碼了。Driver程序會将我們編寫的Spark作業代碼分拆為多個stage,每個stage執行一部分代碼片段,并為每個stage建立一批Task,然後将這些Task配置設定到各個Executor程序中執行。Task是最小的計算單元,負責執行一模一樣的計算邏輯(也就是我們自己編寫的某個代碼片段),隻是每個Task處理的資料不同而已。一個stage的所有Task都執行完畢之後,會在各個節點本地的磁盤檔案中寫入計算中間結果,然後Driver就會排程運作下一個stage。下一個stage的Task的輸入資料就是上一個stage輸出的中間結果。如此循環往複,直到将我們自己編寫的代碼邏輯全部執行完,并且計算完所有的資料,得到我們想要的結果為止。

  Spark是根據shuffle類算子來進行stage的劃分。如果我們的代碼中執行了某個shuffle類算子(比如reduceByKey、join等),那麼就會在該算子處,劃分出一個stage界限來。可以大緻了解為,shuffle算子執行之前的代碼會被劃分為一個stage,shuffle算子執行以及之後的代碼會被劃分為下一個stage。是以一個stage剛開始執行的時候,它的每個Task可能都會從上一個stage的Task所在的節點,去通過網絡傳輸拉取需要自己處理的所有key,然後對拉取到的所有相同的key使用我們自己編寫的算子函數執行聚合操作(比如reduceByKey()算子接收的函數)。這個過程就是shuffle。

  當我們在代碼中執行了cache/persist等持久化操作時,根據我們選擇的持久化級别的不同,每個Task計算出來的資料也會儲存到Executor程序的記憶體或者所在節點的磁盤檔案中。

  是以Executor的記憶體主要分為三塊:第一塊是讓Task執行我們自己編寫的代碼時使用,預設是占Executor總記憶體的20%;第二塊是讓Task通過shuffle過程拉取了上一個stage的Task的輸出後,進行聚合等操作時使用,預設也是占Executor總記憶體的20%;第三塊是讓RDD持久化時使用,預設占Executor總記憶體的60%。

  Task的執行速度是跟每個Executor程序的CPU core數量有直接關系的。一個CPU core同一時間隻能執行一個線程。而每個Executor程序上配置設定到的多個Task,都是以每個Task一條線程的方式,多線程并發運作的。如果CPU core數量比較充足,而且配置設定到的Task數量比較合理,那麼通常來說,可以比較快速和高效地執行完這些Task線程。

  以上就是Spark作業的基本運作原理的說明.

  在實際程式設計中,我們不需關心以上排程細節.隻需使用 Spark 提供的指定語言的程式設計接口調用相應的 API 即可.

  在 Spark API 中, 一個 應用(Application) 對應一個 SparkContext 的執行個體。一個 應用 可以用于單個 Job,或者分開的多個 Job 的 session,或者響應請求的長時間生存的伺服器。與 MapReduce 不同的是,一個 應用 的程序(我們稱之為 Executor),會一直在叢集上運作,即使當時沒有 Job 在上面運作。

  而調用一個Spark内部的 Action 會産生一個 Spark job 來完成它。 為了确定這些job實際的内容,Spark 檢查 RDD 的DAG再計算出執行 plan 。這個 plan 以最遠端的 RDD 為起點(最遠端指的是對外沒有依賴的 RDD 或者 資料已經緩存下來的 RDD),産生結果 RDD 的 Action 為結束 。并根據是否發生 shuffle 劃分 DAG 的 stage.

// parameter
val appName = "RetailLocAdjust"
val master = "local"   // 選擇模式
val conf = new SparkConf().setMaster(master).setAppName(appName)
// 啟動一個 SparkContext Application
val sc = new SparkContext(conf)
val rdd = sc.textFile("path/...")
           

  要啟動 Spark 運作程式主要有兩種方式:一種是使用 spark-submit 将腳本檔案送出,一種是打開 Spark 跟某種特定語言的解釋器,如:

  • spark-shell: 啟動了 Spark 的 scala 解釋器.
  • pyspark: 啟動了 Spark 的 python 解釋器.
  • sparkR: 啟動了 Spark 的 R 解釋器.

    (以上解釋器位于spark 的 bin 目錄下)

3 RDD 初識

  RDD(Resilent Distributed Datasets)俗稱彈性分布式資料集,是 Spark 底層的分布式存儲的資料結構,可以說是 Spark 的核心, Spark API 的所有操作都是基于 RDD 的. 資料不隻存儲在一台機器上,而是分布在多台機器上,實作資料計算的并行化.彈性表明資料丢失時,可以進行重建.在Spark 1.5版以後,新增了資料結構 Spark-DataFrame,仿造的 R 和 python 的類 SQL 結構-DataFrame, 底層為 RDD, 能夠讓資料從業人員更好的操作 RDD.

  在Spark 的設計思想中,為了減少網絡及磁盤 IO 開銷,需要設計出一種新的容錯方式,于是才誕生了新的資料結構 RDD. RDD 是一種隻讀的資料塊,可以從外部資料轉換而來,你可以對RDD 進行函數操作(Operation),包括 Transformation 和 Action. 在這裡隻讀表示當你對一個 RDD 進行了操作,那麼結果将會是一個新的 RDD, 這種情況放在代碼裡,假設變換前後都是使用同一個變量表示這一 RDD,RDD 裡面的資料并不是真實的資料,而是一些中繼資料資訊,記錄了該 RDD 是通過哪些 Transformation 得到的,在計算機中使用 lineage 來表示這種血緣結構,lineage 形成一個有向無環圖 DAG, 整個計算過程中,将不需要将中間結果落地到 HDFS 進行容錯,加入某個節點出錯,則隻需要通過 lineage 關系重新計算即可.

1). RDD 主要具有如下特點:

  • 1.它是在叢集節點上的不可變的、已分區的集合對象;
  • 2.通過并行轉換的方式來建立(如 Map、 filter、join 等);
  • 3.失敗自動重建;
  • 4.可以控制存儲級别(記憶體、磁盤等)來進行重用;
  • 5.必須是可序列化的;
  • 6.是靜态類型的(隻讀)。

2). RDD 的建立方式主要有2種:

- 并行化(Parallelizing)一個已經存在與驅動程式(Driver Program)中的集合如set、list;

- 讀取外部存儲系統上的一個資料集,比如HDFS、Hive、HBase,或者任何提供了Hadoop InputFormat的資料源.也可以從本地讀取 txt、csv 等資料集

3). RDD 的操作函數(operation)主要分為2種類型 Transformation 和 Action.

類别 函數 差別
Transformation Map,filter,groupBy,join, union,reduce,sort,partitionBy

傳回值還是 RDD

,不會馬上 送出 Spark 叢集運作
Action count,collect,take,save, show

傳回值不是 RDD

,會形成 DAG 圖,送出 Spark 叢集運作 并立即傳回結果

Transformation 操作不是馬上送出 Spark 叢集執行的,Spark 在遇到 Transformation 操作時隻會記錄需要這樣的操作,并不會去執行,需要等到有 Action 操作的時候才會真正啟動計算過程進行計算.針對每個 Action,Spark 會生成一個 Job, 從資料的建立開始,經過 Transformation, 結尾是 Action 操作.這些操作對應形成一個有向無環圖(DAG),形成 DAG 的先決條件是最後的函數操作是一個Action.

如下例子:

val arr = Array("cat", "dog", "lion", "monkey", "mouse")
// create RDD by collection
val rdd = sc.parallize(arr)    
// Map: "cat" -> c, cat
val rdd1 = rdd.Map(x => (x.charAt(0), x))
// groupby same key and count
val rdd2 = rdd1.groupBy(x => x._1).
                Map(x => (x._1, x._2.toList.length))
val result = rdd2.collect()             
print(result)
// output:Array((d,1), (l,1), (m,2))
           

  首先,當你在解釋器裡一行行輸入的時候,實際上 Spark 并不會立即執行函數,而是當你輸入了

val result = rdd2.collect()

的時候, Spark 才會開始計算,從

sc.parallize(arr)

到最後的

collect

,形成一個 Job.

Created with Raphaël 2.1.0sc.parallizeMapgroupByMapcollect

4.shuffle 和 stage

shuffle 是劃分 DAG 中 stage 的辨別,同時影響 Spark 執行速度的關鍵步驟.

  RDD 的 Transformation 函數中,又分為窄依賴(narrow dependency)和寬依賴(wide dependency)的操作.窄依賴跟寬依賴的差別是是否發生 shuffle(洗牌) 操作.寬依賴會發生 shuffle 操作. 窄依賴是子 RDD的各個分片(partition)不依賴于其他分片,能夠獨立計算得到結果,寬依賴指子 RDD 的各個分片會依賴于父RDD 的多個分片,是以會造成父 RDD 的各個分片在叢集中重新分片, 看如下兩個示例:

// Map: "cat" -> c, cat
val rdd1 = rdd.Map(x => (x.charAt(0), x))
// groupby same key and count
val rdd2 = rdd1.groupBy(x => x._1).
                Map(x => (x._1, x._2.toList.length))
           

  第一個 Map 操作将 RDD 裡的各個元素進行映射, RDD 的各個資料元素之間不存在依賴,可以在叢集的各個記憶體中獨立計算,也就是并行化,第二個 groupby 之後的 Map 操作,為了計算相同 key 下的元素個數,需要把相同 key 的元素聚集到同一個 partition 下,是以造成了資料在記憶體中的重新分布,即 shuffle 操作.shuffle 操作是 spark 中最耗時的操作,應盡量避免不必要的 shuffle.

  寬依賴主要有兩個過程: shuffle write 和 shuffle fetch. 類似 Hadoop 的 Map 和 Reduce 階段.shuffle write 将 ShuffleMapTask 任務産生的中間結果緩存到記憶體中, shuffle fetch 獲得 ShuffleMapTask 緩存的中間結果進行 ShuffleReduceTask 計算,這個過程容易造成OutOfMemory.

  shuffle 過程記憶體配置設定使用 ShuffleMemoryManager 類管理,會針對每個 Task 配置設定記憶體,Task 任務完成後通過 Executor 釋放空間.這裡可以把 Task 了解成不同 key 的資料對應一個 Task. 早期的記憶體配置設定機制使用公平配置設定,即不同 Task 配置設定的記憶體是一樣的,但是這樣容易造成記憶體需求過多的 Task 的 OutOfMemory, 進而造成多餘的 磁盤 IO 過程,影響整體的效率.(例:某一個 key 下的資料明顯偏多,但因為大家記憶體都一樣,這一個 key 的資料就容易 OutOfMemory).1.5版以後 Task 共用一個記憶體池,記憶體池的大小預設為 JVM 最大運作時記憶體容量的16%,配置設定機制如下:假如有 N 個 Task,ShuffleMemoryManager 保證每個 Task 溢出之前至少可以申請到1/2N 記憶體,且至多申請到1/N,N 為目前活動的 shuffle Task 數,因為N 是一直變化的,是以 manager 會一直追蹤 Task 數的變化,重新計算隊列中的1/N 和1/2N.但是這樣仍然容易造成記憶體需要多的 Task 任務溢出,是以最近有很多相關的研究是針對 shuffle 過程記憶體優化的.

Spark : spark 原理簡述與 shuffle 過程介紹 & PerfectSpark學習: 簡述總結

如下 DAG 流程圖中,分别讀取資料,經過處理後 join 2個 RDD 得到結果:

Spark : spark 原理簡述與 shuffle 過程介紹 & PerfectSpark學習: 簡述總結

在這個圖中,根據是否發生 shuffle 操作能夠将其分成如下的 stage 類型:

Spark : spark 原理簡述與 shuffle 過程介紹 & PerfectSpark學習: 簡述總結

(join 需要針對同一個 key 合并,是以需要 shuffle)

  運作到每個 stage 的邊界時,資料在父 stage 中按照 Task 寫到磁盤上,而在子 stage 中通過網絡按照 Task 去讀取資料。這些操作會導緻很重的網絡以及磁盤的I/O,是以 stage 的邊界是非常占資源的,在編寫 Spark 程式的時候需要盡量避免的 。父 stage 中 partition 個數與子 stage 的 partition 個數可能不同,是以那些産生 stage 邊界的 Transformation 常常需要接受一個 numPartition 的參數來覺得子 stage 中的資料将被切分為多少個 partition7。

PS:shuffle 操作的時候可以用 combiner 壓縮資料,減少 IO 的消耗

5.性能優化

主要是我之前寫腳本的時候踩過的一些坑和在網上看到的比較好的調優的方法.

5.1 緩存機制和 cache 的意義

  Spark中對于一個RDD執行多次算子(函數操作)的預設原理是這樣的:每次你對一個RDD執行一個算子操作時,都會重新從源頭處計算一遍,計算出那個RDD來,然後再對這個RDD執行你的算子操作。這種方式的性能是很差的。

是以對于這種情況,我們的建議是:對多次使用的RDD進行持久化。

  首先要認識到的是, .Spark 本身就是一個基于記憶體的疊代式計算,是以如果程式從頭到尾隻有一個 Action 操作且子 RDD 隻依賴于一個父RDD 的話,就不需要使用 cache 這個機制, RDD 會在記憶體中一直從頭計算到尾,最後才根據你的 Action 操作傳回一個值或者儲存到相應的磁盤中.需要 cache 的是當存在多個 Action 操作或者依賴于多個 RDD 的時候, 可以在那之前緩存RDD. 如下:

val rdd = sc.textFile("path/to/file").Map(...).filter(...)
val rdd1 = rdd.Map(x => x+1)
val rdd2 = rdd.Map(x => x+100)
val rdd3 = rdd1.join(rdd2)
rdd3.count()
           

  在這裡 有2個 RDD 依賴于 rdd, 會形成如下的 DAG 圖:

Spark : spark 原理簡述與 shuffle 過程介紹 & PerfectSpark學習: 簡述總結

  是以可以在 rdd 生成之後使用 cache 函數對 rdd 進行緩存,這次就不用再從頭開始計算了.緩存之後過程如下:

Spark : spark 原理簡述與 shuffle 過程介紹 & PerfectSpark學習: 簡述總結

  除了 cache 函數外,緩存還可以使用 persist, cache 是使用的預設緩存選項,一般預設為Memory_only(記憶體中緩存), persist 則可以在緩存的時候選擇任意一種緩存類型.事實上, cache 内部調用的是預設的 persist.

持久化的類型8如下:

持久化級别 含義解釋
MEMORY_ONLY 使用未序列化的Java對象格式,将資料儲存在記憶體中。如果記憶體不夠存放所有的資料,則資料可能就不會進行持久化。那麼下次對這個RDD執行算子操作時,那些沒有被持久化的資料,需要從源頭處重新計算一遍。這是預設的持久化政策,使用cache()方法時,實際就是使用的這種持久化政策。
MEMORY_AND_DISK 使用未序列化的Java對象格式,優先嘗試将資料儲存在記憶體中。如果記憶體不夠存放所有的資料,會将資料寫入磁盤檔案中,下次對這個RDD執行算子時,持久化在磁盤檔案中的資料會被讀取出來使用。
MEMORY_ONLY_SER 基本含義同MEMORY_ONLY。唯一的差別是,會将RDD中的資料進行序列化,RDD的每個partition會被序列化成一個位元組數組。這種方式更加節省記憶體,進而可以避免持久化的資料占用過多記憶體導緻頻繁GC。
MEMORY_AND_DISK_SER 基本含義同MEMORY_AND_DISK。唯一的差別是,會将RDD中的資料進行序列化,RDD的每個partition會被序列化成一個位元組數組。這種方式更加節省記憶體,進而可以避免持久化的資料占用過多記憶體導緻頻繁GC。
DISK_ONLY 使用未序列化的Java對象格式,将資料全部寫入磁盤檔案中。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等. 對于上述任意一種持久化政策,如果加上字尾_2,代表的是将每個持久化的資料,都複制一份副本,并将副本儲存到其他節點上。這種基于副本的持久化機制主要用于進行容錯。假如某個節點挂掉,節點的記憶體或磁盤中的持久化資料丢失了,那麼後續對RDD計算時還可以使用該資料在其他節點上的副本。如果沒有副本的話,就隻能将這些資料從源頭處重新計算一遍了。

  是否進行序列化和磁盤寫入,需要充分考慮所配置設定到的記憶體資源和可接受的計算時間長短,序列化會減少記憶體占用,但是反序列化會延長時間,磁盤寫入會延長時間,但是會減少記憶體占用,也許能提高計算速度.此外要認識到:cache 的 RDD 會一直占用記憶體,當後期不需要再依賴于他的反複計算的時候,可以使用 unpersist 釋放掉.

5.2 shuffle 的優化

  我們前面說過,進行 shuffle 操作的是是很消耗系統資源的,需要寫入到磁盤并通過網絡傳輸,有時還需要對資料進行排序.常見的 Transformation 操作如:repartition,join,cogroup,以及任何 *By 或者 *ByKey 的 Transformation 都需要 shuffle 資料9,合理的選用操作将降低 shuffle 操作的成本,提高運算速度.具體如下:

- 當進行聯合的規約操作時,避免使用 groupByKey。舉個例子,rdd.groupByKey().mapValues(_ .sum) 與 rdd.reduceByKey(_ + _) 執行的結果是一樣的,但是前者需要把全部的資料通過網絡傳遞一遍,而後者隻需要根據每個 key 局部的 partition 累積結果,在 shuffle 的之後把局部的累積值相加後得到結果.

- 當輸入和輸入的類型不一緻時,避免使用 reduceByKey。舉個例子,我們需要實作為每一個key查找所有不相同的 string。一個方法是利用 map 把每個元素的轉換成一個 Set,再使用 reduceByKey 将這些 Set 合并起來10.

- 生成新列的時候,避免使用單獨生成一列再 join 回來的方式,而是直接在資料上生成.

- 當需要對兩個 RDD 使用 join 的時候,如果其中一個資料集特别小,小到能塞到每個 Executor 單獨的記憶體中的時候,可以不使用 join, 使用 broadcast 操作将小 RDD 複制廣播到每個 Executor 的記憶體裡 join.

(broadcast 的用法可以檢視官方 API 文檔)

關于 shuffle 更多的介紹可以檢視11這篇博文.

5.3 資源參數調優

這些參數主要在 spark-submit 送出的時候指定,或者寫在配置檔案中啟動.可以通過 spark-submit –help 檢視.

具體如下12:

參數 說明 調優建議
num-Executors 該參數用于設定Spark作業總共要用多少個Executor程序來執行。這個參數非常重要,如果不設定的話,預設隻會給你啟動少量的Executor程序,此時你的Spark作業的運作速度是非常慢的。 每個Spark作業的運作一般設定50~100個左右的Executor程序比較合适。設定的太少,無法充分利用叢集資源;設定的太多的話,大部分隊列可能無法給予充分的資源。
Executor-memory 該參數用于設定每個Executor程序的記憶體。Executor記憶體的大小,很多時候直接決定了Spark作業的性能,而且跟常見的JVM OOM異常,也有直接的關聯。 每個Executor程序的記憶體設定4G~8G較為合适。具體的設定還是得根據不同部門的資源隊列來定。可以看看自己團隊的資源隊列的最大記憶體限制是多少,num-Executors乘以Executor-memory,就代表了你的Spark作業申請到的總記憶體量。此外,如果你是跟團隊裡其他人共享這個資源隊列,那麼申請的總記憶體量最好不要超過資源隊列最大總記憶體的1/3~1/2,避免你自己的Spark作業占用了隊列所有的資源,導緻别的同學的作業無法運作。
Executor-cores 用于設定每個Executor程序的CPU core數量。這個參數決定了每個Executor并行執行Task線程的能力。每個core同一時間隻能執行一個Task線程,是以每個Executor的core越多,越能夠快速地執行完配置設定給自己的所有Task線程。 Executor的CPU core數量設定為2~4個較為合适。同樣得根據不同部門的資源隊列來定,可以看看自己的資源隊列的最大CPU core限制是多少,再依據設定的Executor數量,來決定每個Executor程序可以配置設定到幾個CPU core。同樣建議,如果是跟他人共享這個隊列,那麼num-Executors * Executor-cores不要超過隊列總CPU core的1/3~1/2左右比較合适
driver-memory 該參數用于設定Driver程序的記憶體。 Driver的記憶體通常來說不設定,或者設定1G左右應該就夠了。唯一需要注意的一點是,如果需要使用collect算子将RDD的資料全部拉取到Driver上進行處理,那麼必須確定Driver的記憶體足夠大,否則會出現OOM記憶體溢出的問題。
spark.default. parallelism 該參數用于設定每個stage的預設Task數量。這個參數極為重要,如果不設定可能會直接影響你的Spark作業性能。 Spark作業的預設Task數量為500~1000個較合适。如果不去設定這個參數,那麼就會導緻Spark自己根據底層HDFS的block數量來設定Task的數量,預設是一個HDFS block對應一個Task。通常來說,Spark預設設定的數量是偏少的(比如幾十個Task),如果Task數量偏少的話,就會導緻你前面設定好的Executor的參數都前功盡棄。即無論你的Executor程序/記憶體/CPU有多大,但是Task隻有幾個,那麼90%的Executor程序可能根本就沒有Task執行,也就白白浪費了資源此Spark官網建議的設定原則是,設定該參數為num-Executors * Executor-cores的2~3倍較為合适,比如Executor的總CPU core數量為300個,那麼設定1000個Task是可以的,可以充分地利用Spark叢集的資源。
spark.storage. memoryFrAction 該參數用于設定RDD持久化資料在Executor記憶體中能占的比例,預設是0.6。也就是說,預設Executor 60%的記憶體,可以用來儲存持久化的RDD資料。根據你選擇的不同的持久化政策,如果記憶體不夠時,可能資料就不會持久化,或者資料會寫入磁盤。 如果Spark作業中,有較多的RDD持久化操作,該參數的值可以适當提高一些,保證持久化的資料能夠容納在記憶體中。避免記憶體不夠緩存所有的資料,導緻資料隻能寫入磁盤中,降低了性能。但是如果Spark作業中的shuffle類操作比較多,而持久化操作比較少,那麼這個參數的值适當降低一些比較合适。此外,如果發現作業由于頻繁的gc導緻運作緩慢(通過Spark web ui可以觀察到作業的gc耗時),意味着Task執行使用者代碼的記憶體不夠用,那麼同樣建議調低這個參數的值。
spark.shuffle. memoryFrAction 該參數用于設定shuffle過程中一個Task拉取到上個stage的Task的輸出後,進行聚合操作時能夠使用的Executor記憶體的比例,預設20%。shuffle操作在進行聚合時,如果使用的記憶體超出20%的限制,多餘的資料就會溢寫到磁盤,此時會極大地降低性能。 如果Spark作業中的RDD持久化操作較少,shuffle操作較多時,建議降低持久化操作的記憶體占比,提高shuffle操作的記憶體占比比例,避免shuffle過程中資料過多時記憶體不夠用,必須溢寫到磁盤上,降低了性能。此外,如果發現作業由于頻繁的gc導緻運作緩慢,意味着Task執行使用者代碼的記憶體不夠用,那麼同樣建議調低這個參數的值。

  資源參數的調優,沒有一個固定的值,需要根據自己的實際情況(包括Spark作業中的shuffle操作數量、RDD持久化操作數量以及Spark web ui中顯示的作業gc情況),同時參考本篇文章中給出的原理以及調優建議,合理地設定上述參數。

5.4 小結

  • 對需要重複計算的才使用 cache, 同時及時釋放掉(unpersist)不再需要使用的 RDD.
  • 避免使用 shuffle 運算.需要的時候盡量選取較優方案.
  • 合理配置 Executor/Task/core 的參數,合理配置設定持久化/ shuffle的記憶體占比,
    • driver-memory: 1G
    • executor-memory: 4~8G(根據實際需求來)
    • num-executors: 50~100
    • executor-cores: 2~4
    • Tasks: 500~1000

6.本地搭建 Spark 開發環境

6.1 Spark-Scala-IntelliJ

本地搭建 Spark-scala開發環境, 并使用 IntelliJ idea 作為 IDE 的方法,參見部落格另一篇文章:

Spark學習: Spark-Scala-IntelliJ開發環境搭建和編譯Jar包流程

6.2 Spark-Notebook 開發環境

本地搭建 Spark-Notebook(python or scala) 開發環境, 參見部落格另一篇文章(還沒發出來):

Spark學習: Spark-Notebook 開發環境

2016.10.08

databatman

參考文獻

  1. 文獻:大資料分析平台建設與應用綜述 ↩
  2. Spark學習手冊(三):Spark子產品摘讀 ↩
  3. Spark入門實戰系列–3.Spark程式設計模型(上)–程式設計模型及SparkShell實戰 ↩
  4. 文獻: 基于 spark 平台推薦系統研究. ↩
  5. Apache Spark源碼走讀之7 – Standalone部署方式分析 ↩
  6. Spark性能優化指南——基礎篇 ↩
  7. Apache Spark Jobs 性能調優(一) ↩
  8. Spark性能優化指南——基礎篇 ↩
  9. Apache Spark Jobs 性能調優(一) ↩
  10. Apache Spark Jobs 性能調優(一) ↩
  11. Apache Spark Jobs 性能調優(一) ↩
  12. Spark性能優化指南——基礎篇 ↩

原文參考:https://blog.csdn.net/databatman/article/details/53023818#52-shuffle-的優化

繼續閱讀