簡單研究下Flink的任務部署。我們在IDEA 開發工具中用代碼跑Flink 的時候,實際是會虛拟出一個小型的Flink 叢集,當執行execute 的時候是将上面的代碼作為一個job 送出到Flink 的JobManager中。
參考: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/overview/
1.簡介
Flink 送出作業和執行任務,需要以下幾個元件:用戶端(Client)、作業管理器(JobManager)、任務管理器(TaskManager)。我們的代碼由用戶端擷取,之後送出給JobManager,做進一步轉換處理後,然後分發給任務衆多的TaskManager。這裡的TaskManager是真正幹活的人,資料處理操作是他們處理的。
2. 單機版部署
1. 資源下載下傳:
https://archive.apache.org/dist/flink/flink-1.13.0/ 下載下傳scala_2.12.tgz
2. 開始啟動叢集
下載下傳下來,直接啟動即可。就回啟動一個單節點的叢集。JobManager和TaskManager 都在同一個機子。
[root@k8smaster01 flink-1.13.0]# ./bin/start-cluster.sh
然後檢視Java 相關程序
[root@k8smaster01 flink-1.13.0]# jps -l | grep -v jps
4362 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
2991 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
3. 啟動後會啟動一個webui 控制台。預設端口是8081,這些預設的端口都是可以修改的。在配置檔案 conf/flink-conf.yaml
注意這裡有個可用的任務槽, 可以了解為Worker 節點數量。(後面跑任務的并行度需要小于等于該值)
4. 将之前的項目打包成jar 包
(1)修改類監聽端口為 192.168.13.107
package cn.qz;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class SocketStreamWordCount {
public static void main(String[] args) throws Exception {
// 1. 建立執行環境(流處理執行環境)
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 讀取檔案
DataStreamSource<String> txtDataSource = executionEnvironment.socketTextStream("192.168.13.107", 7777);
// 3. 轉換資料格式
SingleOutputStreamOperator<Tuple2<String, Long>> singleOutputStreamOperator = txtDataSource
.flatMap((String line, Collector<String> words) -> {
Arrays.stream(line.split(" ")).forEach(words::collect);
})
.returns(Types.STRING)
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));// lambda 使用泛型,由于泛型擦除,需要顯示的聲明類型資訊
// 4. 分組
KeyedStream<Tuple2<String, Long>, String> tuple2StringKeyedStream = singleOutputStreamOperator.keyBy(t -> t.f0);
// 5. 求和
SingleOutputStreamOperator<Tuple2<String, Long>> sum = tuple2StringKeyedStream.sum(1);
// 6. 列印
sum.print();
// 7. 執行
executionEnvironment.execute();
}
}
View Code
(2) 打包
5. 上傳jar包然後送出任務(107 伺服器先nc -l 7777 監聽端口)
可以選擇入口類,1 是并行度(任務槽)。 也可以輸入程式參數和儲存點儲存的路徑。(這裡的操作應該是用戶端先跑一遍代碼,把任務摘出來,然後交給TaskManager,TaskManager 分發給JobManager)
6. 測試:
發送資料:
hello flink
hello china
7. 檢視flink 控制台
上面的日志實際會輸出到 workers 伺服器的std. 這裡就是107 伺服器的。
8. 停止任務(Running Jobs -》 選中任務-》Cancel Job)
9. 曆史任務可以到 Completed Jobs 檢視執行過的任務的資訊
10. 指令行送出任務和取消任務
(1) 送出任務
[root@k8smaster01 flink]# ./flink-1.13.0/bin/flink run -c cn.qz.SocketStreamWordCount -p 1 ./study-flink-1.0-SNAPSHOT.jar
Job has been submitted with JobID 46b0c1daf825694fbc1692add3477ba9
(2) webui 檢視任務
(3) nc -l 視窗輸入資訊進行測試
(4) 取消任務
[root@k8smaster01 flink]# ./flink-1.13.0/bin/flink cancel 46b0c1daf825694fbc1692add3477ba9
Cancelling job 46b0c1daf825694fbc1692add3477ba9.
Cancelled job 46b0c1daf825694fbc1692add3477ba9.
11. 關閉叢集
[root@k8smaster01 flink]# ./flink-1.13.0/bin/stop-cluster.sh
3. 叢集版部署
1. 節點規劃
redisnode01 192.168.13.111 worker
redisnode2 192.168.13.112 worker
k8smaster01 192.168.13.107 masters+workers
2. 三個節點配置SSH免密登入
3. 三個節點配置hosts 檔案,通過主機名稱可以通路
[root@k8smaster01 flink-1.13.0]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.13.107 k8smaster01
192.168.13.111 redisnode01
192.168.13.112
通過scp 将三個檔案複制到其他節點:
scp /etc/hosts 192.168.13.107:/etc/
4. 主節點修改flink配置
主要是修改管理節點和工作節點的資訊
(1) 修改conf/flink-con.yaml中如下:
jobmanager.rpc.address: 192.168.13.107
(2) 修改masters内容為主節點加WEBUI端口
192.168.13.107:8081
(3) 修改workers(也可以是主機名稱,自己配置了hosts 檔案)
192.168.13.107
192.168.13.111
192.168.13.112
5. 将主節點的配置拷貝到另外兩個機子(scp 複制)=三個節點的配置保持一緻
scp -r ./flink-1.13.0 192.168.13.111:/opt/flink/
6. 三個節點都配置環境變量
其實master 節點可以不配置,master 節點可以通過全路徑進行啟動叢集。workers 節點必須配置環境變量。
/etc/profile 增加如下配置:
export JAVA_HOME=/home/javatest/jdk8/jdk1.8.0_291/
export PATH=$PATH:$JAVA_HOME/bin:/opt/flink/flink-1.13.0/bin
7. 啟動叢集
啟動叢集的時候, 在主節點啟動即可。workers 節點不需要再次啟動,主節點啟動後會通過網絡去自動啟動worker 節點
[root@k8smaster01 flink-1.13.0]# ./bin/start-cluster.sh
(1) 主節點檢視程序
[root@k8smaster01 flink-1.13.0]# jps -l | grep -v jps
11860 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
12187
(2) 工作節點111 檢視jps 程序
[root@redisnode01 log]# jps -l | grep -v jps
17122
8. web 檢視資訊,可以看出有三個任務槽
(1)可以看出三個workers 節點資訊
(2)點進去可以看到相關的記憶體資訊
9. 107 節點nc -l 7777 監聽端口,然後從web界面再次添加任務
将上面的并行度設定為2,也就是說會占用兩個任務槽。
10. 測試
(1) 輸入如下資訊:
-[root@k8smaster01 flink-1.13.0]#nc -l 7777
hello china this is new task test !!!
(2) 檢視輸出。 可以看出上面的輸出是在兩個worker 節點的标準輸出
(3)檢視輸出詳情
107:
111:
(4) 也可以從日志檔案檢視輸出。 下面以111 為例子
[root@redisnode01 log]# ll -tr
total 112
-rw-r--r--. 1 root root 20231 Jun 14 09:47 flink-root-taskexecutor-0-redisnode01.log.1
-rw-r--r--. 1 root root 19747 Jun 14 09:55 flink-root-taskexecutor-0-redisnode01.log.2
-rw-r--r--. 1 root root 0 Jun 14 11:04 flink-root-taskexecutor-1-redisnode01.out
-rw-r--r--. 1 root root 19747 Jun 14 11:05 flink-root-taskexecutor-1-redisnode01.log
-rw-r--r--. 1 root root 19628 Jun 14 11:05 flink-root-taskexecutor-0-redisnode01.log.3
-rw-r--r--. 1 root root 25746 Jun 14 11:10 flink-root-taskexecutor-0-redisnode01.log
-rw-r--r--. 1 root root 38 Jun 14 11:11 flink-root-taskexecutor-0-redisnode01.out
[root@redisnode01 log]# cat flink-root-taskexecutor-0-redisnode01.out
1> (hello,1)
1> (china,1)
1> (this,1)
11. 再次檢視首頁的任務槽 發現為1,也就是有任務占着的時候,其任務槽不會釋放。也就是說不能同時使用(需要将任務cancel)
12. 終止叢集
和之前一樣用 stop-cluster 腳本即可。
4. 叢集部署模式-standalone模式
Flink 提供了三種模式, 三種模式的差別主要是對資源的競争以及main 方法的執行(是在client 還是JobManager)。總的來說三個都屬于獨立模式(Standalone-不借助于其他元件,一個Flink 叢集就搞定),這種模式有個缺點就是資源不足或者出現故障,需要手動處理。這種模式一般在開發或者作業較少的情況下使用。
1. 會話模式(Session Mode)
上面的部署可以了解為一個會話模式。TaskManager 和 JonManager 在啟動後會立即建立。下面兩種模式都是作業送出才會建立TaskManager 和 JonManager 。
首先啟動一個叢集保持一個會話,在這個會話中通過用戶端送出作業。上面啟動後檢視JVM 程序也可以看到有一個主類為StandaloneSessionClusterEntrypoint。 該模式下作業需要送出上去,且作業之間搶占資源,且作業運作的時候不釋放資源。 該模式适合單個規模小、執行時間段的大量作業。(這種模式一般會結合其他部署平台使用,比如k8s)。
2. 每作業模式(Per-Task),已經為過時的模式
每個作業一個微型的叢集(送出作業才會啟動叢集,每個作業一個叢集(一個主類可能會拆成多個作業,比如從兩個流讀入資料)),作業完成後叢集就會關閉。 FLink 本身不提供單作業模式,需要結合其他資源管理平台,比如k8s。
3. 應用模式(Application Mode)
該模式類似于上面的單作業模式,都是先送出作業才會建立叢集。隻是單作業模式是用戶端先執行一遍代碼将作業拆分出來,每個作業一個叢集。這種模式是JobManager 執行應用程式,并且一段代碼即使包含了多個作業,也隻建立一個叢集(可以了解為一個jar 包一個叢集)。Flink 提供這種模式的送出。
1. 拷貝lib到flink安裝目錄的lib 目錄
2. 啟動JobManager
./bin/standalone-job.sh start --job-classname cn.qz.SocketStreamWordCount
3. 啟動TaskManager
./bin/taskmanager.sh start
4. nc -l 建立輸入資訊
5. 檢視輸出
[root@k8smaster01 log]# cat flink-root-taskexecutor-0-k8smaster01.out
(this,1)
(is,1)
(application,1)
(test,1)
6. 停止叢集
[root@k8smaster01 flink-1.13.0]# ./bin/standalone-job.sh stop
Stopping standalonejob daemon (pid: 32587) on host k8smaster01.
[root@k8smaster01 flink-1.13.0]# ./bin/taskmanager.sh stop
Stopping taskexecutor daemon (pid: 32924) on host k8smaster01.
結合K8S部署參考: https://github.com/tkestack/flink-on-k8s-operator