為什麼要用checkpoint呢?
checkpoint的意思就是建立檢查點,類似于快照,比如,在spark計算中,計算流程DAG很長,要是将整個DAG計算完成并得出結果,需要很長時間,在這等待時間中突然中間資料丢失,spark就會根據RDD的依賴關系從頭到尾開始計算一遍,這樣會很費性能的,怎麼解決呢?這就需要用到緩存了,我們可以将中間的計算結過通過cache或者persist方式放到記憶體中,這樣也不一定保證資料不會丢失,如果存儲的記憶體除了問題,也是會導緻spark重新根據RDD計算的,是以就有了checkpint。
其中checkpoint的作用就是将DAG中比較重要的中間資料做一個檢查點,将結果存儲到一個高可用的地方(通常呢這個地方就是hdfs裡面)。
示例:sc.textFile(“hdfspath”).flatMap(.split(" ")).map((,1)).reduceByKey(+).saveAsTextFile(“hdfspath”)
1.在textFile讀取hdfs的時候就會先建立一個HadoopRDD,其中這個RDD是去讀取hdfs的資料key為偏移量value為一行資料,因為通常來講偏移量沒有太大的作用是以然後會将HadoopRDD轉化為MapPartitionsRDD,這個RDD隻保留了hdfs的資料
2.flatMap 産生一個RDD MapPartitionsRDD
3.map 産生一個RDD MapPartitionsRDD
4.reduceByKey 産生一個RDD ShuffledRDD
5.saveAsTextFile 産生一個RDD MapPartitionsRDD
可以在測試環境中直接檢視RDD的依賴:rdd.toDebugString方法或者去webUI
scala> val rdd = sc.textFile("hdfs://192.168.1.101:9000/checkpoint0610/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00000").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[29] at reduceByKey at <console>:27
scala> rdd.toDebugString
res3: String =
(2) ShuffledRDD[29] at reduceByKey at <console>:27 []
+-(2) MapPartitionsRDD[28] at map at <console>:27 []
| MapPartitionsRDD[27] at flatMap at <console>:27 []
| hdfs://lijie:9000/checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00000 MapPartitionsRDD[26] at textFile at <console>:27 []
| hdfs://lijie:9000/checkpoint0727/c1a51ee9-1daf-4169-991e-b290f88bac20/rdd-0/part-00000 HadoopRDD[25] at textFile at <console>:27 []
val sc: SparkContext = SparkContext.getOrCreate(conf)
sc.setCheckpointDir("file:///F:/checkpointFile") //設定檢查點儲存的檔案名
val rdd: RDD[(Int, String)] = sc.parallelize(Array((1,"a"),(2,"b")))
rdd.checkpoint() //設定檢查點
rdd.foreach(println)
println(rdd.count())
println(rdd.isCheckpointed) //判斷是否設定檢查點
println(rdd.getCheckpointFile) //擷取檢查點所在檔案目錄
(2,b)
(1,a)
2
true
Some(file:/F:/checkpointFile/87962c20-cd22-52ee-b01c-2a1d09090c3b/rdd-0)