整理:翟玥(Flink 社群志願者)
校對:溫天柱(Flink 社群志願者)
作者:王陽(亦祺)
摘要:本文由阿裡巴巴技術專家王陽(亦祺)分享,社群志願者翟玥整理主要介紹如何原生的在 Kubernetes 上運作 Flink。主要内容包括:Tips:點選「 閱讀原文 」連結可檢視作者原版 PPT 及分享視訊~
- Kubernetes 簡介
- Flink on Kubernetes 部署演進
- Flink Native Integration 技術細節
- Demo 示範
什麼是 Kubernetes?
Kubernetes 相信大家都比較熟悉,近兩年大家都在讨論雲原生的話題,讨論 Kubernetes。那麼什麼是 Kubernetes 呢?
- K8s 是一個資源管理系統。
如果大家對 Yarn、 Mesos 熟悉,假設給定一批裸的實體機,将資源管理系統部署上去之後,可以在此之上基于它的 API 或者 SDK 開發一些分布式軟體或者應用程式。例如可以在 Yarn 上開發傳統的 MapReduce,在 K8s 上可以開發一些分布式的 Web Server,或者是大資料計算任務等等。
- K8s 是一個容器編排系統。
不同于傳統的 Yarn,K8s 在所有的程序運作過程中,是全部基于容器化的,但這裡的容器并不隻是單純的 Docker 容器,它也包括 Rocket 等其他相關的隔離措施。如果在生産環境中要求比較高的話,可能會有一些安全容器,比如 Kata Containers 等等。K8s 在 Slave 上部署的應用程式,都是用容器化的方式去做分發和管理,同時用容器化的技術做隔離。
- K8s 是一個自動化運維系統。
它是一個聲明式的 API,我們隻需要告訴 K8s 叢集需要建立一個 Deployment,設定的副本數量,需要達到一個什麼樣的狀态,排程系統也就是 K8s 就會幫助我們維持狀态,直到達到設定的狀态為止。如果中間發生了一些 failover 或者發生了一些失敗,它會自動地将任務遷移到其他的機器上,來滿足目前的排程。
- 雲原生。
目前幾乎所有的雲廠商都已經提供了 K8s 服務支援,包括國内的阿裡、國際上的 Amazon、Google 等等,包括傳統的微軟都已經提供了對于 K8s 的 Managed 服務或者是 Unmanaged 服務。随着目前 Lambda 表達式或者 Function 計算的應用, Serverless 方式也變得更加流行。除了傳統的部署小叢集以外,通過雲産生一個 manager,建構一個大的 Serverless 叢集,然後使用者按需進行計算資源付費,這也是一種新的模式。
Kubernetes 的架構
上圖是 K8s 基本的架構,這是一個非常典型的 Master-Slave 的架構。
- 在 Master 上,是由 Controller,API Server,Scheduler 以及包括做存儲的 Etcd 等構成。Etcd 可以算成 Master,也可以作為獨立于 Master 之外的存儲來對待。Master 的 Controller、API Server、Scheduler 都是單獨的程序模式。這和 Yarn 有一些不同,Yarn 的整個 Master 是一個單程序的模式。K8s 的 Master 還可以在多個 Master 之間完成自發的選舉,然後由 active 狀态的 Master 對外提供服務。
- 在 Slave 上,它主要是包括 Kube proxy、Kubelet,以及 Docker 等相關的元件,每個 Node 上部署的相關元件都是類似的,通過它來管理上面運作的多個 Pod。
- 根據不同使用者的習慣,可以通過 UI 或者 CLI 的方式向 K8s 送出任務。使用者可以通過 K8s 提供的 Dashboard Web UI 的方式将任務進行送出,也可以通過 Kubectl 指令行的方式進行送出。
Kubernetes 的一些概念
- ConfigMap
ConfigMap 是一個 K-V 資料結構。通常的用法是将 ConfigMap 挂載到 Pod ,作為配置檔案提供 Pod 裡新的程序使用。在 Flink 中可以将 Log4j 檔案或者是 flink-conf 檔案寫到 ConfigMap 裡面,在 JobManager 或者 TaskManger 起來之前将它挂載到 Pod 裡,然後 JobManager 去讀取相應的 conf 檔案,加載其配置,進而再正确地拉起 JobManager 一些相應的元件。
- Service(簡稱 SVC )
一種對外暴露服務的方式。如果現在内部有一個服務,需要在 K8s 外部進行通路,此時可以通過 Service,然後用 LoadBalancer 或者 NodePort 的方式将其暴露出去。
如果有一個 Service,不希望或不需要将其對外暴露,可以把它設定為 Cluster IP 或者是 None 這種 Headless 的模式。這個時候,它可以用于服務之間互相連接配接,例如傳統的前端去聯後端服務,或者是在 Flink 中非 HA 的情況下,TaskManager 去連 JobManager 等等。
- Pod
Pod 是 K8s 裡最小的排程單元。K8s 都是以 Pod 進行排程的。每個 Pod 可以包含一個或者多個 Container。每個 Container 都會有自己的資源,互相之間資源也是已經隔離的,但是所有 Container 共享同一個網絡,這就意味着所有的 Container 可以通過 localhost 直接進行通信。
同時,Container 之間可以通過 Volume 共享一些檔案。比如 JobManager 或 TaskManager 的 Pod 裡産生了一些日志,在同一個 Pod 裡再去起另外一個程序收集不符合 K8s 的原生語義。可以通過 SideCar 的方式去起另外一個 Container,把 JobManager 産生的日志收走。這就是一個 Pod 多個 Container 的具體用途。
- Deployment
因為 Pod 是可以随時被終止的,是以當 Pod 終止之後,就無法再拉起來去做 failover 等其他相關操作。Deployment 是在 Pod 之上提供了更高一層的抽象。Deployment 可以設定 Pod 的狀态,比如需要起 5 個 TaskManager,Deployment 會維持目前狀态。當有 TaskManager 挂了以後,它會起新的 TaskManager,來補上。這樣可以避免自己彙報 Pod 的狀态,可以去做一些更複雜的管理 failover 等等。這也是最基礎的概念——運維自動化。
目前都有什麼樣的任務在 K8s 上運作?
除了傳統的 Web 以及移動端一些無狀态的如 MySQL、Kafka 等存儲相關的任務外,有狀态的服務也不斷地在 K8s 上做适配和運作。除此之外,深度學習架構 Tensorflow 原生即可在 K8s 上運作,包括 Spark、Flink 等等,一些大資料相關的架構也在不斷地去相容,不斷地去适配,以便讓更多的大資料服務可以更好地在 K8s 上運作。
從這一點我們可以看出, K8s 相比于 Yarn 或傳統的 Hadoop 具有更好的包容性,它可以把存儲、深度學習、大資料包括 OLAP 分析等多種計算架構、引擎都運作在 K8s 之上。這樣就會帶來一個很大的好處,整個公司隻需要去管理一個排程架構,就可以把所有的存儲,實時計算,批量計算,包括深度學習,OLAP 分析等等,都在一個叢集裡面運作。除了管理更友善以外,也可以達到更好的叢集使用率。
Flink On Kubernetes 的部署演進
Flink 在 K8s 上最簡單的方式是以 Standalone 方式進行部署。這種方式部署的好處在于不需要對 Flink 做任何改動,同時 Flink 對 K8s 叢集是無感覺的,通過外部手段即可讓 Flink 運作起來。
Standalone Session On K8s
Standalone方式在k8s運作步驟:
如圖所示:
- 步驟1, 使用 Kubectl 或者 K8s 的 Dashboard 送出請求到 K8s Master。
- 步驟2, K8s Master 将建立 Flink Master Deployment、TaskManager Deployment、ConfigMap、SVC 的請求分發給 Slave 去建立這四個角色,建立完成後,這時 Flink Master、TaskManager 啟動了。
- 步驟3, TaskManager 注冊到 JobManager。在非 HA 的情況下,是通過内部 Service 注冊到 JobManager。
- 至此,Flink 的 Sesion Cluster 已經建立起來。此時就可以送出任務了。
- 步驟4,在 Flink Cluster 上送出 Flink run 的指令,通過指定 Flink Master 的位址,将相應任務送出上來,使用者的 Jar 和 JobGrapth 會在 Flink Client 生成,通過 SVC 傳給 Dispatcher。
- 步驟5,Dispatcher 會發現有一個新的 Job 送出上來,這時會起一個新的 JobMaster,去運作這個 Job。
- 步驟6,JobMaster 會向 ResourceManager 申請資源,因為 Standalone 方式并不具備主動申請資源的能力,是以這個時候會直接傳回,而且我們已經提前把 TaskManager 起好,并且已經注冊回來了。
- 步驟7-8,這時 JobMaster 會把 Task 部署到相應的 TaskManager 上,整個任務運作的過程就完成了。
Standalone perjob on K8s
現在我們看一下 Perjob 的部署,因為 Session Cluster 和 Perjob 分别都有不同的适用場景,一個 Session 裡面可以跑多個任務,但是每個任務之間沒有辦法達到更好的隔離性。而 Perjob 的方式,每個job都會有一個自己獨立的 Flink Cluster 去運作,它們之間互相獨立。
■ Perjob 的特點:
- 使用者的 Jar 和依賴都是在鏡像裡提前編譯好,或者通過 Init Container 方式,在真正 Container 啟動之前進行初始化。
- 每個 Job 都會啟動一個新的 Cluster。
- 一步送出,不需要像 Session Cluster 一樣先啟動叢集再送出任務。
- 使用者的 main 方法是在 Cluster 裡運作。在特殊網絡環境情況下,main 方法需要在 Cluster 裡運作的話,Session 方式是無法做到的,而 Perjob 方式是可以執行的。
■ 執行步驟:
由 Standalone JobCluster EntryPoint 執行,從 classpath 找到使用者 Jar,執行它的 main 方法得到 JobGrapth 。再送出到 Dispathcher,這時候走 Recover Job 的邏輯,送出到 JobMaster。JobMaster 向 ResourceManager 申請資源,請求 slot,執行 Job。
Helm Chart 方式
Helm 類似于 Linux 上的 Yum。
K8s 裡的 Helm 是一個包管理工具,可以很友善的安裝一個包。部署一個 Flink 叢集等操作,隻需要 helm install 就可以将之前很多步的安裝操作,一步去完成。本質上沒有什麼差别,隻是它用 Helm 重新組織,包括一些模闆等等,用起來會更加友善。
Flink Kubernetes Operator
- 任務生命周期管理
使用 Operator 的方式來管理 Flink,主要是來管理多個 Cluster 的情況,可起到任務生命周期管理的作用。它和 Standalone、Native 的方式,本質上不是在一個層次上,它類似于一個更上層的做任務管理的工具。
- 基于 K8s Operator,友善建立 Flink Cluster。
之前去建立一個 Perjob Cluster,可能需要部署多次,如果任務要做更新,甚至可能需要把之前的删掉,然後修改配置,再重新部署。
引入 K8s Operator 就隻需要做一些簡單操作。比如 Operator 中有自己的一套 yaml 描述方式,修改其中某一個字段,如修改 image 的 version 字段,此時背景會自動觸發一些重新開機,包括對目前正在執行的任務做 savepoint,然後把 Cluster 銷毀掉,再進行新的定向就可以将叢集拉起,等一系列自動化的操作。對 Flink 的配置做修改等也都可以在背景自動化完成。
目前 Operater 有 Lyft 和 Google 兩個開源的 operator,他們在功能上類似,而且都是已經經過生産檢驗,與目前的 Standalone Cluster 結合的比較好的,已經達到生産可用的标準。
參考:
1.lyft/flinkk8soperator
https://github.com/lyft/flinkk8soperator 2.GoogleCloudPlatform/flink-on-k8s-operator https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
總結
當然,Flink on K8s 目前也存在一些不足:
- 無論 Operator、Helm Chart 或者是直接使用 Kubectl Yaml 的方式,Flink 都感覺不到 K8s 的存在。
- 目前主要使用靜态的資源配置設定。需要提前确認好需要多少個 TaskManager,如果 Job 的并發需要做一些調整,TaskManager 的資源情況必須相應的跟上,否則任務無法正常執行。
- 使用者需要對一些 Container、Operator 或者 K8s 有一些最基本的認識,這樣才能保證順利将 Flink 運作到 K8s 之上。
- 對于批處理任務,或者想在一個 Session 裡送出多個任務不太友好。無法實時申請資源和釋放資源。因為 TaskManager 的資源是固定的,批處理任務可能會分多個階段去運作,需要去實時地申請資源、釋放資源,目前也無法實作。如果需要在一個 Session 裡跑多個 Job 并且陸續運作結束目前也無法實作。這時如果維持一個比較大的 Session Cluster,可能會資源浪費。但如果維持的 Session Cluster 比較小,可能會導緻 Job 跑得慢或者是跑不起來。
基于這幾點,我們在社群推進了一個 Native 的內建方案,這個 Native 類似于 Yarn 這種原生的內建,就是讓 Flink 原生的感覺到下層 Cluster 的存在。
Navtive Integration 的技術細節
為什麼叫 Native 方式?包括如下幾個含義。
- 資源申請方式:Flink 的 Client 内置了一個 K8s Client,可以借助 K8s Client 去建立 JobManager,當 Job 送出之後,如果對資源有需求,JobManager 會向 Flink 自己的 ResourceManager 去申請資源。這個時候 Flink 的 ResourceManager 會直接跟 K8s 的 API Server 通信,将這些請求資源直接下發給 K8s Cluster,告訴它需要多少個 TaskManger,每個 TaskManager 多大。當任務運作完之後,它也會告訴 K8s Cluster釋放沒有使用的資源。相當于 Flink 用很原生的方式了解到 K8s Cluster 的存在,并知曉何時申請資源,何時釋放資源。
- Native 是相對于 Flink 而言的,借助 Flink 的指令就可以達到自治的一個狀态,不需要引入外部工具就可以通過 Flink 完成任務在 K8s 上的運作。
具體如何工作?主要分 Session 和 Perjob 兩個方面來給大家介紹。
Native Kubernetes Session 方式
首先 Session 的方式。
- 第一個階段:啟動 Session Cluster。Flink Client 内置了 K8s Client,告訴 K8s Master 建立 Flink Master Deployment,ConfigMap,SVC。建立完成後,Master 就拉起來了。這時,Session 就部署完成了,并沒有維護任何 TaskManager。
- 第二個階段:當使用者送出 Job 時,可以通過 Flink Client 或者 Dashboard 的方式,然後通過 Service 到 Dispatcher,Dispatcher 會産生一個 JobMaster。JobMaster 會向 K8sResourceManager 申請資源。ResourceManager 會發現現在沒有任何可用的資源,它就會繼續向 K8s 的 Master 去請求資源,請求資源之後将其發送回去,起新的 Taskmanager。Taskmanager 起來之後,再注冊回來,此時的 ResourceManager 再向它去申請 slot 提供給 JobMaster,最後由 JobMaster 将相應的 Task 部署到 TaskManager 上。這樣整個從 Session 的拉起到使用者送出都完成了。
- 需注意的是,圖中 SVC 是一個 External Service。必須要保證 Client 通過 Service 可以通路到 Master。在很多 K8s 叢集裡,K8s 和 Flink Client 是不在同一個網絡環境的,這時候可以通過 LoadBalancer 的方式或者 NodePort 的方式,使 Flink Client 可以通路到 Jobmanager Dispatcher,否則 Jar 包是無法送出的。
Native Kubernetes Perjob 方式
我們再來看一下 Perjob 的方式,如圖所示,Perjob 方式其實和之前是有一些類似,差别在于不需要先去起一個 Session Cluster,再送出任務,而是一步的。
- 首先建立出了 Service、Master 和 ConfigMap 這幾個資源以後,Flink Master Deployment 裡面已經帶了一個使用者 Jar,這個時候 entrypoint 就會從使用者 Jar 裡面去提取出或者運作使用者的 main,然後産生 JobGraph。之後再送出到 Dispatcher,由 Dispatcher 去産生 Master,然後再向 ResourceManager 申請資源,後面的邏輯的就和 Session 的方式是一樣的。
- 它和 Session 最大的差異就在于它是一步送出的。因為沒有了兩步送出的需求,如果不需要在任務起來以後通路外部 UI,就可以不用外部的 Service。可直接通過一步送出使任務運作。通過本地的 port-forward 或者是用 K8s ApiServer 的一些 proxy 可以通路 Flink 的 Web UI。此時,External Service 就不需要了,意味着不需要再占用一個 LoadBalancer 或者占用 NodePort。這就是 perjob 方式。
Session 與 Perjob 方式的不同
我們來看一下 Session 和 Perjob 方式有哪些不同?
Session
- 啟動 Session
注意:image 需要替換為自己的鏡像或者使用 docker hub 上的 Flink 官方鏡像庫。
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=k8s-session-1 \
-Dkubernetes.container.image=<ImageName> \
-Dkubernetes.container.image.pull-policy=Always \
-Djobmanager.heap.size=4096m \
-Dtaskmanager.memory.process.size=4096m \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2
- 送出 job 到 Session
./bin/flink run -d -p 10 -e kubernetes-session -Dkubernetes.cluster-id=k8s-session-1 examples/streaming/WindowJoin.jar
- 停止 Session
echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=k8s-session-1 -Dexecution.attached=true
Application
Application 模式是 FLIP-85 引入的一種新的模式,長遠來看是用于替換社群目前的 perjob 模式的。目前 application 模式和 perjob 模式最大的差別是使用者代碼在 client 端還是 jobmanager 端運作。在 K8s 部署上,由于使用者的 jar 和依賴都可以提前打在鏡像裡面,是以支援 application 模式就變得非常容易。
注意:Application 模式是在 Flink 1.11 中才支援的功能,需要使用對應的 Flink client 和鏡像。可以參考社群文檔來建構自己的鏡像。
./bin/flink run-application -p 10 -t kubernetes-application \
-Dkubernetes.cluster-id=k8s-app1 \
-Dkubernetes.container.image=<ImageName> \
-Dkubernetes.container.image.pull-policy=Always \
-Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
local:///opt/flink/examples/streaming/WindowJoin.jar
目前功能的狀态
- Native Kubernetes Session模式
FLINK-9953:已經在 Flink 1.10釋出:
https://issues.apache.org/jira/browse/FLINK-9953- Native Kubernetes Application 模式
FLINK-10934:計劃在 Flink 1.11釋出
https://issues.apache.org/jira/browse/FLINK-10934- Native模式下高可用
FLINK-12884:目前高可用方式是基于 zk 實作的,未來是希望不依賴于外部元件,基于 K8s 的 ConfigMap 去做 Meta 存儲和 Leader 選舉。目前已經有内部實作,未來将會貢獻給社群。
https://issues.apache.org/jira/browse/FLINK-12884- 其他功能支援:
FLINK-14460:計劃陸續在 Flink 1.11/1.12兩個版本中釋出和完善
https://issues.apache.org/jira/browse/FLINK-14460包括内容如下:
- Label,annotation,node-selector:希望将任務排程到某個叢集特定的機器上的應用場景可能需要 node-selector,希望給叢集打特定的 Label 并且外部能通路到,可以使用 Label 的功能等等。
- Sidecar container:幫助完成日志收集等
- Init container:可以幫助在 JobManager,TaskManager 啟動之前,把 Jar 下載下傳好,這樣我們可以使用統一的景象,不需要把使用者 Jar 打到鏡像裡
- 存儲優化
- Pod 模闆完成一些不常用的功能等等
目前社群的 Flink on K8s 的 native 方案還在快速發展和完善,希望大家多多試用并且提出回報意見,如果有興趣也非常歡迎一起參與進來開發。
文章不夠看?點選「
」可直接回顧作者現場分享的講解視訊~