前言
在上篇文章中,向大家介紹了如何使用Spark Operator在kubernetes叢集上面送出一個計算作業。今天我們會繼續使用上篇文章中搭建的Playground進行調試與解析,幫助大家更深入的了解Spark Operator的工作原理。是以如果沒有浏覽過上篇文章的同學,可以通過
傳送門直達,先配置好Playground的環境。
Spark Operator的内部實作
在深入解析Spark Operator之前,我們先補充一些關于kubernetes operator的知識。2018年可以說是kubernetes operator泛濫的一年,各種operator如雨後春筍般出現。operator是擴充kubernetes以及與kubernetes內建的最佳方式之一。在kubernetes的設計理念中,有很重要的一條就是進行了抽象,比如對存儲進行抽象、對應用負載進行抽象、對接入層進行抽象等等。每個抽象又對應了各自生命周期管理的controller,開發者送出的Yaml實際上是對抽象終态的描述,而controller會監聽抽象的變化、解析并進行處理,最終嘗試将狀态修正到終态。
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIn5GcuEWOyATOxE2N3IWZkZmZmJTN2ITOmFWOmJTO2UGMjRGNvwVbvNmLj5Wat4Wd5lGbh5iY1BXLn1WauU3bop3ZuFGat42YucWbp1iMhRXYvw1LcpDc0RHaiojIsJye.png)
那麼對于在kubernetes中未定義的抽象該如何處理呢,答案就是operator。一個标準operator通常包含如下幾個部分:1. CRD抽象的定義,負責描述抽象所能包含的功能。 2.CRD Controller ,負責解析CRD定義的内容以及生命周期的管理。3.clent-go的SDK,負責提供代碼內建時使用的SDK。
有了這個知識儲備,那麼我們回過頭來看Spark Operator的代碼,結構基本就比較明晰了。核心的代碼邏輯都在pkg下,其中apis下面主要是定義了不同版本的API;client目錄下主要是自動生成的client-go的SDK;crd目錄下主要是定義的兩個自定義資源sparkapplication和scheduledsparkapplication的結構。controller目錄下主要定義的就是這個operator的生命周期管理的邏輯;config目錄下主要處理spark config的轉換。了解一個Operator能力最快捷的方式,就是檢視CRD的定義。在Spark Operator中定義了sparkapplication和scheduledsparkapplication兩個CRD,他們之間有什麼差別呢?
sparkapplication 是對正常spark任務的抽象,作業是單次運作的,作業運作完畢後,所有的Pod會進入Succeed或者Failed的狀态。而scheduledsparkapplication是對離線定時任務的一種抽象,開發者可以在scheduledsparkapplication中定義類似crontab的任務,實作spark離線任務的周期性定時排程。
上面這張圖是Spark中kubernetes的內建圖,也就是說當我們通過spark-submit送出作業的時候,會自動生成driver pod與exector pods。那麼引入了Spark Operator後,這個流程變成了什麼呢?
func (c *Controller) submitSparkApplication(app *v1beta1.SparkApplication) *v1beta1.SparkApplication {
// prometheus的監控名額的暴露
appToSubmit := app.DeepCopy()
if appToSubmit.Spec.Monitoring != nil && appToSubmit.Spec.Monitoring.Prometheus != nil {
if err := configPrometheusMonitoring(appToSubmit, c.kubeClient); err != nil {
glog.Error(err)
}
}
// 将CRD中的定義轉變為spark-submit的指令
submissionCmdArgs, err := buildSubmissionCommandArgs(appToSubmit)
if err != nil {
app.Status = v1beta1.SparkApplicationStatus{
AppState: v1beta1.ApplicationState{
State: v1beta1.FailedSubmissionState,
ErrorMessage: err.Error(),
},
SubmissionAttempts: app.Status.SubmissionAttempts + 1,
LastSubmissionAttemptTime: metav1.Now(),
}
return app
}
// 在operator容器内通過spark-submit送出作業
submitted, err := runSparkSubmit(newSubmission(submissionCmdArgs, appToSubmit))
if err != nil {
app.Status = v1beta1.SparkApplicationStatus{
AppState: v1beta1.ApplicationState{
State: v1beta1.FailedSubmissionState,
ErrorMessage: err.Error(),
},
SubmissionAttempts: app.Status.SubmissionAttempts + 1,
LastSubmissionAttemptTime: metav1.Now(),
}
c.recordSparkApplicationEvent(app)
glog.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
return app
}
// 因為Pod的狀态也會被Spark Operator進行觀測,是以driver pod宕掉會被重新拉起
// 這是和直接跑spark-submit的一大差別,提供了故障恢複的能力。
if !submitted {
// The application may not have been submitted even if err == nil, e.g., when some
// state update caused an attempt to re-submit the application, in which case no
// error gets returned from runSparkSubmit. If this is the case, we simply return.
return app
}
glog.Infof("SparkApplication %s/%s has been submitted", app.Namespace, app.Name)
app.Status = v1beta1.SparkApplicationStatus{
AppState: v1beta1.ApplicationState{
State: v1beta1.SubmittedState,
},
SubmissionAttempts: app.Status.SubmissionAttempts + 1,
ExecutionAttempts: app.Status.ExecutionAttempts + 1,
LastSubmissionAttemptTime: metav1.Now(),
}
c.recordSparkApplicationEvent(app)
// 通過service暴露spark-ui
service, err := createSparkUIService(app, c.kubeClient)
if err != nil {
glog.Errorf("failed to create UI service for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
app.Status.DriverInfo.WebUIServiceName = service.serviceName
app.Status.DriverInfo.WebUIPort = service.nodePort
// Create UI Ingress if ingress-format is set.
if c.ingressURLFormat != "" {
ingress, err := createSparkUIIngress(app, *service, c.ingressURLFormat, c.kubeClient)
if err != nil {
glog.Errorf("failed to create UI Ingress for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
app.Status.DriverInfo.WebUIIngressAddress = ingress.ingressURL
app.Status.DriverInfo.WebUIIngressName = ingress.ingressName
}
}
}
return app
}
其實到此,我們就已經基本了解Spark Operator做的事情了,首先定義了兩種不同的CRD對象,分别對應普通的計算任務與定時周期性的計算任務,然後解析CRD的配置檔案,拼裝成為spark-submit的指令,通過prometheus暴露監控資料采集接口,建立Service提供spark-ui的通路。然後通過監聽Pod的狀态,不斷回寫更新CRD對象,實作了spark作業任務的生命周期管理。
Spark Operator的任務狀态機
當我們了解了Spark Operator的設計思路和基本流程後,還需要深入了解的就是sparkapplication的狀态都包含哪些,他們之間是如何進行轉換的,因為這是Spark Operator對于生命周期管理增強最重要的部分。
一個Spark的作業任務可以通過上述的狀态機轉換圖進行表示,一個正常的作業任務經曆如下幾個狀态:
New -> Submitted -> Running -> Succeeding -> Completed
而當任務失敗的時候會進行重試,若重試超過最大重試次數則會失敗。也就是說如果在任務的執行過程中,由于資源、排程等因素造成Pod被驅逐或者移除,Spark Operator都會通過自身的狀态機狀态轉換進行重試。
Spark Operator的狀态排查
我們已經知道了Spark Operator最核心的功能就是将CRD的配置轉換為spark-submit的指令,那麼當一個作業運作不預期的時候,我們該如何判斷是哪一層出現的問題呢?首先我們要判斷的就是spark-submit時所生成的參數是否是預期的,因為CRD的Yaml配置雖然可以增強表達能力,但是提高了配置的難度與出錯的可能性。
func runSparkSubmit(submission *submission) (bool, error) {
sparkHome, present := os.LookupEnv(sparkHomeEnvVar)
if !present {
glog.Error("SPARK_HOME is not specified")
}
var command = filepath.Join(sparkHome, "/bin/spark-submit")
cmd := execCommand(command, submission.args...)
glog.V(2).Infof("spark-submit arguments: %v", cmd.Args)
if _, err := cmd.Output(); err != nil {
var errorMsg string
if exitErr, ok := err.(*exec.ExitError); ok {
errorMsg = string(exitErr.Stderr)
}
// The driver pod of the application already exists.
if strings.Contains(errorMsg, podAlreadyExistsErrorCode) {
glog.Warningf("trying to resubmit an already submitted SparkApplication %s/%s", submission.namespace, submission.name)
return false, nil
}
if errorMsg != "" {
return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %s", submission.namespace, submission.name, errorMsg)
}
return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", submission.namespace, submission.name, err)
}
return true, nil
}
預設情況下Spark Operator會通過glog level=2等級對外輸出每次作業送出後轉換的送出指令。而預設情況下,glog的level即為2,是以通過檢查Spark Operator的Pod日志可以協助開發者快速排查問題。此外在sparkapplication上面也會通過event的方式進行狀态的記錄,上述狀态機之間的轉換都會通過event的方式展現在sparkapplication的對象上。掌握這兩種方式進行問題排查,可以節省大量排錯時間。
最後
使用Spark Operator是在kubernetes上實踐spark的最佳方式,和傳統的spark-submit相比提供了更多的故障恢複與可靠性保障,并且提供了監控、日志、UI等能力的內建與支援。在下一篇中,會為大家介紹在kubernetes叢集中,送出spark作業時的如何使用外部存儲存儲的最佳實踐。