天天看點

【Flink】Flink叢集部署與YARN內建

1、Standalone

軟體要求: Java 1.8.x or higher ssh JAVA_HOME配置 You can set this variable in  conf/flink-conf.yaml  via the  env.java.home  key.

Flink配置: 下載下傳解壓 配置:conf/flink-conf.yaml jobmanager.rpc.address : master 節點 jobmanager.heap.mb : JobManager可用的記憶體數量 taskmanager.heap.mb : 每個TaskManager可以用記憶體數量 taskmanager.numberOfTaskSlots : 每個機器可用的CPU數量 parallelism.default : 叢集中總的CPU數量 taskmanager.tmp.dirs : 臨時目錄 配置conf/slaves:添加worker節點主機名 下面是三個節點的Standalone模式配置的拓撲結構:

【Flink】Flink叢集部署與YARN內建

啟動Flink 下面的腳本在本地節點上啟動一個JobManager,并通過SSH連接配接到salves檔案中列出的所有Worker節點,以便在每個節點上啟動TaskManager。現在Flink系統已經開始運作了。在本地節點上運作的JobManager現在将接受已配置RPC端口的作業 bin/start-cluster.sh bin/stop-cluster.sh

添加JobManager或TaskManager執行個體到叢集 可以使用bin.jobmanager.sh和bin/taskmanager.sh腳本為運作中的叢集添加JobManager和TaskManager執行個體 bin/jobmanager.sh (( start|start-foreground ) cluster ) |stop|stop-all bin/taskmanager.sh start|start-foreground|stop|stop-all

生産中基本很少使用這種模式的,大多數都是基于YARN來進行送出任務,下面主要給出YARN的任務送出配置方式

2、YARN

在YARN上啟動一個長時間運作的Flink叢集(start a long-time Flink cluster on YARN),這種模式會長期占用YARN的資源,當我們送出任務時,該YARN上cluster接收任務。 啟動一個YARN session用4個TaskManager(每個TaskManager配置設定4GB的堆空間) ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096 -s 2 -n : TaskManager的數量,相當于executor的數量 -s : 每個JobManager的core的數量,executor-cores。建議将slot的數量設定每台機器的處理器數量 -tm : 每個TaskManager的記憶體大小,executor-memory -jm : JobManager的記憶體大小,driver-memory 下面是啟動前叢集的資源配置設定:

【Flink】Flink叢集部署與YARN內建

啟動之後的配置設定: 記憶體增加了18G,core增加了9core=2*4+1(多出來的是作為APPMaster和JobManager的)

【Flink】Flink叢集部署與YARN內建

YARN sesstion啟動之後就可以使用bin/flink來啟動送出作業 ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar 下面詳解介紹YARN session模式

Flink YARN Session

Flink session 将啟動所有需要的Flink服務(JobManager和TaskManager),這樣就可以向叢集送出任務。注意,每個session中可以運作多個任務 bin/yarn-sseaion.sh 下面是啟動yarn session時需要給出的參數

【Flink】Flink叢集部署與YARN內建

注意 :用戶端需要設定YARN_CONF_DIR 或者 HADOOP_CONF_DIR環境變量和HDFS的配置 Example:下面的指令配置設定了10TaskManager,每個TM配置設定8GB和32slot(vcore) ./bin/yarn-session.sh -n 10 -tm 8192 -s 32

Flink on YARN需要重寫下面的配置參數: jobmanager.rpc.address : JobManager允許被配置設定到不同的機器上 taskmanager.tmp.dirs : 使用YARN給出的臨時目錄 parallelism.default : slots的數量需要被指定

-D 可以動态修改參數 -Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624 上面例子啟動了11個container(10個container作為TaskManager的),還有一個作為ApplicationMaster和JobManager的

如果叢集上有足夠的資源,Flink on YARN上隻會啟動所有被請求的container。大多數YARN排程器用于container的請求記憶體,一些也用于core的數量。一般情況下,vcore的數量等于處理的slot(-s)的數量,yarn.containers.vcores運作使用自定義的值覆寫

分離YARN Session 如果不想讓Flink YARN用戶端一直運作,那麼也可以啟動一個分離的YARN session。參數:-d or --detached 在這種情況下,Flink YARN用戶端斷隻會送出Flink任務到叢集,然後關閉。注意,這種情況下,用Flink來stop YARN Session是不可能的

連接配接到一個存在的Session: 下面的指令連接配接到一個運作的Flink YARN session ./bin/yarn-session.sh -id application_1463870264508_0029

submit job to flink ./bin/flink

Run a single Flink Job on YARN

上面内容描述了如何在Hadoop YARN環境中啟動FLink 叢集。也可以啟動Flink任務執行單個Job ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar User jars & Classpath yarn.per-job-cluster.include-user-jar

Recovery behavior of Flink on YARN

Flink’s YARN client has the following configuration parameters to control how to behave in case of container failures. These parameters can be set either from the  conf/flink-conf.yaml  or when starting the YARN session, using  -D  parameters yarn.reallocate-failed : 控制 Flink是否應該重新配置設定失敗的TaskManager容器,預設true yarn.maximum-failed-containers : ApplicationMaster接收container失敗的最大次數,預設是TaskManager的次數(-n的值) yarn.application-attempts : ApplicationMaster嘗試次數。如果這個值為1(預設),那麼當Application Master失敗時,整個YARN session就會失敗。更高的值是指ApplicationMaster重新啟動的次數

Flink and YARN 任務送出流程

【Flink】Flink叢集部署與YARN內建

YARN用戶端需要通路Hadoop配置來連接配接YARN資料總管和HDFS。它使用下面的政策來确定Hadoop的配置:

  • YARN_CONF_DIR,HADOOP_CONF_DIR,HADOOP_CONF_PATH.其中一個變量被設定,就能讀取配置
  • 如果上面的政策失敗(在正确的yarn 設定中不應該出來這種情況),用戶端使用HADOOP_HOME環境變量。如果設定了,那麼用戶端就會嘗試通路$HADOOP_HOME/tect/hadoop

step1 : 當啟動一個新的Flink YARN session時,用戶端首先檢查資源(container和memory)是否可用。然後,上傳一個包含Flink和配置的jar包到HDFS上。 用戶端請求(step 2)YARN container啟動ApplicationMaster(step 3).由于用戶端将配置和jar檔案注冊到容器,在特定機器上運作的YARN的NodeManager将負責準備container(例如下載下傳檔案)。一旦完成,ApplicationMaster就被啟動了 JobManager和ApplicationMaster運作在同一個container上。一旦他們被成功啟動,AM就知道JobManager的位址(AM它自己所在的機器)。它就會為TaskManager生成一個新的Flink配置檔案(他們就可以連接配接到JobManager)。這個配置檔案也被上傳到HDFS上。此外,AM容器也提供了Flink的web服務接口。YARN所配置設定的所有端口都是臨時端口,這允許使用者并行執行多個Flink session。 最後,AM開始為Flink的任務TaskManager配置設定container,它将從HDFS加載jar檔案和修改的配置檔案。一旦這些步驟完成,Flink就準備好接口Job的送出了

參考資料: https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html

https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/yarn_setup.html

繼續閱讀