以 Flink 和 Spark 為代表的分布式流批計算架構的下層資源管理平台逐漸從 Hadoop 生态的 YARN 轉向 Kubernetes 生态的 k8s 原生 scheduler 以及周邊資源排程器,比如 Volcano 和 Yunikorn 等。這篇文章簡單比較一下兩種計算架構在 Native Kubernetes 的支援和實作上的異同,以及對于應用到生産環境我們還需要做些什麼。
1. 什麼是 Native
這裡的 native 其實就是計算架構直接向 Kubernetes 申請資源。比如很多跑在 YARN 上面的計算架構,需要自己實作一個 AppMaster 來想 YARN 的 ResourceManager 來申請資源。Native K8s 相當于計算架構自己實作一個類似 AppMaster 的角色向 k8s 去申請資源,當然和 AppMaster 還是有差異的 (AppMaster 需要按 YARN 的标準進行實作)。
2. Spark on k8s 使用
送出作業
向 k8s 叢集送出作業和往 YARN 上面送出很類似,指令如下,主要差別包括:
-
參數指定 k8s 叢集的 ApiServer--master
- 需要通過參數
指定在 k8s 運作作業的 image,spark.kubernetes.container.image
- 指定 main jar,需要 driver 程序可通路:如果 driver 運作在 pod 中,jar 包需要包含在鏡像中;如果 driver 運作在本地,那麼 jar 需要在本地。
- 通過
或者--name
指定 app 的名字,作業運作起來之後的 driver 命名會以 app 名字為字首。當然也可以通過參數spark.app.name
直接指定 dirver 的名字spark.kubernetes.driver.pod.name
$ ./bin/spark-submit \
--master k8s://https://: \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image= \
local:///path/to/examples.jar
送出完該指令之後,spark-submit 會建立一個 driver pod 和一個對應的 servcie,然後由 driver 建立 executor pod 并運作作業。
deploy-mode
和在 YARN 上面使用 Spark 一樣,在 k8s 上面也支援 cluster 和 client 兩種模式:
- cluster mode: driver 在 k8s 叢集上面以 pod 形式運作。
- client mode: driver 運作在送出作業的地方,然後 driver 在 k8s 叢集上面建立 executor。為了保證 executor 能夠注冊到 driver 上面,還需要送出作業的機器可以和 k8s 叢集内部的 executor 網絡連通(executor 可以通路到 driver,需要注冊)。
資源清理
這裡的資源指的主要是作業的 driver 和 executor pod。spark 通過 k8s 的 onwer reference 機制将作業的各種資源連接配接起來,這樣當 driver pod 被删除的時候,關聯的 executor pod 也會被連帶删除。但是如果沒有 driver pod,也就是以 client 模式運作作業的話,如下兩種情況涉及到資源清理:
- 作業運作完成,driver 程序退出,executor pod 運作完自動退出
- driver 程序被殺掉,executor pod 連不上 driver 也會自行退出
可以參考:
https://kubernetes.io/docs/concepts/architecture/garbage-collection/依賴管理
前面說到 main jar 包需要在 driver 程序可以通路到的地方,如果是 cluster 模式就需要将 main jar 打包到 spark 鏡像中。但是在日常開發和調試中,每次重新 build 一個鏡像的 effort 實在是太大了。spark 支援送出的時候使用本地的檔案,然後使用 s3 等作為中轉:先上傳上去,然後作業運作的時候再從 s3 上面下載下傳下來。下面是一個執行個體。
...
--packages org.apache.hadoop:hadoop-aws:3.2.0
--conf spark.kubernetes.file.upload.path=s3a:///path
--conf spark.hadoop.fs.s3a.access.key=...
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
--conf spark.hadoop.fs.s3a.fast.upload=true
--conf spark.hadoop.fs.s3a.secret.key=....
--conf spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp
file:///full/path/to/app.jar
Pod Template
k8s 的 controller (比如 Deployment,Job)建立 Pod 的時候根據 spec 中的 pod template 來建立。下面是一個 Job 的示例。
apiVersion: batch/v1
kind: Job
metadata:
name: hello
spec:
template:
# 下面的是一個 pod template
spec:
containers:
- name: hello
image: busybox
command: ['sh', '-c', 'echo "Hello, Kubernetes!" && sleep 3600']
restartPolicy: OnFailure
# The pod template ends here
由于我們通過 spark-submit 送出 spark 作業的時候,最終的 k8s 資源(driver/executor pod)是由 spark 内部邏輯建構出來的。但是有的時候我們想要在 driver/executor pod 上做一些額外的工作,比如增加 sidecar 容器做一些日志收集的工作。這種場景下 PodTemplate 就是一個比較好的選擇,同時 PodTemplate 也将 spark 和底層基礎設施(k8s)解耦開。比如 k8s 釋出新版本支援一些新的特性,那麼我們隻要修改我們的 PodTemplate 即可,而不涉及到 spark 的内部改動。
RBAC
RBAC 全稱是 Role-based access control,是 k8s 中的一套權限控制機制。通俗來說:
- RBAC 中包含了一系列的權限設定,比如 create/delete/watch/list pod 等,這些權限集合的實體叫 Role 或者 ClusterRole
- 同時 RBAC 還包含了角色綁定關系(Role Binding),用于将 Role/ClusterRole 賦予一個或者一組使用者,比如 Service Account 或者 UserAccount
為了将 Spark 作業在 k8s 叢集中運作起來,我們還需要一套 RBAC 資源:
- 指定 namespace 下的 serviceaccount
- 定義了權限規則的 Role 或者 ClusterRole,我們可以使用常見的 ClusterRole "edit"(對幾乎所有資源具有操作權限,比如 create/delete/watch 等)
- 綁定關系
下面指令在 spark namespace 下為 serviceaccount spark 賦予了操作同 namespace 下其他資源的權限,那麼隻要 spark 的 driver pod 挂載了該 serviceaccount,它就可以建立 executor pod 了。
$ kubectl create serviceaccount spark
$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark:spark --namespace=spark
下面做一個簡單的示範:
通過如下指令送出作業 SparkPiSleep 到 k8s 叢集中。
$ spark-submit --master k8s://https://: --deploy-mode cluster --class org.apache.spark.examples.SparkPiSleep --conf spark.executor.memory=2g --conf spark.driver.memory=2g --conf spark.driver.core=1 --conf spark.app.name=test12 --conf spark.kubernetes.submission.waitAppCompletion=false --conf spark.executor.core=1 --conf spark.kubernetes.container.image= --conf spark.eventLog.enabled=false --conf spark.shuffle.service.enabled=false --conf spark.executor.instances=1 --conf spark.dynamicAllocation.enabled=false --conf spark.kubernetes.namespace=spark --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark --conf spark.executor.core=1 local:///path/to/main/jar
檢視 k8s 叢集中的資源
$ kubectl get po -n spark
NAME READY STATUS RESTARTS AGE
spark-pi-5b88a27b576050dd-exec-1 0/1 ContainerCreating 0 2s
test12-9fd3c27b576039ae-driver 1/1 Running 0 8s
其中第一個就是 executor pod,第二個是 driver 的 pod。除此之外還建立了一個 service,可以通過該 service 通路到 driver pod,比如 Spark UI 都可以這樣通路到。
$ kubectl get svc -n spark
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
test12-9fd3c27b576039ae-driver-svc ClusterIP None 7078/TCP,7079/TCP,4040/TCP 110s
下面再看一下 service owner reference,executor pod 也是類似的。
$ kubectl get svc test12-9fd3c27b576039ae-driver-svc -n spark -oyaml
apiVersion: v1
kind: Service
metadata:
creationTimestamp: "2021-08-18T03:48:50Z"
name: test12-9fd3c27b576039ae-driver-svc
namespace: spark
# service 的 ownerReference 指向了 driver pod,隻要 driver pod 被删除,該 service 也會被删除
ownerReferences:
- apiVersion: v1
controller: true
kind: Pod
name: test12-9fd3c27b576039ae-driver
uid: 56a50a66-68b5-42a0-b2f6-9a9443665d95
resourceVersion: "9975441"
uid: 06c1349f-be52-4133-80d9-07af34419b1f
3. Flink on k8s 使用
Flink on k8s native 的實作支援兩種模式:
- application mode:在遠端 k8s 叢集中啟動一個 flink 叢集(jm 和 tm),driver 運作在 jm 中,也就是隻支援 detached 模式,不支援 attached 模式。
- session mode:在遠端 k8s 叢集啟動一個常駐的 flink 叢集(隻有 jm),然後向上面送出作業,根據實際情況決定啟動多少個 tm。
在生産上面使用一般不太建議使用 session mode,是以下面主要讨論的是 application mode。
Flink 的 native k8s 模式是不需要指定 tm 個數的,jm 會根據使用者的代碼計算需要多少 tm。
下面是一個簡單的送出指令,需要包含:
- 參數
指定是 application 模式run-application
-
指定運作在 k8s 上--target
-
指定作業運作使用的 flink 鏡像kubernetes.container.image
- 最後需要指定 main jar,路徑是鏡像中的路徑
$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=custom-image-name \
local:///opt/flink/usrlib/my-flink-job.jar
Flink 的 native 模式會先建立一個 JobManager 的 deployment,并将其托管給 k8s。同一個作業所有的相關資源的 owner reference 都指向該 Deployment,也就是說删除了該 deployment,所有相關的資源都會被清理掉。下面根據作業的運作情況讨論一下資源如何清理。
- 作業運作到終态(SUCCESS,FAILED,CANCELED 等)之後,Flink 會清理掉所有作業
- JobManager 程序啟動失敗(pod 中的 jm 容器啟動失敗),由于控制器是 Deployment,是以會一直重複拉起
- 運作過程中,如果 JobManager 的 pod 被删除,Deployment 會重新拉起
- 運作過程中,如果 JobManager 的 Deployment 被删除,那麼關聯的所有 k8s 資源都會被删除
Flink native 模式也支援 Pod Template,類似 Spark。
類似 Spark。
依賴檔案管理
Flink 暫時隻支援 main jar 以及依賴檔案在鏡像中。也就是說使用者要送出作業需要自己定制化鏡像,體驗不是很好。一種 workaroud 的方式是結合 PodTemplate:
- 如果依賴是本地檔案,需要 upload 到一個 remote 存儲做中轉,比如各大雲廠商的對象存儲。
- 如果依賴是遠端檔案,不需要 upload。
- 運作時在 template 中使用 initContainer 将使用者的 jar 以及依賴檔案下載下傳到 Flink 容器中,并加到 classpath 下運作。
Flink 的作業 demo 就不在示範了。
4. Spark on Kubernetes 實作
Spark on Kubernetes 的實作比較簡單:
- Spark Client 建立一個 k8s pod 運作 driver
- driver 建立 executor pod,然後開始運作作業
- 作業運作結束之後 driver pod 進入到 Completed 狀态,executor pod 會被清理掉。作業結束之後通過 driver pod 我們還是可以檢視 driver pod 的。
代碼實作
Spark 的 native k8s 實作代碼在 resource-managers/kubernetes module 中。我們可以從 SparkSubmit 的代碼開始分析。我們主要看一下 deploy-mode 為 cluster 模式的代碼邏輯。
// Set the cluster manager
val clusterManager: Int = args.master match {
case "yarn" => YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("mesos") => MESOS
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
error("Master must either be yarn or start with spark, mesos, k8s, or local")
-1
}
首先根據 spark.master 配置中 scheme 來判斷是不是 on k8s。我們上面也看到這個配置的形式為
--master k8s://https://:
。如果是 on k8s 的 cluster 模式,則去加載 Class
org.apache.spark.deploy.k8s.submit.KubernetesClientApplication
,并運作其中的
start
方法。
childArgs
方法的核心邏輯簡單來說就是根據 spark-submit 送出的參數構造出 driver pod 送出到 k8s 運作。
private[spark] class KubernetesClientApplication extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
val parsedArguments = ClientArguments.fromCommandLineArgs(args)
run(parsedArguments, conf)
}
private def run(clientArguments: ClientArguments, sparkConf: SparkConf): Unit = {
// For constructing the app ID, we can't use the Spark application name, as the app ID is going
// to be added as a label to group resources belonging to the same application. Label values are
// considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
// a unique app ID (captured by spark.app.id) in the format below.
val kubernetesAppId = KubernetesConf.getKubernetesAppId()
val kubernetesConf = KubernetesConf.createDriverConf(
sparkConf,
kubernetesAppId,
clientArguments.mainAppResource,
clientArguments.mainClass,
clientArguments.driverArgs,
clientArguments.proxyUser)
// The master URL has been checked for validity already in SparkSubmit.
// We just need to get rid of the "k8s://" prefix here.
val master = KubernetesUtils.parseMasterUrl(sparkConf.get("spark.master"))
val watcher = new LoggingPodStatusWatcherImpl(kubernetesConf)
Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
master,
Some(kubernetesConf.namespace),
KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX,
SparkKubernetesClientFactory.ClientType.Submission,
sparkConf,
None,
None)) { kubernetesClient =>
val client = new Client(
kubernetesConf,
new KubernetesDriverBuilder(),
kubernetesClient,
watcher)
client.run()
}
}
}
上面的代碼的核心就是最後建立 Client 并運作。這個 Client 是 Spark 封裝出來的 Client,内置了 k8s client。
private[spark] class Client(
conf: KubernetesDriverConf,
builder: KubernetesDriverBuilder,
kubernetesClient: KubernetesClient,
watcher: LoggingPodStatusWatcher) extends Logging {
def run(): Unit = {
// 構造 Driver 的 Pod
val resolvedDriverSpec = builder.buildFromFeatures(conf, kubernetesClient)
val configMapName = KubernetesClientUtils.configMapNameDriver
val confFilesMap = KubernetesClientUtils.buildSparkConfDirFilesMap(configMapName,
conf.sparkConf, resolvedDriverSpec.systemProperties)
val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap)
// 修改 Pod 的 container spec:增加 SPARK_CONF_DIR
val resolvedDriverContainer = new ContainerBuilder(resolvedDriverSpec.pod.container)
.addNewEnv()
.withName(ENV_SPARK_CONF_DIR)
.withValue(SPARK_CONF_DIR_INTERNAL)
.endEnv()
.addNewVolumeMount()
.withName(SPARK_CONF_VOLUME_DRIVER)
.withMountPath(SPARK_CONF_DIR_INTERNAL)
.endVolumeMount()
.build()
val resolvedDriverPod = new PodBuilder(resolvedDriverSpec.pod.pod)
.editSpec()
.addToContainers(resolvedDriverContainer)
.addNewVolume()
.withName(SPARK_CONF_VOLUME_DRIVER)
.withNewConfigMap()
.withItems(KubernetesClientUtils.buildKeyToPathObjects(confFilesMap).asJava)
.withName(configMapName)
.endConfigMap()
.endVolume()
.endSpec()
.build()
val driverPodName = resolvedDriverPod.getMetadata.getName
var watch: Watch = null
var createdDriverPod: Pod = null
try {
// 通過 k8s client 建立 Driver Pod
createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
} catch {
case NonFatal(e) =>
logError("Please check \"kubectl auth can-i create pod\" first. It should be yes.")
throw e
}
try {
// 建立其他資源,修改 owner reference 等
val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
addOwnerReference(createdDriverPod, otherKubernetesResources)
kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
throw e
}
val sId = Seq(conf.namespace, driverPodName).mkString(":")
// watch pod
breakable {
while (true) {
val podWithName = kubernetesClient
.pods()
.withName(driverPodName)
// Reset resource to old before we start the watch, this is important for race conditions
watcher.reset()
watch = podWithName.watch(watcher)
// Send the latest pod state we know to the watcher to make sure we didn't miss anything
watcher.eventReceived(Action.MODIFIED, podWithName.get())
// Break the while loop if the pod is completed or we don't want to wait
// 根據參數 "spark.kubernetes.submission.waitAppCompletion" 判斷是否需要退出
if(watcher.watchOrStop(sId)) {
watch.close()
break
}
}
}
}
}
下面再簡單介紹一下 Driver 如何管理 Executor 的流程。當 Spark Driver 運作 main 函數時,會建立一個 SparkSession,SparkSession 中包含了 SparkContext,SparkContext 需要建立一個 SchedulerBackend 會管理 Executor 的生命周期。對應到 k8s 上的 SchedulerBackend 其實就是 KubernetesClusterSchedulerBackend,下面主要看一下這個 backend 是如何建立出來的。大膽猜想一下,大機率也是根據 spark.master 的 url 的 scheme "k8s" 建立的。
下面是 SparkContext 建立 SchedulerBackend 的核心代碼邏輯。
private def createTaskScheduler(...) = {
case masterUrl =>
// 建立出 KubernetesClusterManager
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
}
try {
val scheduler = cm.createTaskScheduler(sc, masterUrl)
// 上面建立出來的 KubernetesClusterManager 這裡會建立出 KubernetesClusterSchedulerBackend
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
(backend, scheduler)
} catch {
case se: SparkException => throw se
case NonFatal(e) =>
throw new SparkException("External scheduler cannot be instantiated", e)
}
}
// 方法 getClsuterManager 會通過 ServiceLoader 加載所有實作 ExternalClusterManager 的 ClusterManager (KubernetesClusterManager 和 YarnClusterManager),然後通過 master url 進行 filter,選出 KubernetesClusterManager
private def getClusterManager(url: String): Option[ExternalClusterManager] = {
val loader = Utils.getContextOrSparkClassLoader
val serviceLoaders =
ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
if (serviceLoaders.size > 1) {
throw new SparkException(
s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
}
serviceLoaders.headOption
}
後面就是 KubernetesClusterSchedulerBackend 管理 Executor 的邏輯了。
可以簡單看一下建立 Executor 的代碼邏輯。
private def requestNewExecutors(
expected: Int,
running: Int,
applicationId: String,
resourceProfileId: Int,
pvcsInUse: Seq[String]): Unit = {
val numExecutorsToAllocate = math.min(expected - running, podAllocationSize)
logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes for " +
s"ResourceProfile Id: $resourceProfileId, target: $expected running: $running.")
// Check reusable PVCs for this executor allocation batch
val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
for ( _ <- 0 until numExecutorsToAllocate) {
val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
val executorConf = KubernetesConf.createExecutorConf(
conf,
newExecutorId.toString,
applicationId,
driverPod,
resourceProfileId)
// 構造 Executor 的 Pod Spec
val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr,
kubernetesClient, rpIdToResourceProfile(resourceProfileId))
val executorPod = resolvedExecutorSpec.pod
val podWithAttachedContainer = new PodBuilder(executorPod.pod)
.editOrNewSpec()
.addToContainers(executorPod.container)
.endSpec()
.build()
val resources = replacePVCsIfNeeded(
podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
// 建立 Executor Pod
val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
try {
// 增加 owner reference
addOwnerReference(createdExecutorPod, resources)
resources
.filter(_.getKind == "PersistentVolumeClaim")
.foreach { resource =>
if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && driverPod.nonEmpty) {
addOwnerReference(driverPod.get, Seq(resource))
}
val pvc = resource.asInstanceOf[PersistentVolumeClaim]
logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
s"StorageClass ${pvc.getSpec.getStorageClassName}")
kubernetesClient.persistentVolumeClaims().create(pvc)
}
newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdExecutorPod)
throw e
}
}
}
5. Flink on Kubernetes 實作
Flink 的 Native K8s 實作:
- Flink Client 建立 JobManager 的 Deployment,然後将 Deployment 托管給 k8s
- k8s 的 Deployment Controller 建立 JobManager 的 Pod
- JobManager 内的 ResourceManager 負責先 Kubernetes Scheduler 請求資源并建立 TaskManager 等相關資源并建立相關的 TaskManager Pod 并開始運作作業
- 當作業運作到終态之後所有相關的 k8s 資源都被清理掉
代碼(基于分支 release-1.13)實作主要如下:
- CliFrontend 作為 Flink Client 的入口根據指令行參數
判斷通過方法run-application
去建立 ApplicationClusterrunApplication
- KubernetesClusterDescriptor 通過方法
建立 JobManager 相關的 Deployment 和一些必要的資源deployApplicationCluster
- JobManager 的實作類 JobMaster 通過 ResourceManager 調用類 KubernetesResourceManagerDriver 中的方法
建立 TaskManager 等資源requestResource
其中 KubernetesClusterDescriptor 實作自 interface
ClusterDescriptor
,用來描述對 Flink 叢集的操作。根據底層的資源使用不同,
ClusterDescriptor
有不同的實作,包括 KubernetesClusterDescriptor、YarnClusterDescriptor、StandaloneClusterDescriptor。
public interface ClusterDescriptor<T> extends AutoCloseable {
/* Returns a String containing details about the cluster (NodeManagers, available memory, ...). */
String getClusterDescription();
/* 查詢已存在的 Flink 叢集. */
ClusterClientProvider<T> retrieve(T clusterId) throws ClusterRetrieveException;
/** 建立 Flink Session 叢集 */
ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification)
throws ClusterDeploymentException;
/** 建立 Flink Application 叢集 **/
ClusterClientProvider<T> deployApplicationCluster(
final ClusterSpecification clusterSpecification,
final ApplicationConfiguration applicationConfiguration)
throws ClusterDeploymentException;
/** 建立 Per-job 叢集 **/
ClusterClientProvider<T> deployJobCluster(
final ClusterSpecification clusterSpecification,
final JobGraph jobGraph,
final boolean detached)
throws ClusterDeploymentException;
/** 删除叢集 **/
void killCluster(T clusterId) throws FlinkException;
@Override
void close();
}
下面簡單看一下 KubernetesClusterDescriptor 的核心邏輯:建立 Application 叢集。
public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
private final Configuration flinkConfig;
// 内置 k8s client
private final FlinkKubeClient client;
private final String clusterId;
@Override
public ClusterClientProvider<String> deployApplicationCluster(
final ClusterSpecification clusterSpecification,
final ApplicationConfiguration applicationConfiguration)
throws ClusterDeploymentException {
// 查詢 flink 叢集在 k8s 中是否存在
if (client.getRestService(clusterId).isPresent()) {
throw new ClusterDeploymentException(
"The Flink cluster " + clusterId + " already exists.");
}
final KubernetesDeploymentTarget deploymentTarget =
KubernetesDeploymentTarget.fromConfig(flinkConfig);
if (KubernetesDeploymentTarget.APPLICATION != deploymentTarget) {
throw new ClusterDeploymentException(
"Couldn't deploy Kubernetes Application Cluster."
+ " Expected deployment.target="
+ KubernetesDeploymentTarget.APPLICATION.getName()
+ " but actual one was \""
+ deploymentTarget
+ "\"");
}
// 設定 application 參數:$internal.application.program-args 和 $internal.application.main
applicationConfiguration.applyToConfiguration(flinkConfig);
// 建立叢集
final ClusterClientProvider<String> clusterClientProvider =
deployClusterInternal(
KubernetesApplicationClusterEntrypoint.class.getName(),
clusterSpecification,
false);
try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {
LOG.info(
"Create flink application cluster {} successfully, JobManager Web Interface: {}",
clusterId,
clusterClient.getWebInterfaceURL());
}
return clusterClientProvider;
}
// 建立叢集邏輯
private ClusterClientProvider<String> deployClusterInternal(
String entryPoint, ClusterSpecification clusterSpecification, boolean detached)
throws ClusterDeploymentException {
final ClusterEntrypoint.ExecutionMode executionMode =
detached
? ClusterEntrypoint.ExecutionMode.DETACHED
: ClusterEntrypoint.ExecutionMode.NORMAL;
flinkConfig.setString(
ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());
flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint);
// Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values.
// 将端口指定為固定值,友善 k8s 的資源建構。因為 pod 的隔離性,是以沒有端口沖突
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);
// HA 配置
if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) {
flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig,
HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE,
flinkConfig.get(JobManagerOptions.PORT));
}
try {
final KubernetesJobManagerParameters kubernetesJobManagerParameters =
new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
// 補充 PodTemplate 邏輯
final FlinkPod podTemplate =
kubernetesJobManagerParameters
.getPodTemplateFilePath()
.map(
file ->
KubernetesUtils.loadPodFromTemplateFile(
client, file, Constants.MAIN_CONTAINER_NAME))
.orElse(new FlinkPod.Builder().build());
final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
podTemplate, kubernetesJobManagerParameters);
// 核心邏輯:在 k8s 中建立包括 JobManager Deployment 在内 k8s 資源,比如 Service 和 ConfigMap
client.createJobManagerComponent(kubernetesJobManagerSpec);
return createClusterClientProvider(clusterId);
} catch (Exception e) {
//...
}
}
}
上面代碼中需要說的在建構 JobManager 的時候補充 PodTemplate。簡單來說 PodTemplate 就是一個 Pod 檔案。
第三步的 TaskManager 建立就不再贅述了。
7. 生态
這裡生态這個詞可能也不太合适,這裡主要指的的如果要在生産上面使用該功能還有哪些可以做的。下面主要讨論在生産環境上面用來做 trouble-shooting 的兩個功能:日志和監控。
日志
日志收集對于線上系統是非常重要的一環,毫不誇張地說,80% 的故障都可以通過日志查到原因。但是前面也說過,Flink 作業在作業運作到終态之後會清理掉所有資源,Spark 作業運作完隻會保留 Driver Pod 的日志,那麼我們如何收集到完整的作業日志呢?
有幾種方案可供選擇:
- DaemonSet。每個 k8s 的 node 上面以 DaemonSet 形式部署日志收集 agent,對 node 上面運作的所有容器日志進行統一收集,并存儲到類似 ElasticSearch 的統一日志搜尋平台。
- SideCar。使用 Flink/Spark 提供的 PodTemplate 功能在主容器側配置一個 SideCar 容器用來進行日志收集,最後存儲到統一的日志服務裡面。
這兩種方式都有一個前提是有其他的日志服務提供存儲、甚至搜尋的功能,比如 ELK,或者各大雲廠商的日志服務。
除此之外還有一種簡易的方式可以考慮:利用 log4j 的擴充機制,自定義 log appender,在 appender 中定制化 append 邏輯,将日志直接收集并存儲到 remote storage,比如 hdfs,對象存儲等。這種方案需要将自定義的 log appender 的 jar 包放到運作作業的 ClassPath 下,而且這種方式有可能會影響作業主流程的運作效率,對性能比較敏感的作業并不太建議使用這種方式。
監控
目前 Prometheus 已經成為 k8s 生态的監控事實标準,下面我們的讨論也是讨論如何将 Flink/Spark 的作業的名額對接到 Prometheus。下面先看一下 Prometheus 的架構。
其中的核心在于 Prometheus Servier 收集名額的方式是 pull 還是 push:
- 對于常駐的程序,比如線上服務,一般由 Prometheus Server 主動去程序暴露出來的 api pull 名額。
- 對于會結束的程序名額收集,比如 batch 作業,一般使用程序主動 push 的方式。詳細流程是程序将名額 push 到常駐的 PushGateway,然後 Prometheus Server 去 PushGateway pull 名額。
上面兩種使用方式也是 Prometheus 官方建議的使用方式,但是看完描述不難發現其實第一種場景也可以使用第二種處理方式。隻不過第二種方式由于 PushGateway 是常駐的,對其穩定性要求會比較高。
Flink
Flink 同時提供了 PrometheusReporter (将名額通過 api 暴露,由 Prometheus Server 來主動 pull 資料) 和 PrometheusPushGatewayReporter (将名額主動 push 給 PushGateway,Prometheus Server 不需要感覺 Flink 作業)。
這兩種方式中 PrometheusPushGatewayReporter 會更簡單一點,但是 PushGateway 可能會成為瓶頸。如果使用 PrometheusReporter 的方式,需要引入服務發現機制幫助 Prometheus Server 自動發現運作的 Flink 作業的 Endpoint。Prometheus 目前支援的主流的服務發現機制主要有:
- 基于 Consul。Consul 是基于 etcd 的一套完整的服務注冊與發現解決方案,要使用這種方式,我們需要 Flink 對接 Consul。比如我們在送出作業的時候,将作業對應的 Service 進行捕獲并寫入 Consul。
- 基于檔案。檔案也就是 Prometheus 的配置檔案,裡面配置需要拉取 target 的 endpoint。檔案這種方式本來是比較雞肋的,因為它需要 Prometheus Server 和 Flink 作業同時都可以通路,但是需要檔案是 local 的。但是在 k8s 環境中,基于檔案反而變的比較簡單,我們可以将 ConfigMap 挂載到 Prometheus Server 的 Pod 上面,Flink 作業修改 ConfigMap 就可以了。
- 基于 Kubernetes 的服務發現機制。Kubernetes 的服務發現機制簡單來說就是 label select。可以參考 https://prometheus.io/docs/prometheus/latest/configuration/configuration/#kubernetes_sd_config
關于 Prometheus 支援的更多服務發現機制,可以參考:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/,簡單羅列包括:
- azure
- consul
- digitalocean
- docker
- dockerswarm
- dns
- ec2
- eureka
- file
- gce
- hetzner
- http
- kubernetes
- ...
Spark
以批計算為代表的 Spark 使用 PushGateway 的方式來對接 Prometheus 是比較好的方式,但是 Spark 官方并沒有提供對 PushGateway 的支援,隻支援了 Prometheus 的 Exporter,需要 Prometheus Server 主動去 pull 資料。
這裡推薦使用基于 Kubernetes 的服務發現機制。
需要注意的是 Prometheus Server 拉取名額是按固定時間間隔進行拉取的,對于持續時間比較短的批作業,有可能存在還沒有拉取名額,作業就結束的情況。
8. 缺陷
雖然 Spark 和 Flink 都實作了 native k8s 的模式,具體實作略有差異。但是在實際使用上發現兩者的實作在某些場景下還是略有缺陷的。
pod 不具有容錯性 spark-submit 會先建構一個 k8s 的 driver pod,然後由 driver pod 啟動 executor 的 pod。但是在 k8s 環境中并不太建議直接建構 pod 資源,因為 pod 不具有容錯性,pod 所在節點挂了之後 pod 就挂了。熟悉 k8s scheduler 的同學應該知道 pod 有一個字段叫 podName,scheduler 的核心是為 pod 填充這個字段,也就是為 pod 選擇一個合适的 node。一旦排程完成之後 pod 的該字段就固定下來了。這也是 pod 不具有 node 容錯的原因。
Deployment 語義。 Deployment 可以認為是 ReplicaSet 的增強版,而 ReplicaSet 的官方定義如下。
A ReplicaSet's purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.
簡單來說,ReplicaSet 的目的是保證幾個相同的 Pod 副本可以不間斷的運作,說是為了線上服務量身定制的也不為過(線上服務最好是無狀态且支援原地重新開機,比如 WebService)。但是盡管 Flink 以流式作業為主,但是我們并不能簡單地将流式作業等同于無狀态的 WebService。比如 Flink 作業的 Main Jar 如果寫的有問題,會導緻 JobManager 的 Pod 一直啟動失敗,但是由于是 Deployment 語義的問題會不斷被重新開機。這個可能是 ByDesign 的,但是感覺并不太好。
Batch 作業處理。 由于 Flink 作業運作完所有資源包括 Deployment 都會被清理掉,拿不到最終的作業狀态,不知道成功有否(流作業的話停止就可以認為是失敗了)。對于這個問題可以利用 Flink 本身的歸檔功能,将結果歸檔到外部的檔案系統(相容 s3 協定,比如阿裡雲對象存儲 oss)中。涉及到的配置如下:
- s3.access-key
- s3.secret-key
- s3.region
- s3.endpoint
- jobmanager.archive.fs.dir
如果不想引入外部系統的話,需要改造 Flink 代碼在作業運作完成之後将資料寫到 k8s 的 api object 中,比如 ConfigMap 或者 Secret。
作業日志。 Spark 作業運作結束之後 Executor Pod 被清理掉,Driver Pod 被保留,我們可以通過它檢視到 Driver 的日志。Flink 作業結束之後就什麼日志都檢視不到了。
9. 總結
本文從使用方式、源碼實作以及在生産系統上面如何補足周邊系統地介紹了 Spark 和 Flink 在 k8s 生态上的實作、實踐以及對比。但是限于篇幅,很多内容來不及讨論了,比如 shuffle 如何處理。如果你們公司也在做這方面的工作,相信還是有很多參考價值的,也歡迎留言交流。
另外,YARN 的時代已經過去了,以後 on k8s scheduler 将成為大資料計算以及 AI 架構的标配。但是 k8s scheduler 這種天生為線上服務設計的排程器在吞吐上面有很大的不足,并不是很契合大資料作業。k8s 社群的批排程器 kube-batch,以及基于 kube-batch 衍生出來的 Volcano 排程器,基于 YARN 的排程算法實作的 k8s 生态排程器 Yunikorn 也逐漸在大資料 on k8s 場景下嶄露頭角,不過這些都是後話了,後面有時間再專門寫文章進行分析對比。