單例模式是一種常用的設計模式,但是在叢集模式下的 Spark 中使用單例模式會引發一些錯誤。我們用下面代碼作例子,解讀在 Spark 中使用單例模式遇到的問題。
object Example{
var instance:Example = new Example("default_name");
def getInstance():Example = {
return instance
}
def init(name:String){
instance = new Example(name)
}
}
class Example private(name1:String) extends Serializable{
var name = name1
}
object Main{
def main(args:Array[String]) = {
Example.init("To create happiness with money")
val sc = new SparkContext(new SparkConf().setAppName("test"))
val rdd = sc.parallelize(1 to 10, 3)
rdd.map(x=>{
x + "_"+ Example.getInstance().name
}).collect.foreach(println)
}
}
複制
我們預期結果是數字和騰訊遊戲座右銘,然後實際的結果确實數字和預設名字,如下所示
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsICMyYTMvw1dvwlMvwlM3VWaWV2Zh1Wa-cmbw5ScwBzZwdmY3gHcvw1MxQzM2ATMtUGall3LcVmdhNXLwRHdo9CXt92YucWbpRWdvx2Yx5yazF2Lc9CX6MHc0RHaiojIsJye.png)
就像 Example.init(“To create happiness with money”) 沒有執行一樣。在 Stackoverflow 上,有不少人也碰到這個錯誤,比如 問題1、問題2和問題3。
這是由什麼原因導緻的呢?Spark 執行算子之前,會将算子需要東西準備好并打包(這就是閉包的概念),分發到不同的 executor,但這裡不包括類。類存在 jar 包中,随着 jar 包分發到不同的 executors 中。當不同的 executors 執行算子需要類時,直接從分發的 jar 包取得。這時候在 driver 上對類的靜态變量進行改變,并不能影響 executors 中的類。拿上面的程式做例子,jar 包存的 Example.instance = new Example(“default_name”),分發到不同的 executors。這時候不同 executors 中 Example.getInstance().name 等于 “default_name”。
這個部分涉及到 Spark 底層原理,很難堂堂正正地解決,隻能采取取巧的辦法。不能再 executors 使用類,那麼我們可以用對象嘛。我們可以把 Example 的執行個體對象塞進算子的閉包,随着閉包分發到不同的 executors。修改之後的代碼如下所示。
object Main{
def main(args:Array[String]) = {
Example.init(""To create happiness with money"")
val sc = new SparkContext(new SparkConf().setAppName("test"))
val instance = Example.getInstance()
val rdd = sc.parallelize(1 to 10, 3)
rdd.map(x=>{
x + "_"+ instance.name
}).collect.foreach(println)
}
}
複制
上面代碼在叢集模式下的 Spark 運作結果是數字和騰訊遊戲座右銘。