天天看點

1.4 Flink on YARN叢集部署《Flink核心源碼解讀》

作者:小圈資料
本節講解Flink on YARN叢集模式部署,包括會話模式(Session)、單作業模式(Perjob)、應用模式(Application),最後講解Flink on YARN模式下高可用設定。

Apache Hadoop YARN (Yet Another Resource Negotiator,另一種資源協調者)是一種新的通用資源排程系統,可為上層應用提供統一的資源管理和排程,它的引入為叢集在使用率、資源統一管理和資料共享等方面帶來了巨大好處。Spark、Flink、Hive、Tez等項目都可以使用YANR來進行資源排程管理。YARN是Hadoop的一部分,部署Flink YARN叢集首先需要安裝Hadoop,安裝Hadoop的過程,請參考官網https://hadoop.apache.org/。

實際生成環境很少選擇Flink獨立叢集模式,大多選擇Flink on YARN、Flink on K8S。Flink借助于YARN可以動态地配置設定或者釋放JobManager、TaskManager資源,根據運作的作業需要的Slot數量,Flink用戶端與YANR互動,YARN通過自身的ResourceManager、NodeManger配置設定叢集資源,YARN把Flink的JobManager、TaskManager部署到YARN容器裡。這樣做到了按需使用硬體資源,有作業運作就去申請,運作完作業後再返還計算資源給YARN資料總管。

1.4.1 設定環境變量

部署Flink YARN叢集需要添加Hadoop相關依賴jar包,早期版本的Flink提供了Pre-bundled Hadoop捆綁包,它打包了通路Hadoop需要的用戶端類,然後把它放入Flink的lib目錄。由于Pre-bundled Hadoop捆綁包容易引起 Jar包沖突,以及Hadoop版本疊代更替維護困難,現推薦方式:通過設定環境變量HADOOP_CLASSPATH,Flink啟動會讀取HADOOP_CLASSPATH,并把其HADOOP_CLASSPATH的值加入到Flink啟動依賴項中。

編輯檔案:vim /etc/profile,設定HADOOP_CLASSPATH,“hadoop classpath”是hadoop指令,classpath是參數,需要確定“hadoop classpath”能正确執行,提前是安裝Hadoop并把$HADOOP_HOME/bin添加$PATH環境變量中。

export HADOOP_CLASSPATH=`hadoop classpath`
# 儲存後執行source /etc/profile,重新整理環境變量           

1.4.2 了解Flink on YARN三種部署模式

Flink在YANR上支援三種部署模式,分别是應用模式(Application Mode)、單作業模式(Per-Job Mode)和會話模式(YARN Session)。生産環境推薦使用Application Mode或者Per-Job Mode,因為它們提供更好的應用隔離性。

1.4.2.1 會話模式

會話模式是提前初始化好的Flink叢集并常駐在YARN服務中,使用者送出的所有作業都在該Flink叢集内運作。會話模式下隻有一個JobManager,TaskManager根據作業需要的資源數量動态拉起。由于多個作業會共用TaskManager,如果有一個TaskManager當機,上面運作的所有作業都會失敗,另外作業越多JobManager任務越重,容易成為系統瓶頸,實際使用中容易僵死。會話模式适合部署延遲非常敏感但運作時長較短的作業。

1.4.2.2 單作業模式

單作業模式下,每個作業送出到YARN都會啟動一個Flink叢集,JobManager、TaskManager都是專屬某個作業的,作業之間互相隔離,單個作業的失敗不影響其他作業。由于每個作業都需要全新拉起一個Flink叢集,導緻作業啟動延遲較長。單作業模式下Flink叢集的生命周期和作業周期保持一緻,即是作業啟動,Flink叢集随之啟動,作業結束,Flink叢集也跟着銷毀。單作業模式适合部署長時間運作的作業。

1.4.2.3 應用模式

應用模式是Flink 1.11新增的一種部署模式。它也是每個作業都啟動一個Flink叢集,差別于單作業模式,應用模式運作main()方法的位置不一樣。用戶端送出作業需要經曆解析(運作main方法)、提取算子并生成JobGraph,接着送出JobGraph和Flink依賴庫。我們用于送出作業的伺服器往往是固定的機器,如果作業送出很頻繁會導緻用戶端負載過大,以及大量的資料傳輸導緻網絡帶寬占用。Flink為了優化單作業模式,把作業解析、提取、生成JobGraph的階段放到了JobManager,這樣避免用戶端負載過大。JobManager是YARN動态拉取,且運作在不同YANR NodeManager上,這樣能很好地把壓力分散到各計算節點上。另外依賴的資源包通過指定HDFS路徑,JobManager去HDFS拉取,一般HDFS的資料會分散到三個不同節點,且NodeManager會緩存資料,這兒有效規避了用戶端網絡帶寬占用問題。

1.4 Flink on YARN叢集部署《Flink核心源碼解讀》

1.4.3 啟動Flink YARN叢集

1.4.3.1 會話模式

1.啟動YARN Session叢集,--detached參數代表背景運作。

./bin/yarn-session.sh --detached           

啟動成功後會有類似如下提示資訊,可以看到JobManager Web位址、YANR上應用id、以及怎麼優雅地關閉Flink應用。JobManager Web位址也可以通過YARN ResourceManager頁面進入,使用方法:首先在YARN Applications清單找到目标應用,然後點選進入詳細頁面,接着找到 “Tracking URL: ApplicationMaster” ,點選就能看到JobManager Web位址了。

2022-01-16 16:25:17,301 INFO org.apache.flink.yarn.YarnClusterDescriptor                 [] - YARN application has been deployed successfully.
2022-01-16 16:25:17,302 INFO org.apache.flink.yarn.YarnClusterDescriptor                 [] - Found Web Interface host1.test.com:39344 of application 'application_1640606693101_0012'.
JobManager Web Interface: http://host1.test.com:39344
2022-01-16 16:25:17,531 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli               [] - The Flink YARN session cluster has been started in detached mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1640606693101_0012
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1640606693101_0012
Note that killing Flink might not clean up all job artifacts and temporary files.           

2.送出Flink作業到YANR叢集

# 通過flink run執行送出作業,--detached代表背景運作
./bin/flink run --detached ./examples/streaming/TopSpeedWindowing.jar           

注意:在Flink on YARN叢集模式下,jobmanager.rpc.address參數會失效。JobManager由YARN動态拉起并配置設定到某計算節點,是以不能再通過配置的jobmanager.rpc.address位址通路該服務。

1.4.3.2 單作業模式

通過Per-Job模式送出Flink作業,Flink叢集和作業一起啟動。相比YARN Session下送出作業多了target參數:“-t yarn-per-job”。

./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar           

檢視運作的作業以及取消執行

# 列舉正在執行的作業
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
# 取消正在執行的作業,Per-Job模式下取消作業會導緻Flink叢集一并銷毀
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>           

1.4.3.3 應用模式

通過應用模式送出Flink作業,使用者jar的main()方法在JobManager中執行,同樣取消作業會導緻Flink叢集一并銷毀。

./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar           

為了避免用戶端網絡傳輸到Flink叢集,通過yarn.provided.lib.dirs參數指定Flink依賴jar包,使用者提前把Flink依賴jar包、使用者應用jar包上傳到HDFS叢集。

# 指定包擷取路徑,而非本地用戶端上傳
./bin/flink run-application -t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" \
hdfs://myhdfs/jars/my-application.jar           

1.4.4 Flink on YARN高可用

在Flink on Yarn叢集模式下,Flink高可用和獨立模式有些差異,YANR資源管理架構可以動态拉起Flink叢集,包含其中的JobManager。Flink on YARN叢集的高可用隻需要運作一個JobManager執行個體,當這個JobManager崩潰後,Yarn會重新啟動一個新的JobManager。但完成這些動作的前提是作業資訊需要儲存到zookeeper。

Flink on YARN高可用依賴zookeeper,在配置Flink高可用前需先安裝zookeeper叢集。配置zookeeper參數的方式和獨立叢集配置一樣,除此之外有個yarn.application-attempts參數需要關注。

1.編輯flink-conf.yaml配置檔案,設定基于zookeeper的高可用。Flink on YARN修改了配置檔案,不需要同步到其他節點,執行指令拉起Flink叢集的時候用戶端會把配置檔案一并發送到ApplicationMaster。

#必填參數
high-availability: zookeeper  
#必填參數,多quorum格式:host1:port1,host2:port2,host3:port3
high-availability.zookeeper.quorum: host1.test.com:2181
#必填參數,storageDir需要是所有節點都能通路的位址,比如HDFS, S3, Ceph, nfs。如果使用hdfs位址則需要引入hadoop相關包,另外flink的啟動使用者必須有權限建立hdfs目錄
high-availability.storageDir: hdfs:///flink/recovery

#可選參數,存儲狀态的zookeeper路徑
high-availability.zookeeper.path.root: /flink
#應用嘗試啟動的次數,不能超過YARN設定的ApplicationMaster最大嘗試次數
yarn.application-attempts: 2           

應用嘗試啟動的次數yarn.application-attempts,2表示總共能重新開機的次數,包含作業最開始啟動的那一次,當2次啟動都失敗時,YARN會認為這個作業失敗了。需要注意的是,此處配置的上限是YANR服務中的yarn.resourcemanager.am.max-attempts參數,如果需要調高此處參數,則需要檢查yarn.resourcemanager.am.max-attempts參數值。

2.ApplicationMaster最大執行次數,配置項在yarn-site.xml檔案中。

<!-- YARN ApplicationMaster最大的執行次數. 預設值是2 -->
<property>
 <name>yarn.resourcemanager.am.max-attempts</name>
 <value>2</value>
</property>           

3.不同版本的YARN在作業發生錯誤後,在關閉Container時采取的方式不盡相同。

- 在YARN 2.3.0 到 2.4.0版本中:當Application Master失敗時,所有的Containers都将重新開機。

- 在YARN 2.4.0 到 2.6.0版本中:當Application Master失敗時,TaskManager Containers仍會保持存活狀态,這樣做的好處是啟動時間更快,并且不必等待再次獲得容器資源。

- 在YARN 高于或等于2.6.0版本中:在以前的基礎上設定了一個間隔時間,這個間隔時間的值等于Flink的Akka逾時時間,當作業發生錯誤後,會以這個時間間隔來重新開機任務,當重試次數超過Flink設定的yarn.application-attempts值時,作業才會被系統kill。這樣可以避免一個長時間失敗的任務耗盡Application Master的嘗試重新開機次數。

如果有任何疑問歡迎留言,筆者頭條号與公衆号同名:小圈資料

繼續閱讀