一 Spark中Checkpoint是什麼
假設一個應用程式特别複雜場景,從初始RDD開始到最後整個應用程式完成,有非常多的步驟,比如超過20個transformation操作,而且整個運作時間也比較長,比如1-5個小時。此時某一個步驟資料丢失了,盡管之前在之前可能已經持久化到了記憶體或者磁盤,但是依然丢失了,這是很有可能的。也就是說沒有容錯機制,那麼有可能需要重新計算一次。而如果這個步驟很耗時和資源,那麼有點悲劇。
對于一個複雜的RDD,我們如果擔心某些關鍵的,會在後面反複使用的RDD,可能會因為節點的故障,導緻持久化資料的丢失,就可以針對該RDD啟動checkpoint機制,實作容錯和高可用。
它的流程大緻如下圖所示:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiclRnblN0LclHdpZXYyd2LcBzNvwVZ2x2bzNXak9CX90TQNNkRrFlQKBTSvwFbslmZvwFMwQzLcVmepNHdu9mZvwFVywUNMZTY18CX052bm9CX90zZOBTQEF2ck1mYohWblZXUYpVd1kmYr50MZV3YyI2cKJDT29GRjBjUIF2LcRHelR3LcJzLctmch1mclRXY39DM2UzMwkDN4AzNyATM3EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
二 如何進行Checkpoint呢?
在SparkContext中需要調用setCheckpointDir方法,設定一個容錯的檔案系統的目錄,比如HDFS。然後對RDD調用checkpoint方法,之後在RDD所處的job運作結束之後,會啟動一個單獨的job來将checkpoint過的RDD的資料寫入之前設定的檔案系統中。進行持久化操作。
那麼此時,即使在後面使用RDD的時候,他的持久化資料不小心丢失了,但是還是可以從它的checkpoint檔案中讀取出該資料,而無需重新計算。
注意:
在進行checkpoint之前,最好先對RDD執行持久化操作,比如persist(StorageLevel.DISK_ONLY)如果持久化了,就不用再重新計算;否則如果沒有持久化RDD,還設定了checkpoint,那麼本來job都結束了,但是由于中間的RDD沒有持久化,那麼checkpointjob想要将RDD資料寫入外部檔案系統,還得從RDD之前的所有的RDD全部重新計算一次,再進行checkpoint。然後從持久化的RDD磁盤檔案讀取資料
三 Checkpoint與持久化的差別
3.1 lineage是否發生改變
持久化隻是将資料儲存在BlockManager中;但是RDD的lineage(血緣關系)是不會變化的
Checkpoint完畢之後,RDD已經沒有之前的lineage(血緣關系),而隻有一個強行為其設定的CheckpointRDD, 也就是說checkpoint之後,lineage發生了改變
3.2 丢失資料的可能性
持久化的資料丢失的可能性更大
Checkpoint的資料通常是儲存在容錯高可用的檔案系統中,比如HDFS,是以checkpoint丢失資料的更能性更小