Flink 架構及相關介紹
1.架構圖
分層設計說明(相關術語解釋)
實體部署層-deploy層
負責解決Flink的部署模式問題,
支援多種部署模式:本地部署、叢集部署(Standalone/Yarn/Mesos)、雲(GCE/EC2)以及kubernetes。
通過該層支援不同平台的部署,使用者可以根據自身場景和需求選擇使用對應的部署模式。
Runtime核心層
是Flink分布式計算架構的核心實作層,負責對上層不同接口提供基礎服務。
支援分布式Stream作業的執行、JobGraph到ExecutionGraph的映射轉換以及任務排程等。
将DataStream和DataSet轉成統一的可執行的Task Operator,達到在流式計算引擎下同時處理批量計算和流式計算的目的。
API & Libraries層
負責更好的開發使用者體驗,包括易用性、開發效率、執行效率、狀态管理等方面。
Flink同時提供了支撐流計算和批處理的接口,同時在這基礎上抽象出不同的應用類型的元件庫,如:
基于流處理的CEP(複雜事件處理庫)
Table & Sql庫
基于批處理的FlinkML(機器學習庫)
圖處理庫(Gelly)
API層包括兩部分
流計算應用的DataStream API
批處理應用的DataSet API
統一的API,友善用于直接操作狀态和時間等底層資料
提供了豐富的資料處理進階API,例如Map、FllatMap操作等,
并提供了比較低級的Process Function API
2.運作流程
下面是從Flink官網截取的一張架構圖:
在Flink運作時涉及到的程序主要有以下兩個: JobManager:主要負責排程task,協調checkpoint已經錯誤恢複等。當用戶端将打包好的任務送出到JobManager之後,JobManager就會根據注冊的TaskManager資源資訊将任務配置設定給有資源的TaskManager,然後啟動運作任務。TaskManger從JobManager擷取task資訊,然後使用slot資源運作task; TaskManager:執行資料流的task,一個task通過設定并行度,可能會有多個subtask。 每個TaskManager都是作為一個獨立的JVM程序運作的。他主要負責在獨立的線程執行的operator。其中能執行多少個operator取決于每個taskManager指定的slots數量。Task slot是Flink中最小的資源機關。假如一個taskManager有3個slot,他就會給每個slot配置設定1/3的記憶體資源,目前slot不會對cpu進行隔離。同一個taskManager中的slot會共享網絡資源和心跳資訊。
當然在Flink中并不是一個slot隻可以執行一個task,在某些情況下,一個slot中也可能執行多個task,如下:
一般情況下,flink都是預設允許共用slot的,即便不是相同的task,隻要都是來同一個job即可。共享slot的好處有以下兩點:
- 當Job的最高并行度正好和flink叢集的slot數量相等時,則不需要計算總的task數量。例如,最高并行度是6時,則隻需要6個slot,各個subtask都可以共享這6個slot; 2. 共享slot可以優化資源管理。如下圖,非資源密集型subtask source/map在不共享slot時會占用6個slot,而在共享的情況下,可以保證其他的資源密集型subtask也能使用這6個slot,保證了資源配置設定。
3. Flink中的資料
Flink中的資料主要分為兩類:有界資料流(Bounded streams)和無界資料流(Unbounded streams)。
3.1 無界資料流
顧名思義,無界資料流就是指有始無終的資料,資料一旦開始生成就會持續不斷的産生新的資料,即資料沒有時間邊界。無界資料流需要持續不斷地處理。
3.2 有界資料流
相對而言,有界資料流就是指輸入的資料有始有終。例如資料可能是一分鐘或者一天的交易資料等等。處理這種有界資料流的方式也被稱之為批處理:
需要注意的是,我們一般所說的資料流是指資料集,而流資料則是指資料流中的資料。
4. Flink中的程式設計模型
4.1 程式設計模型
在Flink,程式設計模型的抽象層級主要分為以下4種,越往下抽象度越低,程式設計越複雜,靈活度越高。
這4層中,一般用于開發的是第三層,即DataStrem/DataSetAPI。使用者可以使用DataStream API處理無界資料流,使用DataSet API處理有界資料流。同時這兩個API都提供了各種各樣的接口來處理資料。例如常見的map、filter、flatMap等等,而且支援python,scala,java等程式設計語言,後面的demo主要以scala為主。
4.2 程式結構
與其他的分布式處理引擎類似,Flink也遵循着一定的程式架構。下面以常見的WordCount為例:
val env = ExecutionEnvironment.getExecutionEnvironment
// get input data
val text = env.readTextFile("/path/to/file")
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
counts.writeAsCsv(outputPath, "\n", " ")
下面我們分解一下這個程式。
第一步,我們需要擷取一個ExecutionEnvironment(如果是實時資料流的話我們需要建立一個StreamExecutionEnvironment)。這個對象可以設定執行的一些參數以及添加資料源。是以在程式的main方法中我們都要通過類似下面的語句擷取到這個對象:
val env = ExecutionEnvironment.getExecutionEnvironment
第二步,我們需要為這個應用添加資料源。這個程式中是通過讀取文本檔案的方式擷取資料。在實際開發中我們的資料源可能有很多中,例如kafka,ES等等,Flink官方也提供了很多的connector以減少我們的開發時間。一般都是都通addSource方法添加的,這裡是從文本讀入,是以調用了readTextFile方法。當然我們也可以通過實作接口來自定義source。
val text = env.readTextFile("/path/to/file")
第三步,我們需要定義一系列的operator來對資料進行處理。我們可以調用Flink API中已經提供的算子,也可以通過實作不同的Function來實作自己的算子,這個我們會在後面讨論。這裡我們隻需要了解一般的程式結構即可。
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
上面的就是先對輸入的資料進行分割,然後轉換成(word,count)這樣的Tuple,接着通過第一個字段進行分組,最後sum第二個字段進行聚合。
第四步,資料處理完成之後,我們還要為它指定資料的存儲。我們可以從外部系統導入資料,亦可以将處理完的資料導入到外部系統,這個過程稱為Sink。同Connector類似,Flink官方提供了很多的Sink供使用者使用,使用者也可以通過實作接口自定義Sink。
counts.writeAsCsv(outputPath, "\n", " ")