天天看點

Spark在處理資料的時候,會将資料都加載到記憶體再做處理嗎?

對于Spark的初學者,往往會有一個疑問:Spark(如SparkRDD、SparkSQL)在處理資料的時候,會将資料都加載到記憶體再做處理嗎?

很顯然,答案是否定的!

對該問題産生疑問的根源還是對Spark計算模型了解不透徹。

對于Spark RDD,它是一個分布式的彈性資料集,不真正存儲資料。如果你沒有在代碼中調用persist或者cache算子,Spark是不會真正将資料都放到記憶體裡的。

此外,還要考慮persist/cache的緩存級别,以及對什麼進行緩存(比如是對整張表生成的DataSet緩存還是列裁剪之後生成的DataSet緩存)(關于Spark RDD的特性解析參考《Spark RDD詳解》

既然Spark RDD不存儲資料,那麼它内部是如何讀取資料的呢?其實Spark内部也實作了一套存儲系統:BlockManager。為了更深刻的了解Spark RDD資料的處理流程,先抛開BlockManager本身原理,從源碼角度闡述RDD内部函數的疊代體系。

我們都知道RDD算子最終會被轉化為shuffle map task和result task,這些task通過調用RDD的iterator方法擷取對應partition資料,而這個iterator方法又會逐層調用父RDD的iterator方法擷取資料(通過重寫scala.collection.iterator的hasNext和next方法實作)。主要過程如下:

首先看ShuffleMapTask和ResultTask中runTask方法的源碼:

Spark在處理資料的時候,會将資料都加載到記憶體再做處理嗎?
Spark在處理資料的時候,會将資料都加載到記憶體再做處理嗎?
關鍵看這部分處理邏輯:

rdd.iterator(partition, context)           
Spark在處理資料的時候,會将資料都加載到記憶體再做處理嗎?

getOrCompute方法會先通過目前executor上的BlockManager擷取指定blockId的block,如果block不存在則調用computeOrReadCheckpoint,如果要處理的RDD沒有被checkpoint或者materialized,則接着調用compute方法進行計算。

compute方法是RDD的抽象方法,由繼承RDD的子類具體實作。

以WordCount為例:

sc.textFile(input)
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .saveAsTextFile(output)           

1.textFile會建構一個HadoopRDD

2.flatMap/map會建構一個MapPartitionsRDD

3.reduceByKey觸發shuffle時會建構一個ShuffledRDD

4.saveAsTextFile作為action算子會觸發整個任務的執行

以flatMap/map産生的MapPartitionsRDD實作的compute方法為例:

override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))           

底層調用了parent RDD的iterator方法,然後作為參數傳入到了目前的MapPartitionsRDD。而f函數就是對parent RDD的iterator調用了相同的map類函數以執行使用者給定的函數。

是以,這是一個逐層嵌套的rdd.iterator方法調用,子RDD調用父RDD的iterator方法并在其結果之上調用Iterator的map函數以執行使用者給定的函數,逐層調用直到調用到最初的iterator(比如上述WordCount示例中HadoopRDD partition的iterator)。

而scala.collection.Iterator的map/flatMap方法傳回的Iterator就是基于目前Iterator重寫了next和hasNext方法的Iterator執行個體。比如,對于map函數,結果Iterator的hasNext就是直接調用了self iterator的hasNext,next方法就是在self iterator的next方法的結果上調用了指定的map函數。

flatMap和filter函數稍微複雜些,但本質上一樣,都是通過調用self iterator的hasNext和next方法對資料進行周遊和處理。

是以,當我們調用最終結果iterator的hasNext和next方法進行周遊時,每周遊一個資料元素都會逐層調用父層iterator的hasNext和next方法。各層的map函數組成一個pipeline,每個資料元素都經過這個pipeline的處理得到最終結果。

這也是Spark的優勢之一,map類算子整個形成類似流式處理的pipeline管道,一條資料被該鍊條上的各個RDD所包裹的函數處理。

再回到WordCount例子。HadoopRDD直接跟資料源關聯,記憶體中存儲多少資料跟讀取檔案的buffer和該RDD的分區數相關(比如buffer*partitionNum,當然這是一個理論值),saveAsTextFile與此類似。MapPartitionsRDD裡實際在記憶體裡的資料也跟partition數有關系。ShuffledRDD稍微複雜些,因為牽扯到shuffle,但是RDD本身的特性仍然滿足(記錄檔案的存儲位置)。

說完了Spark RDD,再來看另一個問題:Spark SQL對于多表之間join操作,會先把所有表中資料加載到記憶體再做處理嗎?

當然,肯定也不需要!

具體可以檢視Spark SQL針對相應的Join SQL的查詢計劃,以及在之前的文章《Spark SQL如何選擇join政策》中,針對目前Spark SQL支援的join方式,任何一種都不要将join語句中涉及的表全部加載到記憶體。即使是Broadcast Hash Join也隻需将滿足條件的小表完整加載到記憶體。

繼續閱讀