
Spark on Kubernetes Source Code Analysis Series - scheduler

Table of Contents

  • 1 Overview
  • 2 Analysis
    • 2.1 KubernetesExecutorBuilder
    • 2.2 KubernetesClusterManager
    • 2.3 KubernetesClusterSchedulerBackend
    • 2.4 ExecutorPodsSnapshotsStore
    • 2.5 ExecutorPodsSnapshot
    • 2.6 ExecutorPodsSnapshotsStoreImpl
    • 2.7 ExecutorPodsWatchSnapshotSource
    • 2.8 ExecutorPodsPollingSnapshotSource
    • 2.9 ExecutorPodsLifecycleManager
  • 3 Summary

1 Overview

This part of the code can be understood as how Spark implements a K8S-based scheduler that schedules and creates Executor Pods.

2 Analysis

/path/to/spark/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler
└── cluster
    └── k8s
        ├── ExecutorPodStates.scala
        ├── ExecutorPodsAllocator.scala
        ├── ExecutorPodsLifecycleManager.scala
        ├── ExecutorPodsPollingSnapshotSource.scala
        ├── ExecutorPodsSnapshot.scala
        ├── ExecutorPodsSnapshotsStore.scala
        ├── ExecutorPodsSnapshotsStoreImpl.scala
        ├── ExecutorPodsWatchSnapshotSource.scala
        ├── KubernetesClusterManager.scala
        ├── KubernetesClusterSchedulerBackend.scala
        └── KubernetesExecutorBuilder.scala

2 directories, 11 files
           

2.1 KubernetesExecutorBuilder

Since the previous article focused on how the Driver Pod is generated, before getting to the scheduler let's fill in how the Executor is configured. The key code is the features sequence below. The steps are similar to the Driver's, only fewer: a Basic step covering the Pod and Container meta information, a credentials step for the K8S authentication needed when talking to the ApiServer to request Executor Pods, and then configuration such as Env variables, secrets, and local directory mounts. A sketch of how these steps are applied follows the snippet.

val features = Seq(
  new BasicExecutorFeatureStep(conf, secMgr),
  new ExecutorKubernetesCredentialsFeatureStep(conf),
  new MountSecretsFeatureStep(conf),
  new EnvSecretsFeatureStep(conf),
  new LocalDirsFeatureStep(conf),
  new MountVolumesFeatureStep(conf))
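
To make the role of these feature steps concrete, here is a minimal, self-contained sketch of the pattern the builder follows. The names SparkPodSketch, FeatureStepSketch, EnvStep and ExecutorBuilderSketch are invented for illustration and are not Spark's classes; the point is only that each step takes the pod built so far and returns a copy with its own configuration added, and the builder folds the steps over an initial pod.

// Illustrative sketch only: the names below are invented, not Spark's API.
case class SparkPodSketch(labels: Map[String, String], env: Map[String, String])

trait FeatureStepSketch {
  // Each feature step adds its own piece of configuration to the pod.
  def configurePod(pod: SparkPodSketch): SparkPodSketch
}

class EnvStep(extraEnv: Map[String, String]) extends FeatureStepSketch {
  override def configurePod(pod: SparkPodSketch): SparkPodSketch =
    pod.copy(env = pod.env ++ extraEnv)
}

object ExecutorBuilderSketch {
  // The builder applies every step in order, starting from a bare pod.
  def build(initial: SparkPodSketch, steps: Seq[FeatureStepSketch]): SparkPodSketch =
    steps.foldLeft(initial)((pod, step) => step.configurePod(pod))
}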
           

2.2 KubernetesClusterManager

On the Spark side, this is the management hub for using a K8S cluster as the resource manager. The class implements the ExternalClusterManager interface and is mainly responsible for creating the schedulerBackend object.
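
For reference, the ExternalClusterManager contract it implements looks roughly like this (paraphrased from memory of the Spark source, with imports of SparkContext, TaskScheduler and SchedulerBackend omitted, so treat the exact signatures as approximate):

private[spark] trait ExternalClusterManager {
  // Whether this manager can handle the given master URL, e.g. one starting with "k8s://".
  def canCreate(masterURL: String): Boolean
  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  // This is where KubernetesClusterManager builds the KubernetesClusterSchedulerBackend.
  def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}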

2.3 KubernetesClusterSchedulerBackend

This is the wrapper around the scheduler backend for a K8S cluster, the SchedulerBackend (SB for short). SB mainly contains the logic for requesting new Executors and removing existing ones.

// The initial number of Executors to request; configurable through conf
private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)

// This flag is the key to debugging Executor problems.
// By default, once an Executor exits, Spark's K8S client actively deletes its Pod,
// so the Executor's logs can no longer be found.
// Set spark.kubernetes.executor.deleteOnTermination to false so that even if an
// Executor fails, its Pod will not be deleted automatically.
private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)

// The logic for removing Executors; the Pod deletion mentioned above comes from this delete() call
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future[Boolean] {
  logInfo("do send request to kill executors!")
  kubernetesClient
    .pods()
    .withLabel(SPARK_APP_ID_LABEL, applicationId())
    .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
    .withLabelIn(SPARK_EXECUTOR_ID_LABEL, executorIds: _*)
    .delete()
  // Don't do anything else - let event handling from the Kubernetes API do the Spark changes
}
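
As a usage note, spark.kubernetes.executor.deleteOnTermination defaults to true; setting it to false keeps failed Executor Pods (and therefore their logs) around for debugging. It is normally passed as a --conf at spark-submit time; the session-builder form below is just a hedged example to show the key, not code from the article.

import org.apache.spark.sql.SparkSession

// Keep executor pods after termination so their logs survive for debugging.
val spark = SparkSession.builder()
  .config("spark.kubernetes.executor.deleteOnTermination", "false")
  .getOrCreate()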
           

2.4 ExecutorPodsSnapshotsStore

This interface manages the state of Executor Pods (EP for short below), and uses the ExecutorPodsSnapshot data structure to record how that state changes.
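
Judging from the doc comment quoted in 2.6 below, the trait exposes roughly the following operations (a sketch from memory of the Spark source; the exact signatures may differ between versions):

private[spark] trait ExecutorPodsSnapshotsStore {
  // Consumers register a callback plus how often their buffered snapshots should be processed.
  def addSubscriber(processBatchIntervalMillis: Long)
      (onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit
  // Incremental update: a known new state of a single executor pod.
  def updatePod(updatedPod: Pod): Unit
  // Full sync: replace the snapshot with the complete current set of executor pods.
  def replaceSnapshot(newSnapshot: Seq[Pod]): Unit
  def notifySubscribers(): Unit
  def stop(): Unit
}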

2.5 ExecutorPodsSnapshot

ExecutorPodsSnapshot is an immutable view of the EP states of a Spark App in the cluster.

private[spark] case class ExecutorPodsSnapshot(executorPods: Map[Long, ExecutorPodState]) {

  import ExecutorPodsSnapshot._
  
  // Core method: withUpdate takes an updated Pod and returns a new EP snapshot recording its state; under the hood the snapshot is just a Map of (Executor id -> Executor Pod state)
  def withUpdate(updatedPod: Pod): ExecutorPodsSnapshot = {
    val newExecutorPods = executorPods ++ toStatesByExecutorId(Seq(updatedPod))
    new ExecutorPodsSnapshot(newExecutorPods)
  }
}
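
A tiny self-contained sketch of the same idea (illustrative names, not Spark's classes; a String stands in for the real ExecutorPodState): each update returns a new map instead of mutating the old one, so any subscriber holding an older snapshot still sees a consistent view.

// Illustrative only.
case class SnapshotSketch(executorPods: Map[Long, String]) {
  def withUpdate(execId: Long, state: String): SnapshotSketch =
    SnapshotSketch(executorPods + (execId -> state))
}

val s0 = SnapshotSketch(Map.empty)
val s1 = s0.withUpdate(1L, "Running")
val s2 = s1.withUpdate(2L, "Pending")
// s0 and s1 are untouched; s2 now tracks both executors.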
           

2.6 ExecutorPodsSnapshotsStoreImpl

This is the implementation class of ExecutorPodsSnapshotsStore. The passage below, taken from the source's English doc comment, is the key to understanding the whole scheduler, so it is worth reading carefully.

Controls the propagation of the Spark application’s executor pods state to subscribers that react to that state.

Roughly follows a producer-consumer model. Producers report states of executor pods, and these states are then published to consumers that can perform any actions in response to these states.

Producers push updates in one of two ways. An incremental update sent by updatePod() represents a known new state of a single executor pod. A full sync sent by replaceSnapshot() indicates that the passed pods are all of the most up to date states of all executor pods for the application.

The combination of the states of all executor pods for the application is collectively known as a snapshot. The store keeps track of the most up to date snapshot, and applies updates to that most recent snapshot - either by incrementally updating the snapshot with a single new pod state, or by replacing the snapshot entirely on a full sync.

Consumers, or subscribers, register that they want to be informed about all snapshots of the executor pods. Every time the store replaces its most up to date snapshot from either an incremental update or a full sync, the most recent snapshot after the update is posted to the subscriber’s buffer.

Subscribers receive blocks of snapshots produced by the producers in time-windowed chunks. Each subscriber can choose to receive their snapshot chunks at different time intervals.

That is the design in a nutshell: it follows a producer-consumer model in which subscribers subscribe to EP state, and that state is the Snapshot described above. The main fields are listed below, followed by a small producer-consumer sketch.

SNAPSHOT_LOCK   // lock guarding updates to the current snapshot
subscribers     // the registered subscribers, each with its own buffer of snapshots
pollingTasks    // the scheduled tasks that periodically flush each subscriber's buffer
currentSnapshot // the current Snapshot
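
To make the producer-consumer flow concrete, here is a minimal sketch of the shape of the store (invented names, not Spark's code; a String stands in for ExecutorPodsSnapshot): producers publish the latest snapshot into every subscriber's buffer, and a scheduled task drains each buffer at that subscriber's own interval.

import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import scala.collection.mutable

// Illustrative sketch only.
object SnapshotsStoreSketch {
  private val subscriberBuffers = new ConcurrentLinkedQueue[ConcurrentLinkedQueue[String]]()
  private val scheduler = Executors.newScheduledThreadPool(1)

  def addSubscriber(intervalMillis: Long)(onNewSnapshots: Seq[String] => Unit): Unit = {
    val buffer = new ConcurrentLinkedQueue[String]()
    subscriberBuffers.add(buffer)
    // Each subscriber gets its own periodic task that drains its buffer.
    scheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = {
        val batch = mutable.Buffer.empty[String]
        var next = buffer.poll()
        while (next != null) { batch += next; next = buffer.poll() }
        if (batch.nonEmpty) onNewSnapshots(batch.toSeq)
      }
    }, 0L, intervalMillis, TimeUnit.MILLISECONDS)
  }

  // Producers call this whenever a new snapshot is produced.
  def publish(snapshot: String): Unit = {
    val it = subscriberBuffers.iterator()
    while (it.hasNext) it.next().add(snapshot)
  }
}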
           

2.7 ExecutorPodsWatchSnapshotSource

This class mainly implements a Watcher listener from the K8S client to listen for Pod events, because the SB needs to be aware whenever an EP is added, modified, deleted, or hits an error.

enum Action {
  ADDED, MODIFIED, DELETED, ERROR
}
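
The watcher it registers with the K8S (fabric8) client looks roughly like the sketch below; it is the general shape rather than a verbatim copy of the Spark source. Every event simply pushes the affected Pod into the snapshots store as an incremental update.

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

// Sketch: forward every executor-pod event to the snapshots store.
class ExecutorPodsWatcherSketch(snapshotsStore: ExecutorPodsSnapshotsStore) extends Watcher[Pod] {
  override def eventReceived(action: Action, pod: Pod): Unit = {
    // ADDED / MODIFIED / DELETED / ERROR all end up as an incremental update.
    snapshotsStore.updatePod(pod)
  }

  override def onClose(cause: KubernetesClientException): Unit = {
    // The watch connection was closed, e.g. because the application is shutting down.
  }
}

// Registration (sketch): kubernetesClient.pods()
//   .withLabel(SPARK_APP_ID_LABEL, applicationId)
//   .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
//   .watch(new ExecutorPodsWatcherSketch(snapshotsStore))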
           

2.8 ExecutorPodsPollingSnapshotSource

This is the process of polling the ApiServer through the K8S client to fetch Pod states and save them into the Snapshot.

private class PollRunnable(applicationId: String) extends Runnable {

  override def run(): Unit = Utils.tryLogNonFatalError {
    logDebug(s"Resynchronizing full executor pod state from Kubernetes.")
    // Core method: the Pod states fetched from the ApiServer are recorded via replaceSnapshot
    snapshotsStore.replaceSnapshot(kubernetesClient
      .pods()
      .withLabel(SPARK_APP_ID_LABEL, applicationId)
      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
      .list()
      .getItems
      .asScala)
  }
}

// Polling happens every 30s by default
private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
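
How the runnable actually gets scheduled: on start, the source hands PollRunnable to a scheduled executor at that interval, roughly as in the sketch below (pollingExecutor is assumed to be a ScheduledExecutorService and pollingFuture its returned Future; the field names are from memory, not quoted from the source).

// Sketch of start(): re-sync the full pod state every pollingInterval milliseconds.
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
  new PollRunnable(applicationId),
  pollingInterval,
  pollingInterval,
  TimeUnit.MILLISECONDS)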
           

2.9 ExecutorPodsLifecycleManager

This is the manager of the EP lifecycle. The Pods ultimately live in the K8S cluster, so the Driver Pod has to manage EPs through the K8S ApiServer, and whenever a Pod's state changes, the Driver has to be told about it accordingly.

private def onNewSnapshots(
    schedulerBackend: KubernetesClusterSchedulerBackend,
    snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
  val execIdsRemovedInThisRound = mutable.HashSet.empty[Long]
  snapshots.foreach { snapshot =>
    snapshot.executorPods.foreach { case (execId, state) =>
      state match {
        case deleted@PodDeleted(_) =>
          logDebug(s"Snapshot reported deleted executor with id $execId," +
            s" pod name ${state.pod.getMetadata.getName}")
          removeExecutorFromSpark(schedulerBackend, deleted, execId)
          execIdsRemovedInThisRound += execId
        case failed@PodFailed(_) =>
          logDebug(s"Snapshot reported failed executor with id $execId," +
            s" pod name ${state.pod.getMetadata.getName}")
          onFinalNonDeletedState(failed, execId, schedulerBackend, execIdsRemovedInThisRound)
        case succeeded@PodSucceeded(_) =>
          logDebug(s"Snapshot reported succeeded executor with id $execId," +
            s" pod name ${state.pod.getMetadata.getName}. Note that succeeded executors are" +
            s" unusual unless Spark specifically informed the executor to exit.")
          onFinalNonDeletedState(succeeded, execId, schedulerBackend, execIdsRemovedInThisRound)
        case _ =>
      }
    }
  }

  if (snapshots.nonEmpty) {
    val latestSnapshot = snapshots.last
    (schedulerBackend.getExecutorIds().map(_.toLong).toSet
      -- latestSnapshot.executorPods.keySet
      -- execIdsRemovedInThisRound).foreach { missingExecutorId =>
      if (removedExecutorsCache.getIfPresent(missingExecutorId) == null) {
        val exitReasonMessage = s"The executor with ID $missingExecutorId was not found in the" +
          s" cluster but we didn't get a reason why. Marking the executor as failed. The" +
          s" executor may have been deleted but the driver missed the deletion event."
        logDebug(exitReasonMessage)

        val exitReason = ExecutorExited(
          UNKNOWN_EXIT_CODE,
          exitCausedByApp = false,
          exitReasonMessage)
        schedulerBackend.doRemoveExecutor(missingExecutorId.toString, exitReason)
        execIdsRemovedInThisRound += missingExecutorId
      }
    }
  }

  if (execIdsRemovedInThisRound.nonEmpty) {
    logDebug(s"Removed executors with ids ${execIdsRemovedInThisRound.mkString(",")}" +
      s" from Spark that were either found to be deleted or non-existent in the cluster.")
  }
}
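
onNewSnapshots is wired up by subscribing to the snapshots store; the registration looks roughly like the sketch below (eventProcessingInterval is assumed to come from a conf entry, so treat the exact names as approximate).

// Sketch of start(): the lifecycle manager is just another subscriber of the store.
def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
  snapshotsStore.addSubscriber(eventProcessingInterval) { snapshots =>
    onNewSnapshots(schedulerBackend, snapshots)
  }
}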
           

3 Summary

That wraps up this brief analysis of the scheduler. It is not hard to understand: the scheduler's job is to allocate Executors for the Driver and remove them at the right time. Finding suitable nodes to actually run the Executors is K8S's business; here K8S acts as an external cluster manager, and the concrete scheduling work is delegated to it.
