天天看點

二、flink--叢集環境搭建一、Flink環境搭建

一、Flink環境搭建

1.1 flink部署方式

Flink可以選擇的部署方式有:

Local、Standalone(資源使用率低)、Yarn、Mesos、Docker、Kubernetes、AWS。

我們主要對Standalone模式和Yarn模式下的Flink叢集部署進行分析。

Standalone模式常用于單機進行程式測試,Yarn模式常用于實際線上生産環境。

1.2 叢集規劃

1、叢集規劃

節點名稱 master(jobManager) worker(taskManager) zookeeper
bigdata11
bigdata21
bigdata31

(注:zookeeper隻是用于實作master HA的必要元件,如果不需要master HA,則zookeeper可以去掉。)

2、軟體版本

jdk 1.8
scala 2.11.8
hadoop 2.8
3.4.10
flink 1.6.1

3、基礎環境

安裝好jdk、scala、hadoop(hdfs+yarn都要部署好)、zookeeper,部署方法看之前的相關文章。而且要注意的是,節點之間要配置好ssh秘鑰免登陸。

1.3 Standalone模式安裝

1、解壓程式:

tar -zxvf flink-1.6.1-bin-hadoop28-scala_2.11.tgz -C /opt/module/修改配置檔案           

2、修改配置檔案

配置master節點位址:
[root@bigdata11 conf]$ sudo vi masters
bigdata11:8081

配置worker節點位址:
[root@bigdata11 conf]$ sudo vi slaves
bigdata12
bigdata13

修改flink工作參數:
[root@bigdata11 conf]$ sudo vi flink-conf.yaml 
taskmanager.numberOfTaskSlots:2   //52行
jobmanager.rpc.address: bigdata11  //33行  指定jobmanager 的rpc位址
可選配置:
•每個JobManager(jobmanager.heap.mb)的可用記憶體量,
•每個TaskManager(taskmanager.heap.mb)的可用記憶體量,
•每台機器(taskManager)的可用的slot數量(taskmanager.numberOfTaskSlots),
•每個job的并行度(parallelism.default)
•臨時目錄(taskmanager.tmp.dirs)           

3、配置環境變量

vim /etc/profile.d/flink.sh
export FLINK_HOME=/opt/module/flink-1.6.1
export PATH=$PATH:$FLINK_HOME/bin

然後source /etc/profile.d/flink.sh 啟用環境變量           

4、拷貝配置好的/opt/module/flink-1.6.1到其他節點

使用scp或者rsync

scp -r /opt/module/flink-1.6.1 bigdata12:`pwd`
scp -r /opt/module/flink-1.6.1 bigdata13:`pwd`           

同時配置好其他兩台的環境變量

5、啟動flink叢集

[root@bigdata11 flink-1.6.1]$ ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host bigdata11.
Starting taskexecutor daemon on host bigdata12.
Starting taskexecutor daemon on host bigdata13.           

使用jps可以在對應的節點上檢視對應的程序

StandloneSessionClusterEntrypoint  這是jobmanager程序
TaskManagerRunner   這是taskmanager程序           

6、web UI 檢視

http://bigdata11:8081

7、運作測試任務

flink run -m bigdata11:8081 ./examples/batch/WordCount.jar --input /opt/module/datas/word.txt --output /tmp/word.output           

8、增減節點到叢集中

增加/減少jobmanager節點:
bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all

增加/減少taskmanager節點(需要到目前節點去啟動):
bin/taskmanager.sh start|start-foreground|stop|stop-all           

1.4 standalone模式jobManager HA

​ 首先,我們需要知道 Flink 有兩種部署的模式,分别是 Standalone 以及 Yarn Cluster 模式。對于 Standalone 來說,Flink 必須依賴于 Zookeeper 來實作 JobManager 的 HA(Zookeeper 已經成為了大部分開源架構 HA 必不可少的子產品)。在 Zookeeper 的幫助下,一個 Standalone 的 Flink 叢集會同時有多個活着的 JobManager,其中隻有一個處于工作狀态,其他處于 Standby 狀态。當工作中的 JobManager 失去連接配接後(如當機或 Crash),Zookeeper 會從 Standby 中選舉新的 JobManager 來接管 Flink 叢集。

​ 對于 Yarn Cluaster 模式來說,Flink 就要依靠 Yarn 本身來對 JobManager 做 HA 了。其實這裡完全是 Yarn 的機制。對于 Yarn Cluster 模式來說,JobManager 和 TaskManager 都是被 Yarn 啟動在 Yarn 的 Container 中。此時的 JobManager,其實應該稱之為 Flink Application Master。也就說它的故障恢複,就完全依靠着 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 一樣)。由于完全依賴了 Yarn,是以不同版本的 Yarn 可能會有細微的差異。這裡不再做深究。

1、修改配置檔案

conf/flink-conf.yaml

注釋掉 
#jobmanager.rpc.address: bigdata11

修改下面的配置
high-availability: zookeeper   //73行 指定高可用方式為zookeeper

#指定高可用模式中zookeeper的位址清單 //88行
high-availability.zookeeper.quorum:bigdata11:2181,bigdata12:2181,bigdata13:2181

#指定将jobmanager狀态資料持久化儲存到hdfs中
high-availability.storageDir: hdfs:///flink/ha/       

#JobManager中繼資料儲存在檔案系統storageDir中,隻有指向此狀态的指針存儲在ZooKeeper中(必須) //沒有
high-availability.zookeeper.path.root: /flink         

#根ZooKeeper節點,在該節點下放置所有叢集節點(推薦),這是叢集節點資訊儲存位置
high-availability.cluster-id:/flinkCluster           

#自定義叢集(推薦),這裡是檢查點和儲存點的配置,儲存在hdfs中,非必須
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/checkpoints           

conf/masters

将主備jobmanager位址都寫到該配置檔案中。
bigdata11:8081
bigdata12:8081           

conf/zoo.cfg

server.1=bigdata11:2888:3888
server.2=bigdata12:2888:3888
server.3=bigdata13:2888:3888           

修改完後同步配置到其他所有節點中。

2、啟動叢集

先啟動好zookeeper服務。
然後啟動hdfs服務。
最後啟動flink叢集。 start-cluster.sh           

1.5 yarn模式安裝

部署步驟和上面standalone基本一樣,這裡不重複。還要添加以下配置:

配置好hadoop(hdfs和yarn)環境,同時配置好HADOOP_HOME這個環境變量。

接着在yarn下啟動jobmanager和taskmanager。

/opt/module/flink-1.6.1/bin/yarn-session.sh -n 2 -s 4 -jm 1024 -tm 1024 -nm test -d

其中:
-n(--container):TaskManager的數量。
-s(--slots):    每個TaskManager的slot數量,預設一個slot一個core,預設每個taskmanager的slot的個數為1,有時可以多一些taskmanager,做備援。
-jm:JobManager的記憶體(機關MB)。
-tm:每個taskmanager的記憶體(機關MB)。
-nm:yarn 的appName(現在yarn的ui上的名字)。 
-d:背景執行

會自動根據 conf/ 下的配置檔案啟動對應的jobmanager和taskmanager的           

啟動完成後,可以到yarn 的web頁面檢視到剛才送出會話任務:

http://bigdata11:8088           
YarnSessionClusterEntrypoint  這個就是剛剛送出的yarn-session維持的session程序           
./bin/flink run ./examples/batch/WordCount.jar --input 輸出資料路徑
--output 輸出資料路徑

可以手動使用 -m jobManagerAddress 指定jobmanager位址,但是flink client可以自動根據flink的配置檔案擷取到jobmanager位址,是以可以不用指定