Flink的高可用叢集環境
Flink簡介
Flink核心是一個流式的資料流執行引擎,其針對資料流的分布式計算提供了資料分布,資料通信以及容錯機制等功能。
因現在主要Flink這一塊做先關方面的學習,是以準備要開通Apache Flink專欄這一塊定期釋出一些文章。今天在自己的部落格因為專欄無法申請通過,是以先在此記錄第一篇關于Flink部署的文章。
在這裡順便打個小廣告,Flink社群第一季線下meetup,已在上海,北京舉辦。接下來分别會在成都和深圳舉辦接下來的幾期,也希望小夥伴們踴躍的加入到Flink社群來,下載下傳釘釘,掃描下方二維碼即可加入大群。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNvwVZ2x2bzNXak9CX90TQNNkRrFlQKBTSvwFbslmZvwFMwQzLcVmepNHdu9mZvwFVywUNMZTY18CX052bm9CX9UEVPBzZq5keNRVT3V1MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2LcRHelR3LcJzLctmch1mclRXY39TNzMjMxQTM5EjMxgDM4EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
首先今天先介紹一下Flink的安裝,安裝部署最新1.6版本支援有8種安裝方式,詳細可以參考安裝部署方式【Clusters & Deployment】 。下面主要介紹Standalone Cluster模式和on yarn模式 。
軟體包下載下傳位址
一.Flink獨立叢集模式安裝(Cluster Standalone)
1.1.解壓安裝
1.2.Flink配置(Configuring Flink)
對其進行相關的配置。主要涉及到的配置檔案是conf/flink-conf.yaml
flink-conf.yaml配置
jobmanager.rpc.address:值設定成你master節點的IP位址
taskmanager.heap.mb:每個TaskManager可用的總記憶體
taskmanager.numberOfTaskSlots:每台機器上可用CPU的總數
parallelism.default:每個Job運作時預設的并行度(這個參數在文檔中介紹好像有問題)
taskmanager.tmp.dirs:臨時目錄
jobmanager.heap.mb:每個節點的JVM能夠配置設定的最大記憶體
jobmanager.rpc.port: 6123
jobmanager.web.port: 8081
[[email protected] conf]# vim flink-conf.yaml
jobmanager.rpc.address:h001
taskmanager.heap.mb:
taskmanager.numberOfTaskSlots:
parallelism.default:
taskmanager.tmp.dirs:/tmp
jobmanager.heap.mb:
jobmanager.web.port:
jobmanager.rpc.port:
主節點與從節點配置
[root@h002 conf]# vim slaves
h002
h003
h004
h005
[root@h001 conf]# vim masters
h001:
1.3.Flink安裝包分發到所有的worker節點上
1.4.啟動Flink(Starting Flink)
在master節點上運作下面的腳本,那麼這台機器上将會啟動一個JobManager,并通過SSH連接配接列在slaves檔案中的所有節點以便在每個節點上啟動TaskManager
如果停止叢集,可以在master節點上運作下面的指令
1.5. 在已經運作的叢集中添加JobManager/TaskManager
通過bin/taskmanager.sh或者bin/jobmanager.sh腳本在已經運作的叢集中添加JobManager或者TaskManager節點
[[email protected] flink-1.2.0]# bin/jobmanager.sh (start (local|cluster) [host] [webui-port]|stop|stop-all)
[[email protected] flink-]# bin/taskmanager.sh (start (local|cluster) [host] [webui-port]|stop|stop-all)
二.JobManager高可用(HA)
JobManager協調每一個Flink叢集環境,它負責作業排程和資源管理。預設情況下,一個Flink叢集中隻有一個JobManager執行個體,這很容易造成單點故障(SPOF)。如果JobManager奔潰了,那麼将沒有新的程式被送出,同時運作的程式将失敗。
對于JobManager高可用來說,我們可以從失敗的JobManager中恢複,是以可以消除單點故障的問題。我們可以配置Standalone模式和YARN叢集模式下的高可用JobManager的HA,是通過Zookeeper實作的,是以需要先搭建好Zookeeper叢集,同時HA的資訊,還要存儲在HDFS中,是以也需要Hadoop叢集,最後修改Flink中的配置檔案。
根據部署方式不同,Flink Jobmanager HA配置分為2種:
1、standalone cluster HA
2、Yarn cluster HA
2.1. Standalone叢集模式高可用
對于Standalone叢集模式下的JobManager高可用通常的方案是:Flink叢集的任一時刻隻有一個leading JobManager,并且有多個standby JobManager。當leader失敗後,standby通過選舉出一個JobManager作為新的leader。這個方案可以保證沒有單點故障的問題。對于standby和master JobManager執行個體來說,其實沒有明确的差別,每一個JobManager能夠當擔master或standby角色。
2.1.1.相關配置
為了保證JobManager高可用,你需要設定Zookeeper為recovery mode(恢複模式),配置一個Zookeeper quorum并且對所有的JobManager節點和它們的Web UI端口号設定一個masters檔案。
- Flink引入Zookeeper的目的主要是讓JobManager實作高可用(leader選舉)
-
Flink使用Zookeeper在所有運作的JobManager執行個體中進行分布式排程的協調。Zookeeper在Flink中是一個獨立的服務,它能夠通過leader選舉和輕量級的一緻性狀态存儲來提供高度可靠的分布式協調器
Master File(masters)
- 為了啟動一個HA-cluster,需要在conf/masters中配置masters。
- masters檔案:masters檔案包含所有的hosts,每個host啟動都JobManager,并且指定綁定的Web UI端口号:
jobManagerAddress1:webUIPort1
[...]
jobManagerAddressX:webUIPortX
配置檔案flink-conf.yaml
為了啟動一個HA-Cluster,需要在conf/flink-conf.yaml添加如下配置參數:
Recovery mode(必須的):recovery.mode: zookeeper
zookeeper quorum(必須的):recovery.zookeeper.quorum: address1:,...
Zookeeper root(推薦的):Flink在Zookeeper中的root節點,下面放置所有需要協調的資料recovery.zookeeper.path.root: /flink
如果你運作多個Flink HA叢集,那麼你必須手工配置每個Flink叢集使用獨立的root節點
State backend and storage directory(必須的):JobManager中繼資料在statebackend保持并且僅僅在Zookeeper中存儲,目前在HA模式中,僅支援filesystem。
state.backend: filesystem
state.backend.fs.checkpointdir:hdfs://namenode-host:port/flink-checkpoints
recovery.zookeeper.storageDir: hdfs:///recovery
recovery.zookeeper.storageDir指定的路徑中存儲了所有的中繼資料,用來恢複失敗的JobManager
2.2. 兩個JobManager的Standalone模式下的叢集
conf/flink-conf.yaml檔案
配置恢複模式和Zookeeper quorum
[[email protected] conf]# vim flink-conf.yaml
recovery.mode: zookeeper
recovery.zookeeper.quorum: h002:,h003:,h004:
recovery.zookeeper.path.root: /flink
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://h001:/flink/checkpoints
recovery.zookeeper.storageDir: hdfs://h001:/flink/recovery
在Hadoop檔案系統建立檔案夾
[root@h001 conf]# hadoop fs -mkdir -p /flink/checkpoints
[root@h001 conf]# hadoop fs -mkdir -p /flink/recovery
[root@h001 conf]# hadoop fs -chown -R hdfs:supergroup /flink/
配置conf/masters檔案
[[email protected] conf]# vim masters
h001:8081
h002:8081
配置conf/zoo.cfg檔案,添加Zookeeper叢集節點
[[email protected] conf]# vim zoo.cfg
server=h002::
server=h003::
server=h004::
啟動Zookeeper叢集
啟動Flink叢集
經過測試kill掉其中一個jobmanager可切換主備。
2.2. YARN叢集模式高可用
當運作一個高可用YARN叢集時,我們不需要運作多個JobManager(ApplicationMaster)執行個體,隻需要運作一個執行個體,如果失敗了通過YARN來進行重新開機
Flink部署在Yarn上,僅作為yarn上“多租戶”的一個service而存在。Flink在yarn中容器的概念分為2種:
用于啟動JobManager(AM)的容器
用于啟動TaskManager的容器
通過yarn-session.sh –help來看下啟動Flink On Yarn的參數資訊
其中-n代表taskmanager的容器數量,而不是taskmanager+jobmanager的容器數量
在配置HA前,先通過-q看一下我的yarn叢集的資源情況:
從圖中可以看出,我配置的每個NodeManager的記憶體是2048MB(yarn-site.xml),每個NodeManager的vcores數量是2。是以,目前yarn叢集中可用記憶體總量為6144,總cores是6
2.2.1. FLINK ON YARN HA 配置
配置準備
在配置Flink On Yarn之前,必須保證hdfs和yarn都已經開啟,可以通過 HADOOPHOME/sbin/start−all.sh啟動hdfs和yarn配置(yarn−site.xml)此配置需要在 H A D O O P H O M E / s b i n / s t a r t − a l l . s h 啟 動 h d f s 和 y a r n 配 置 ( y a r n − s i t e . x m l ) 此 配 置 需 要 在 HADOOP_CONF_DIR 的yarn-site.xml添加
[[email protected] ~]# cd /usr/bigdata/hadoop/etc/hadoop/
[[email protected] hadoop]# vim yarn-site.xml
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
</property>
此配置代表application master在重新開機時,嘗試的最大次數
配置(flink-conf.yaml),此參數需要在$FLINK_HOME/conf 的flink-conf.yaml中配置
[root@h001 conf]# vim flink-conf.yaml
yarn.application-attempts:
此參數代表Flink Job(yarn中稱為application)在Jobmanager(或者叫Application Master)恢複時,允許重新開機的最大次數。
注意,Flink On Yarn環境中,當Jobmanager(ApplicationMaster)失敗時,yarn會嘗試重新開機JobManager(AM),重新開機後,會重新啟動Flink的Job(application)。是以,yarn.application-attempts的設定不應該超過yarn.resourcemanager.am.max-attemps
配置zookeeper資訊
雖然flink-on-yarn cluster HA依賴于Yarn自己的叢集機制,但是Flink Job在恢複時,需要依賴檢查點産生的快照,而這些快照雖然配置在hdfs,但是其中繼資料資訊儲存在zookeeper中,是以我們還要配置zookeeper的HA資訊。其中,recovery.zookeeper.path.namespace也可以在啟動Flink on Yarn時通過-z參數覆寫。
在yarn模式下,jobmanager.rpc.address不需要指定,因為哪一個容器作為jobManager由Yarn決定,而不由Flink配置決定;taskmanager.tmp.dirs也不需要指定,這個參數将被yarn的tmp參數指定,預設就是/tmp目錄下,儲存一些用于上傳到ResourceManager的jar或lib檔案。parrallelism.default也不需要指定,因為在啟動yarn時,通過-s指定每個taskmanager的slots數量。
完整的Flink配置資訊如下:
[email protected] conf]# vim flink-conf.yaml
env.java.home: /usr/java/jdk1_111
recovery.mode: zookeeper
recovery.zookeeper.quorum: h002:,h003:,h004:
recovery.zookeeper.path.root: /flink
recovery.zookeeper.path.namespace: /cluster_yarn
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://h001:/flink/checkpoints
recovery.zookeeper.storageDir: hdfs://h001:/flink/recovery
taskmanager.network.numberOfBuffers:
fs.hdfs.hadoopconf: /usr/bigdata/hadoop/etc/Hadoop
以上的yarn HA配置可在Standalone叢集模式下進一步添加幾個參數即可完成。
2.2.2.啟動FLINK YARN SESSION
在YARN上啟動一個Flink主要有兩種方式:
(1)、啟動一個YARN session(Start a long-running Flink cluster on YARN)
(2)、直接在YARN上送出運作Flink作業(Run a Flink job on YARN)
Flink YARN Session
啟動Flink Yarn Session有2種模式:
(1)、分離模式
(2)、用戶端模式
通過-d指定分離模式,即用戶端在啟動Flink Yarn Session後,就不再屬于Yarn Cluster的一部分。如果想要停止Flink Yarn Application,需要通過yarn application -kill 指令來停止
這種模式下會啟動yarn session,并且會啟動Flink的兩個必要服務:JobManager和TaskManagers,然後你可以向叢集送出作業。同一個Session中可以送出多個Flink作業。需要注意的是,這種模式下Hadoop的版本至少是2.2,而且必須安裝了HDFS(因為啟動YARN session的時候會向HDFS上送出相關的jar檔案和配置檔案)。我們可以通過./bin/yarn-session.sh腳本啟動YARN Session。
在啟動的是可以指定TaskManager的個數以及記憶體(預設是1G),也可以指定JobManager的記憶體,但是JobManager的個數隻能是一個。
采用用戶端模式來啟動Flink Yarn Session:
[[email protected] flink-.]# bin/yarn-session.sh -n 4 -jm 4096 -tm 8192 -s 2 -nm FlinkOnYarnSession -d -st
或者
./bin/yarn-session.sh -n -tm -s
參數說明:
-n:--container 指YARN container配置設定的個數(即TaskManagers的個數)
-jm:--jobManagerMemory 指JobManager Containe的記憶體大小,機關為MB
-tm:--taskManagerMemory 指每個TaskManagerContainer的記憶體大小,機關為MB
-s :指每個TaskManager的slot個數
可以通過yarn的webUI檢視一下目前啟動的Application
通過ApplicationMaster tracking一下Flink的WebUI
http://192.168.xxx.xxx:8088/proxy/application_1500340359200_0002/#/overview
送出作業
使用bin/flink腳本送出作業,同樣我們來看看這個腳本支援哪些參數:
[[email protected] flink-1.5.1]# bin/flink run
bin/flink run ./examples/batch/WordCount.jar \
--input hdfs:///user/test/LICENSE \
--output hdfs:///user/test/result.txt
後面相應的跟上參數送出作業即可。
Run a single Flink job on YARN
上面的YARN session是在Hadoop YARN環境下啟動一個Flink cluster叢集,裡面的資源是可以共享給其他的Flink作業。我們還可以在YARN上啟動一個Flink作業。這裡我們還是使用./bin/flink,但是不需要事先啟動YARN session:
[[email protected] flink-1.5.1]# bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar \
--input hdfs:///user/test/LICENSE \
--output hdfs:///user/test/result.txt
上面的指令同樣會啟動一個類似于YARN session啟動的頁面。其中的-yn是指TaskManager的個數,必須指定。