天天看點

Flink的高可用叢集環境Flink的高可用叢集環境

Flink的高可用叢集環境

Flink簡介

       Flink核心是一個流式的資料流執行引擎,其針對資料流的分布式計算提供了資料分布,資料通信以及容錯機制等功能。

         因現在主要Flink這一塊做先關方面的學習,是以準備要開通Apache Flink專欄這一塊定期釋出一些文章。今天在自己的部落格因為專欄無法申請通過,是以先在此記錄第一篇關于Flink部署的文章。

         在這裡順便打個小廣告,Flink社群第一季線下meetup,已在上海,北京舉辦。接下來分别會在成都和深圳舉辦接下來的幾期,也希望小夥伴們踴躍的加入到Flink社群來,下載下傳釘釘,掃描下方二維碼即可加入大群。

Flink的高可用叢集環境Flink的高可用叢集環境

     首先今天先介紹一下Flink的安裝,安裝部署最新1.6版本支援有8種安裝方式,詳細可以參考安裝部署方式【Clusters & Deployment】 。下面主要介紹Standalone Cluster模式和on yarn模式 。

軟體包下載下傳位址

一.Flink獨立叢集模式安裝(Cluster Standalone)

1.1.解壓安裝

1.2.Flink配置(Configuring Flink)

對其進行相關的配置。主要涉及到的配置檔案是conf/flink-conf.yaml

flink-conf.yaml配置

jobmanager.rpc.address:值設定成你master節點的IP位址

taskmanager.heap.mb:每個TaskManager可用的總記憶體

taskmanager.numberOfTaskSlots:每台機器上可用CPU的總數

parallelism.default:每個Job運作時預設的并行度(這個參數在文檔中介紹好像有問題)

taskmanager.tmp.dirs:臨時目錄

jobmanager.heap.mb:每個節點的JVM能夠配置設定的最大記憶體

jobmanager.rpc.port: 6123

jobmanager.web.port: 8081

[[email protected] conf]# vim flink-conf.yaml
    jobmanager.rpc.address:h001
    taskmanager.heap.mb:
    taskmanager.numberOfTaskSlots:
    parallelism.default:
    taskmanager.tmp.dirs:/tmp
    jobmanager.heap.mb:
    jobmanager.web.port: 
    jobmanager.rpc.port: 
           

主節點與從節點配置

[root@h002 conf]# vim slaves
     h002
     h003
     h004
     h005
[root@h001 conf]# vim masters
    h001:
           

1.3.Flink安裝包分發到所有的worker節點上

1.4.啟動Flink(Starting Flink)

       在master節點上運作下面的腳本,那麼這台機器上将會啟動一個JobManager,并通過SSH連接配接列在slaves檔案中的所有節點以便在每個節點上啟動TaskManager

如果停止叢集,可以在master節點上運作下面的指令

1.5. 在已經運作的叢集中添加JobManager/TaskManager

      通過bin/taskmanager.sh或者bin/jobmanager.sh腳本在已經運作的叢集中添加JobManager或者TaskManager節點

[[email protected] flink-1.2.0]# bin/jobmanager.sh (start (local|cluster) [host] [webui-port]|stop|stop-all)

[[email protected] flink-]# bin/taskmanager.sh (start (local|cluster) [host] [webui-port]|stop|stop-all)
           

二.JobManager高可用(HA)

       JobManager協調每一個Flink叢集環境,它負責作業排程和資源管理。預設情況下,一個Flink叢集中隻有一個JobManager執行個體,這很容易造成單點故障(SPOF)。如果JobManager奔潰了,那麼将沒有新的程式被送出,同時運作的程式将失敗。

       對于JobManager高可用來說,我們可以從失敗的JobManager中恢複,是以可以消除單點故障的問題。我們可以配置Standalone模式和YARN叢集模式下的高可用JobManager的HA,是通過Zookeeper實作的,是以需要先搭建好Zookeeper叢集,同時HA的資訊,還要存儲在HDFS中,是以也需要Hadoop叢集,最後修改Flink中的配置檔案。

根據部署方式不同,Flink Jobmanager HA配置分為2種:

        1、standalone cluster HA

        2、Yarn cluster HA

2.1. Standalone叢集模式高可用

        對于Standalone叢集模式下的JobManager高可用通常的方案是:Flink叢集的任一時刻隻有一個leading JobManager,并且有多個standby JobManager。當leader失敗後,standby通過選舉出一個JobManager作為新的leader。這個方案可以保證沒有單點故障的問題。對于standby和master JobManager執行個體來說,其實沒有明确的差別,每一個JobManager能夠當擔master或standby角色。

2.1.1.相關配置

      為了保證JobManager高可用,你需要設定Zookeeper為recovery mode(恢複模式),配置一個Zookeeper quorum并且對所有的JobManager節點和它們的Web UI端口号設定一個masters檔案。

  • Flink引入Zookeeper的目的主要是讓JobManager實作高可用(leader選舉)
  • Flink使用Zookeeper在所有運作的JobManager執行個體中進行分布式排程的協調。Zookeeper在Flink中是一個獨立的服務,它能夠通過leader選舉和輕量級的一緻性狀态存儲來提供高度可靠的分布式協調器

    Master File(masters)

  • 為了啟動一個HA-cluster,需要在conf/masters中配置masters。
  • masters檔案:masters檔案包含所有的hosts,每個host啟動都JobManager,并且指定綁定的Web UI端口号:
jobManagerAddress1:webUIPort1
[...]
jobManagerAddressX:webUIPortX
           

配置檔案flink-conf.yaml

為了啟動一個HA-Cluster,需要在conf/flink-conf.yaml添加如下配置參數:

Recovery mode(必須的):recovery.mode: zookeeper
zookeeper quorum(必須的):recovery.zookeeper.quorum: address1:,...
Zookeeper root(推薦的):Flink在Zookeeper中的root節點,下面放置所有需要協調的資料recovery.zookeeper.path.root: /flink
           

      如果你運作多個Flink HA叢集,那麼你必須手工配置每個Flink叢集使用獨立的root節點

State backend and storage    directory(必須的):JobManager中繼資料在statebackend保持并且僅僅在Zookeeper中存儲,目前在HA模式中,僅支援filesystem。
      state.backend: filesystem
      state.backend.fs.checkpointdir:hdfs://namenode-host:port/flink-checkpoints
      recovery.zookeeper.storageDir: hdfs:///recovery
recovery.zookeeper.storageDir指定的路徑中存儲了所有的中繼資料,用來恢複失敗的JobManager
           

2.2. 兩個JobManager的Standalone模式下的叢集

conf/flink-conf.yaml檔案

配置恢複模式和Zookeeper quorum

[[email protected] conf]# vim flink-conf.yaml
   recovery.mode: zookeeper
   recovery.zookeeper.quorum: h002:,h003:,h004:
   recovery.zookeeper.path.root: /flink 
   state.backend: filesystem
   state.backend.fs.checkpointdir: hdfs://h001:/flink/checkpoints
   recovery.zookeeper.storageDir: hdfs://h001:/flink/recovery
           

在Hadoop檔案系統建立檔案夾

[root@h001 conf]# hadoop fs -mkdir -p /flink/checkpoints
[root@h001 conf]# hadoop fs -mkdir -p /flink/recovery
[root@h001 conf]# hadoop fs -chown -R hdfs:supergroup /flink/
           

配置conf/masters檔案

[[email protected] conf]# vim masters
       h001:8081
       h002:8081
           

配置conf/zoo.cfg檔案,添加Zookeeper叢集節點

[[email protected] conf]# vim zoo.cfg
    server=h002::
    server=h003::
    server=h004::
           

啟動Zookeeper叢集

啟動Flink叢集

經過測試kill掉其中一個jobmanager可切換主備。

2.2. YARN叢集模式高可用

        當運作一個高可用YARN叢集時,我們不需要運作多個JobManager(ApplicationMaster)執行個體,隻需要運作一個執行個體,如果失敗了通過YARN來進行重新開機

        Flink部署在Yarn上,僅作為yarn上“多租戶”的一個service而存在。Flink在yarn中容器的概念分為2種:

用于啟動JobManager(AM)的容器
      用于啟動TaskManager的容器
           

通過yarn-session.sh –help來看下啟動Flink On Yarn的參數資訊

Flink的高可用叢集環境Flink的高可用叢集環境

        其中-n代表taskmanager的容器數量,而不是taskmanager+jobmanager的容器數量

在配置HA前,先通過-q看一下我的yarn叢集的資源情況:

Flink的高可用叢集環境Flink的高可用叢集環境

      從圖中可以看出,我配置的每個NodeManager的記憶體是2048MB(yarn-site.xml),每個NodeManager的vcores數量是2。是以,目前yarn叢集中可用記憶體總量為6144,總cores是6

2.2.1. FLINK ON YARN HA 配置

配置準備

   在配置Flink On Yarn之前,必須保證hdfs和yarn都已經開啟,可以通過 HADOOPHOME/sbin/start−all.sh啟動hdfs和yarn配置(yarn−site.xml)此配置需要在 H A D O O P H O M E / s b i n / s t a r t − a l l . s h 啟 動 h d f s 和 y a r n 配 置 ( y a r n − s i t e . x m l ) 此 配 置 需 要 在 HADOOP_CONF_DIR 的yarn-site.xml添加

[[email protected] ~]# cd /usr/bigdata/hadoop/etc/hadoop/
[[email protected] hadoop]# vim yarn-site.xml
   <property>
      <name>yarn.resourcemanager.am.max-attempts</name>
      <value>4</value>
</property>
           

此配置代表application master在重新開機時,嘗試的最大次數

配置(flink-conf.yaml),此參數需要在$FLINK_HOME/conf 的flink-conf.yaml中配置

[root@h001 conf]# vim flink-conf.yaml
   yarn.application-attempts: 
           

      此參數代表Flink Job(yarn中稱為application)在Jobmanager(或者叫Application Master)恢複時,允許重新開機的最大次數。

      注意,Flink On Yarn環境中,當Jobmanager(ApplicationMaster)失敗時,yarn會嘗試重新開機JobManager(AM),重新開機後,會重新啟動Flink的Job(application)。是以,yarn.application-attempts的設定不應該超過yarn.resourcemanager.am.max-attemps

配置zookeeper資訊

      雖然flink-on-yarn cluster HA依賴于Yarn自己的叢集機制,但是Flink Job在恢複時,需要依賴檢查點産生的快照,而這些快照雖然配置在hdfs,但是其中繼資料資訊儲存在zookeeper中,是以我們還要配置zookeeper的HA資訊。其中,recovery.zookeeper.path.namespace也可以在啟動Flink on Yarn時通過-z參數覆寫。

      在yarn模式下,jobmanager.rpc.address不需要指定,因為哪一個容器作為jobManager由Yarn決定,而不由Flink配置決定;taskmanager.tmp.dirs也不需要指定,這個參數将被yarn的tmp參數指定,預設就是/tmp目錄下,儲存一些用于上傳到ResourceManager的jar或lib檔案。parrallelism.default也不需要指定,因為在啟動yarn時,通過-s指定每個taskmanager的slots數量。

完整的Flink配置資訊如下:

[email protected] conf]# vim flink-conf.yaml
       env.java.home: /usr/java/jdk1_111
       recovery.mode: zookeeper
       recovery.zookeeper.quorum: h002:,h003:,h004:
       recovery.zookeeper.path.root: /flink 
       recovery.zookeeper.path.namespace: /cluster_yarn
       state.backend: filesystem
       state.backend.fs.checkpointdir: hdfs://h001:/flink/checkpoints
       recovery.zookeeper.storageDir: hdfs://h001:/flink/recovery
       taskmanager.network.numberOfBuffers: 
       fs.hdfs.hadoopconf: /usr/bigdata/hadoop/etc/Hadoop
           

以上的yarn HA配置可在Standalone叢集模式下進一步添加幾個參數即可完成。

2.2.2.啟動FLINK YARN SESSION

在YARN上啟動一個Flink主要有兩種方式:

(1)、啟動一個YARN session(Start a long-running Flink cluster on YARN)
(2)、直接在YARN上送出運作Flink作業(Run a Flink job on YARN)
           

Flink YARN Session

啟動Flink Yarn Session有2種模式:

(1)、分離模式
 (2)、用戶端模式
           

      通過-d指定分離模式,即用戶端在啟動Flink Yarn Session後,就不再屬于Yarn Cluster的一部分。如果想要停止Flink Yarn Application,需要通過yarn application -kill 指令來停止

      這種模式下會啟動yarn session,并且會啟動Flink的兩個必要服務:JobManager和TaskManagers,然後你可以向叢集送出作業。同一個Session中可以送出多個Flink作業。需要注意的是,這種模式下Hadoop的版本至少是2.2,而且必須安裝了HDFS(因為啟動YARN session的時候會向HDFS上送出相關的jar檔案和配置檔案)。我們可以通過./bin/yarn-session.sh腳本啟動YARN Session。

      在啟動的是可以指定TaskManager的個數以及記憶體(預設是1G),也可以指定JobManager的記憶體,但是JobManager的個數隻能是一個。

采用用戶端模式來啟動Flink Yarn Session:

[[email protected] flink-.]# bin/yarn-session.sh -n 4 -jm 4096 -tm 8192 -s 2 -nm FlinkOnYarnSession -d -st
  或者
 ./bin/yarn-session.sh -n  -tm  -s 
           

參數說明:

-n:--container 指YARN container配置設定的個數(即TaskManagers的個數)
-jm:--jobManagerMemory 指JobManager Containe的記憶體大小,機關為MB
-tm:--taskManagerMemory 指每個TaskManagerContainer的記憶體大小,機關為MB
-s :指每個TaskManager的slot個數
           

可以通過yarn的webUI檢視一下目前啟動的Application

Flink的高可用叢集環境Flink的高可用叢集環境

通過ApplicationMaster tracking一下Flink的WebUI

http://192.168.xxx.xxx:8088/proxy/application_1500340359200_0002/#/overview

送出作業

使用bin/flink腳本送出作業,同樣我們來看看這個腳本支援哪些參數:

[[email protected] flink-1.5.1]#  bin/flink run
    bin/flink run ./examples/batch/WordCount.jar     \
          --input hdfs:///user/test/LICENSE     \
          --output hdfs:///user/test/result.txt
           

後面相應的跟上參數送出作業即可。

Run a single Flink job on YARN

      上面的YARN session是在Hadoop YARN環境下啟動一個Flink cluster叢集,裡面的資源是可以共享給其他的Flink作業。我們還可以在YARN上啟動一個Flink作業。這裡我們還是使用./bin/flink,但是不需要事先啟動YARN session:

[[email protected] flink-1.5.1]# bin/flink run -m yarn-cluster -yn 2    ./examples/batch/WordCount.jar      \
          --input hdfs:///user/test/LICENSE    \                          
          --output hdfs:///user/test/result.txt
           

      上面的指令同樣會啟動一個類似于YARN session啟動的頁面。其中的-yn是指TaskManager的個數,必須指定。

繼續閱讀