天天看點

Flink叢集部署-standalone部署模式

  簡單研究下Flink的任務部署。我們在IDEA 開發工具中用代碼跑Flink 的時候,實際是會虛拟出一個小型的Flink 叢集,當執行execute 的時候是将上面的代碼作為一個job 送出到Flink 的JobManager中。

  參考: ​​https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/overview/​​

1.簡介

  Flink 送出作業和執行任務,需要以下幾個元件:用戶端(Client)、作業管理器(JobManager)、任務管理器(TaskManager)。我們的代碼由用戶端擷取,之後送出給JobManager,做進一步轉換處理後,然後分發給任務衆多的TaskManager。這裡的TaskManager是真正幹活的人,資料處理操作是他們處理的。

2. 單機版部署

1. 資源下載下傳:

​​https://archive.apache.org/dist/flink/flink-1.13.0/​​  下載下傳scala_2.12.tgz

2. 開始啟動叢集

  下載下傳下來,直接啟動即可。就回啟動一個單節點的叢集。JobManager和TaskManager 都在同一個機子。

[root@k8smaster01 flink-1.13.0]# ./bin/start-cluster.sh      

然後檢視Java 相關程序

[root@k8smaster01 flink-1.13.0]# jps -l | grep -v jps
4362 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
2991 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint      

3. 啟動後會啟動一個webui 控制台。預設端口是8081,這些預設的端口都是可以修改的。在配置檔案 conf/flink-conf.yaml

  注意這裡有個可用的任務槽, 可以了解為Worker 節點數量。(後面跑任務的并行度需要小于等于該值)

Flink叢集部署-standalone部署模式

4. 将之前的項目打包成jar 包

(1)修改類監聽端口為 192.168.13.107

Flink叢集部署-standalone部署模式
Flink叢集部署-standalone部署模式
package cn.qz;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class SocketStreamWordCount {

    public static void main(String[] args) throws Exception {
        // 1. 建立執行環境(流處理執行環境)
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. 讀取檔案
        DataStreamSource<String> txtDataSource = executionEnvironment.socketTextStream("192.168.13.107", 7777);
        // 3. 轉換資料格式
        SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = txtDataSource
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));// lambda 使用泛型,由于泛型擦除,需要顯示的聲明類型資訊
        // 4. 分組
        KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = singleOutputStreamOperator.keyBy(t -> t.f0);
        // 5. 求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
        // 6. 列印
        sum.print();
        // 7. 執行
        executionEnvironment.execute();
    }
}      

View Code

(2) 打包

5. 上傳jar包然後送出任務(107 伺服器先nc -l 7777 監聽端口)

Flink叢集部署-standalone部署模式

  可以選擇入口類,1 是并行度(任務槽)。 也可以輸入程式參數和儲存點儲存的路徑。(這裡的操作應該是用戶端先跑一遍代碼,把任務摘出來,然後交給TaskManager,TaskManager 分發給JobManager)

6. 測試: 

發送資料:

hello flink
hello china      

7. 檢視flink 控制台

  上面的日志實際會輸出到 workers 伺服器的std. 這裡就是107 伺服器的。

Flink叢集部署-standalone部署模式

 8. 停止任務(Running Jobs -》 選中任務-》Cancel Job)

Flink叢集部署-standalone部署模式

9. 曆史任務可以到 Completed Jobs 檢視執行過的任務的資訊

10. 指令行送出任務和取消任務 

(1) 送出任務

[root@k8smaster01 flink]# ./flink-1.13.0/bin/flink run -c cn.qz.SocketStreamWordCount -p 1 ./study-flink-1.0-SNAPSHOT.jar 
Job has been submitted with JobID 46b0c1daf825694fbc1692add3477ba9      

(2) webui 檢視任務

Flink叢集部署-standalone部署模式

(3) nc -l 視窗輸入資訊進行測試

(4) 取消任務

[root@k8smaster01 flink]# ./flink-1.13.0/bin/flink cancel 46b0c1daf825694fbc1692add3477ba9
Cancelling job 46b0c1daf825694fbc1692add3477ba9.
Cancelled job 46b0c1daf825694fbc1692add3477ba9.      

11. 關閉叢集

[root@k8smaster01 flink]# ./flink-1.13.0/bin/stop-cluster.sh      

3. 叢集版部署

1. 節點規劃

redisnode01 192.168.13.111 worker
redisnode2 192.168.13.112 worker
k8smaster01 192.168.13.107 masters+workers      

2. 三個節點配置SSH免密登入

3. 三個節點配置hosts 檔案,通過主機名稱可以通路

[root@k8smaster01 flink-1.13.0]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.13.107 k8smaster01
192.168.13.111 redisnode01
192.168.13.112      

通過scp 将三個檔案複制到其他節點:

scp /etc/hosts 192.168.13.107:/etc/      

4. 主節點修改flink配置

主要是修改管理節點和工作節點的資訊

(1) 修改conf/flink-con.yaml中如下:

jobmanager.rpc.address: 192.168.13.107      

(2) 修改masters内容為主節點加WEBUI端口

192.168.13.107:8081      

(3) 修改workers(也可以是主機名稱,自己配置了hosts 檔案)

192.168.13.107
192.168.13.111
192.168.13.112      

5. 将主節點的配置拷貝到另外兩個機子(scp 複制)=三個節點的配置保持一緻

scp -r ./flink-1.13.0 192.168.13.111:/opt/flink/      

6. 三個節點都配置環境變量

其實master 節點可以不配置,master 節點可以通過全路徑進行啟動叢集。workers 節點必須配置環境變量。

/etc/profile 增加如下配置:

export JAVA_HOME=/home/javatest/jdk8/jdk1.8.0_291/
export PATH=$PATH:$JAVA_HOME/bin:/opt/flink/flink-1.13.0/bin      

7. 啟動叢集

啟動叢集的時候, 在主節點啟動即可。workers 節點不需要再次啟動,主節點啟動後會通過網絡去自動啟動worker 節點

[root@k8smaster01 flink-1.13.0]# ./bin/start-cluster.sh      

(1) 主節點檢視程序

[root@k8smaster01 flink-1.13.0]# jps -l | grep -v jps
11860 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
12187      

(2) 工作節點111 檢視jps 程序

[root@redisnode01 log]# jps -l | grep -v jps
17122      

8. web 檢視資訊,可以看出有三個任務槽

Flink叢集部署-standalone部署模式

(1)可以看出三個workers 節點資訊

Flink叢集部署-standalone部署模式

(2)點進去可以看到相關的記憶體資訊

Flink叢集部署-standalone部署模式

 9. 107 節點nc -l 7777 監聽端口,然後從web界面再次添加任務

Flink叢集部署-standalone部署模式

将上面的并行度設定為2,也就是說會占用兩個任務槽。

10. 測試

(1) 輸入如下資訊:

-[root@k8smaster01 flink-1.13.0]#nc -l 7777
hello china this is new task test !!!      

 (2) 檢視輸出。 可以看出上面的輸出是在兩個worker 節點的标準輸出

Flink叢集部署-standalone部署模式

 (3)檢視輸出詳情

107:

Flink叢集部署-standalone部署模式

 111:

Flink叢集部署-standalone部署模式

 (4) 也可以從日志檔案檢視輸出。 下面以111 為例子

[root@redisnode01 log]# ll -tr
total 112
-rw-r--r--. 1 root root 20231 Jun 14 09:47 flink-root-taskexecutor-0-redisnode01.log.1
-rw-r--r--. 1 root root 19747 Jun 14 09:55 flink-root-taskexecutor-0-redisnode01.log.2
-rw-r--r--. 1 root root     0 Jun 14 11:04 flink-root-taskexecutor-1-redisnode01.out
-rw-r--r--. 1 root root 19747 Jun 14 11:05 flink-root-taskexecutor-1-redisnode01.log
-rw-r--r--. 1 root root 19628 Jun 14 11:05 flink-root-taskexecutor-0-redisnode01.log.3
-rw-r--r--. 1 root root 25746 Jun 14 11:10 flink-root-taskexecutor-0-redisnode01.log
-rw-r--r--. 1 root root    38 Jun 14 11:11 flink-root-taskexecutor-0-redisnode01.out
[root@redisnode01 log]# cat flink-root-taskexecutor-0-redisnode01.out
1> (hello,1)
1> (china,1)
1> (this,1)      

11. 再次檢視首頁的任務槽 發現為1,也就是有任務占着的時候,其任務槽不會釋放。也就是說不能同時使用(需要将任務cancel)

Flink叢集部署-standalone部署模式

 12. 終止叢集

   和之前一樣用 stop-cluster 腳本即可。

4. 叢集部署模式-standalone模式

  Flink 提供了三種模式, 三種模式的差別主要是對資源的競争以及main 方法的執行(是在client 還是JobManager)。總的來說三個都屬于獨立模式(Standalone-不借助于其他元件,一個Flink 叢集就搞定),這種模式有個缺點就是資源不足或者出現故障,需要手動處理。這種模式一般在開發或者作業較少的情況下使用。

1. 會話模式(Session Mode)

  上面的部署可以了解為一個會話模式。TaskManager 和 JonManager 在啟動後會立即建立。下面兩種模式都是作業送出才會建立TaskManager 和 JonManager 。

  首先啟動一個叢集保持一個會話,在這個會話中通過用戶端送出作業。上面啟動後檢視JVM 程序也可以看到有一個主類為StandaloneSessionClusterEntrypoint。 該模式下作業需要送出上去,且作業之間搶占資源,且作業運作的時候不釋放資源。 該模式适合單個規模小、執行時間段的大量作業。(這種模式一般會結合其他部署平台使用,比如k8s)。

2. 每作業模式(Per-Task),已經為過時的模式

  每個作業一個微型的叢集(送出作業才會啟動叢集,每個作業一個叢集(一個主類可能會拆成多個作業,比如從兩個流讀入資料)),作業完成後叢集就會關閉。 FLink 本身不提供單作業模式,需要結合其他資源管理平台,比如k8s。

3. 應用模式(Application Mode)

  該模式類似于上面的單作業模式,都是先送出作業才會建立叢集。隻是單作業模式是用戶端先執行一遍代碼将作業拆分出來,每個作業一個叢集。這種模式是JobManager 執行應用程式,并且一段代碼即使包含了多個作業,也隻建立一個叢集(可以了解為一個jar 包一個叢集)。Flink 提供這種模式的送出。

1. 拷貝lib到flink安裝目錄的lib 目錄
2. 啟動JobManager
./bin/standalone-job.sh start --job-classname cn.qz.SocketStreamWordCount
3. 啟動TaskManager
./bin/taskmanager.sh start
4. nc -l 建立輸入資訊
5. 檢視輸出
[root@k8smaster01 log]# cat flink-root-taskexecutor-0-k8smaster01.out
(this,1)
(is,1)
(application,1)
(test,1)
6. 停止叢集
[root@k8smaster01 flink-1.13.0]# ./bin/standalone-job.sh stop
Stopping standalonejob daemon (pid: 32587) on host k8smaster01.
[root@k8smaster01 flink-1.13.0]# ./bin/taskmanager.sh stop
Stopping taskexecutor daemon (pid: 32924) on host k8smaster01.      

結合K8S部署參考: ​​https://github.com/tkestack/flink-on-k8s-operator​​