天天看點

Flink 必知必會經典課程2:Stream Processing with Apache Flink

作者|崔星燦

本篇内容包含三部分展開介紹Stream Processing with Apache Flink:
  1. 并行處理和程式設計範式
  2. DataStream API概覽及簡單應用
  3. Flink 中的狀态和時間

一、并行處理和程式設計範式

衆所周知,對于計算密集型或資料密集型這樣需要計算量比較大的工作,并行計算或分而治之是解決這一類問題非常有效的手段。在這個手段中比較關鍵的部分是,如何對一個已有任務的劃分,或者說如何對計算資源進行合理配置設定。

舉例說明,上學期間老師有時會找同學來協助批閱考試試卷。假如卷子裡面一共有ABC三個題,那麼同學可能會有如下分工協作方式。

  • 方式一:将所有試卷的三個題分别交給不同的人來批閱。這種方式,每個批閱的同學批自己負責的題目後就可以把試卷傳給下一個批閱同學,進而形成一種流水線的工作效果。因為總共隻有三道題目,這種流水線的協作方式會随着同學數量的增加而難以繼續擴充。
  • 方式二:分工方式一的擴充,同一題目允許多個同學來共同批閱,比如A題目由兩個同學共同批閱,B題目由三個同學批閱,C題目隻由一個同學批閱。這時候我們就需要考慮怎樣進一步的對計算任務做劃分。比如,可以把全部同學分成三組,第一組負責A題目,第二個組負責B題目第三個組負責C。第一個組的同學可以再次再組内進行分工,比如A組裡第一個同學批一半的卷子,第二個同學批另一半卷子。他們分别批完了之後,再将自己手裡的試卷傳遞給下一個組。

像上述按照試卷内題目進行劃分,以及講試卷本身進行劃分,就是所謂的計算的并行性和資料并行性。

Flink 必知必會經典課程2:Stream Processing with Apache Flink

我們可以用上面有向無環圖來表示這種并行性。

在圖中,批閱A題目的同學,假設還承擔了一些額外任務,比如把試卷從老師的辦公室拿到批閱試卷的地點;負責C題的同學也額外任務,就是等所有同學把試卷批完後,進行總分的統計和記錄上交的工作。據此,可以把圖中所有的節點劃分為三個類别。第一個類别是Source,它們負責擷取資料(拿試卷);第二類是資料處理節點,它們大多時候不需要和外部系統打交道;最後一個類别負責将整個計算邏輯寫到某個外部系統(統分并上交記錄)。這三類節點分别就是Source節點、Transformation節點和Sink節點。DAG圖中,節點表示計算,節點之間的連線代表計算之間的依賴。

關于程式設計的一些内容

Flink 必知必會經典課程2:Stream Processing with Apache Flink

假設有一個資料集,其中包含1~10十個數字,如果把每一個數字都乘以2并做累計求和操作(如上圖所示)怎麼操作呢?辦法有很多。

如果用程式設計來解決有兩個角度:第一種是采取指令式程式設計方式,一步一步的相當于告訴機器應該怎樣生成一些資料結構,怎樣的用這些資料結構去存儲一些臨時的中間結果,怎樣把這些中間結果再轉換成為最終的結果,相當于一步一步告訴機器如何去做;第二種是聲明的方式,聲明式程式設計裡通常隻需要告訴機器去完成怎樣的任務,而不需要像指令式那樣詳細傳遞。例如我們可以把原有的資料集轉化成一個Stream,然後再把 Stream轉化成一個Int類型的Stream,在此過程中,把每一個數字都乘2,最後再調用 Sum方法,就可以獲得所有數字的和。

聲明式程式設計語言的代碼更簡潔,而簡潔的開發方式,正是計算引擎追求的效果。是以在 Flink 裡所有與任務編寫相關的API,都是偏向聲明式的。

二、DataStream API概覽及簡單應用

在詳細介紹DataStream API之前,我們先來看一下 Flink API的邏輯層次。

Flink 必知必會經典課程2:Stream Processing with Apache Flink

在舊版本的 Flink 裡,它的API層次遵循上圖左側這樣四層的關系。最上層表示我們可以用比較進階的API,或者說聲明程度更高的Table API以及SQL的方式來編寫邏輯。所有SQL和Table API編寫的内容都會被Flink内部翻譯和優化成一個用DataStream API實作的程式。再往下一層,DataStream API的程式會被表示成為一系列Transformation,最終 Transformation會被翻譯成JobGraph(即上文介紹的DAG)。

而在較新版本的 Flink 裡發生了一些改變,主要的改變展現在 Table API 和 SQL 這一層上。它不再會被翻譯成 DataStream API 的程式,而是直接到達底層 Transformation 一層。換句話說,DataStream API 和 Table API 這兩者的關系,從一個下層和上層的關系變為了一個平級的關系,這樣流程的簡化,會相應地帶來一些查詢優化方面的好處。

接下來我們用一個簡單的 DataStream API 程式作為示例來介紹,還是上文乘2再求和的需求。

Flink 必知必會經典課程2:Stream Processing with Apache Flink

如果用 Flink 表示,它的基本代碼如上圖所示。看上去比單機的示例要稍微的複雜一點,我們一步一步來分解看。

  • 首先,用 Flink 實作任何功能,一定要擷取一個相應的運作環境,也就是 Sream Execution Environment;
  • 其次,在擷取環境後,可以調用環境的 add Source 方法,來為邏輯添加一個最初始資料源的輸入;設定完資料源後可以拿到資料源的引用,也就是 Data Source 對象;
  • 最後,可以調用一系列的轉換方法來對 Data Source 中的資料進行轉化。

這種轉化如圖所示,就是把每個數字都×2,随後為了求和我們必須利用 keyBy 對資料進行分組。傳入的常數表示把所有的資料都分到一組裡邊,最後再對這個組裡邊的所有的資料,按照第一個字段進行累加,最終得到結果。在得到結果後,不能簡單的像單機程式那樣把它輸出,而是需要在整個邏輯裡面加一個的 Sink 節點,把所有的資料寫到目标位置。上述工作完成後,要去調用 Environment 裡面 Execute 方法,把所有上面編寫的邏輯統一送出到遠端或者本地的一個叢集上執行。

Flink DataStream API 編寫程式和單機程式最大的不同就在于,它前幾步的過程都不會觸發資料的計算,而像在繪制一個 DAG 圖。等整個邏輯的 DAG 圖繪制完畢之後,就可以通過 Execute 方法,把整個的圖作為一個整體,送出到叢集上去執行。

介紹到這裡,就把Flink DataStream API和DAG圖聯系在一起了。 事實上,Flink 任務具體的産生過程比上面描述的要複雜得多,它要經過一步步轉化和優化等,下圖展示了Flink 作業的具體生成過程。

Flink 必知必會經典課程2:Stream Processing with Apache Flink

DataStream API裡提供的轉換操作

就像上文在示例代碼中展示的,每一個 DataStream對象,在被調用相應方法的時候,都會産生一個新的轉換。相應的,底層會生成一個新的算子,這個算子會被添加到現有邏輯的DAG圖中。相當于添加一條連線來指向現有DAG圖的最後一個節點。所有的這些API在調動它的時候都會産生一個新的對象,然後可以在新的對象上去繼續調用它的轉換方法。就是像這種鍊式的方式,一步一步把這個DAG圖給畫出來。

Flink 必知必會經典課程2:Stream Processing with Apache Flink

上述解釋涉及到了一些高階函數思想。每去調用 DataStream上的一個轉換時,都需要給它傳遞的一個參數。換句話說,轉換決定了你想對這個資料進行怎樣的操作,而實際傳遞的包在算子裡面的函數決定了轉換操作具體要怎樣完成。

上圖中,除了左邊列出來的 API, Flink DataStream API 裡面還有兩個非常重要的功能,它們是 ProcessFunction以及 CoProcessFunction。這兩個函數是作為最底層的處理邏輯提供給使用者使用的。上圖所有左側藍色涉及的轉換,理論上來講都可以用底層的ProcessFunction和CoProcessFunction去完成。

關于資料分區

資料分區是指在傳統的批進行中對資料Shuffle的操作。如果把撲克牌想成資料,傳統批處理裡的Shuffle操作就相當于理牌的過程。一般情況下在抓牌過程中,我們都會把牌理順排列好,相同的數字還要放在一起。這樣做最大的好處是,出牌時可以一下子找到想出的牌。Shuffle是傳統的批處理的方式。因為流處理所有的資料都是動态來的,是以理牌的過程或者說處理資料,進行分組或分區的過程,也是線上來完成的。

Flink 必知必會經典課程2:Stream Processing with Apache Flink

例如上圖右側所示,上遊有兩個算子A的處理執行個體,下遊是三個算子B處理執行個體。這裡展示的流處理等價于Shuffle的操作被稱為資料分區或資料路由。它用來表示A處理完資料後,要把結果發到下遊B的哪個處理執行個體上。

Flink 裡提供的分區政策

圖X是 Flink 提供的分區政策。需要注意的是, DataStream調用keyBy方法後,可以把整個資料按照一個Key值進行分區。但要嚴格來講,其實keyBy并不算是底層實體分區政策,而是一種轉換操作,因為從API角度來看,它會把DataStream轉化成 KeyedDataStream的類型,而這兩者所支援的操作也有所不同。

Flink 必知必會經典課程2:Stream Processing with Apache Flink

所有這些分區政策裡,稍微難了解的可能是Rescale。Rescale涉及到上下遊資料本地性的問題,它和傳統的Rebalance,即Round-Pobin,輪流配置設定類似。差別在于Rescale是它會盡量避免資料跨網絡的傳輸。

如果所有上述的分區政策都不适用的話,我們還可以自己調用 PartitionCustom去自定義一個資料的分區。值得注意的是,它隻是自定義的單點傳播,即對每一個資料隻能指定它一個下遊所要發送的執行個體,而沒有辦法把它複制成多份發送到下遊的多個執行個體中。

Flink支援的連接配接器

上文介紹過,圖X裡有兩個關鍵的節點:A節點,需要去連接配接外部系統,從外部系統把資料讀取到 Flink的處理叢集裡;C節點,即Sink節點,它需要彙總處理完的結果,然後把這個結果寫入到某個外部系統裡。這裡的外部系統可以是一個檔案系統,也可以是一個資料庫等。

Flink 必知必會經典課程2:Stream Processing with Apache Flink

Flink 裡的計算邏輯可以沒有資料輸出,也就是說可以不把最終的資料寫出到外部系統,因為Flink裡面還有一個State的狀态的概念。在中間計算的結果實際上是可以通過 State暴露給外部系統,是以允許沒有專門的Sink。但每一個 Flink 應用都肯定有Source,也就是說必須從某個地方把資料讀進來,才能進行後續的處理。

關于 Source和Sink兩類連接配接器需要關注的點如下:

  • 對于Sourse而言,我們往往比較關心是否支援續監測并接入資料更新,然後把相應的更新資料再給傳輸到這個系統當中來。舉例來說,Flink對于檔案有相應的FileSystem連接配接器,例如CSV檔案。CSV檔案連接配接器在定義時,可以通過參數指定是否持續監測某個目錄的檔案變化,并接入更新後的檔案。
  • 對于Sink來講,我們往往關心要寫出的外部系統是否支援更新已經寫出的結果。比如要把資料寫到Kafka裡,通常情況下資料寫入是一種Append-Only,即不能修改已經寫入系統裡的記錄(社群正在利用Kafka Compaction實作Upsert Sink);如果是寫入資料庫,那麼通常可以支援利用主鍵對現有資料進行更新。

以上兩個特性,決定了Flink 裡連接配接器是面向靜态資料還是面向動态的資料的關鍵點。

提醒,上面截圖是 Flink 1.11版本之後的文檔,連接配接器在 Flink 1.11 版本裡有較大重構。另外,關于Table、SQL、API這個層面的連接配接器,比起DataStream層面的連接配接器,會承擔更多的任務。比如是否支援一些謂詞或投影操作的下推等等。這些功能可以幫助提高資料處理的整體性能。

三、Flink 中的狀态和時間

如果想要深入地了解DataStream API,狀态和時間是必須掌握的要點。

所有的計算都可以簡單地分為無狀态計算和有狀态計算。無狀态計算相對而言比較容易。假設這裡有個加法算子,每進來一組資料,都把它們全部加起來,然後把結果輸出去,有點純函數的味道。純函數指的是每一次計算結果隻和輸入資料有關,之前的計算或者外部狀态對它不會産生任何影響。

Flink 必知必會經典課程2:Stream Processing with Apache Flink

這裡我們主要講一下Flink裡邊的有狀态計算。用撿樹枝的小遊戲來舉例。這個遊戲在我看來做的非常好的一點是它自己記錄了非常多的狀态,比如幾天沒上線,然後再去和裡邊的 NPC對話的時候,它就會告訴你已經有好久沒有上線了。換句話說,它會把之前上線的時間作為一種狀态給記錄下來,在生成它NPC對話的時候,是會受到這個狀态的影響。

實作這種有狀态的計算,要做的一點就是把之前的狀态記錄下來,然後再這個狀态注入到新的一次計算中,具體實作方式也有下面兩種:

  • 第一種,把狀态資料進入算子之前就給提取出來,然後把這個狀态資料和輸入資料合并在一起,再把它們同時輸入到算子中,得到一個輸出。這種方式是被用在 Spark的StructureStreaming裡邊。其好處是是可以重用已有的無狀态算子。
  • 第二種,是 Flink 現在的方法,就是算子本身是有狀态的,算子在每一次到新資料之後做計算的時候,同時考慮新輸資料和已有的狀态對計算過程的影響,最終把結果輸出出去。

計算引擎也應該像上面提到的遊戲一樣變得越來越智能,可以自動學習資料中潛在的規律,然後來自适應地優化計算邏輯,保持較高的處理性能。

Flink 的狀态原語

Flink的狀态原語涉及如何通過代碼使用 Flink的狀态。其基本思想是在程式設計的時候抛棄原生語言(例如Java或Scala)提供的資料容器,把它們更換為 Flink 裡面的狀态原語。

作為對狀态支援比較好的系統, Flink 内部提供了可以使用的很多種可選的狀态原語。從大的角度看,所有狀态原語可以分為Keyed State和Operator State兩類。Operator State應用相對比較少,我們在這裡不展開介紹。下面重點看一下Keyed State。

Flink 必知必會經典課程2:Stream Processing with Apache Flink

Keyed State,即分區狀态。分區狀态的好處是可以把已有狀态按邏輯提供的分區分成不同的塊。塊内的計算和狀态都是綁定在一起的,而不同的Key值之間的計算和狀态的讀寫都是隔離的。對于每個Key值,隻需要管理好自己的計算邏輯和狀态就可以了,不需要去考慮其它Key值所對應的邏輯和狀态。

Keyed State可以進一步劃分為下面的5類,它們分别是:

  • 比較常用的:ValueState、ListState、MapState
  • 不太常用的:ReducingState和AggregationState

Keyed State隻能在RichFuction中使用,RichFuction與普通、傳統的Function相比,最大的不同就是它有自己的生命周期。Key State的使用方法分為以下四個步驟:

  • 第一步,将 State聲明為RichFunction裡的執行個體的變量
  • 第二步,在RichFunction對應的 open方法中,為 State進行一個初始化的指派操作。指派操作要有兩步:先建立一個StateDescriptor,在建立中需要給State指定一個名稱;然後再去調用RichFuntion中的getRuntimeContext().getState(…),把剛剛定義的StateDescriptor傳進去,就可以擷取State。

提醒:如果此流式應用是第一次運作,那麼獲得的State會是空内容的;如果State是從某個中間段重新開機的,它會根據配置和之前儲存的資料的基礎上進行恢複。

Flink 必知必會經典課程2:Stream Processing with Apache Flink
  • 第三步,得到State對象後,就可以在RichFunction裡,對對應的State進行讀寫。如果是ValueState,可以調用它的Value方法來擷取對應值。Flink 架構會控制好所有狀态的并發通路,并進行限制,是以使用者不需要考慮并發的問題。

Flink 的時間

時間也是 Flink非常重要的一點,它和State是相輔相成的。總體來看 Flink引擎裡邊提供的時間有兩類:第一類是Processing Time;第二類是Event Time。Processing Time表示的是真實世界的時間,Event Time是資料當中包含的時間。資料在生成的過程當中會攜帶時間戳之類的字段,因為很多時候需要将資料裡攜帶的時間戳作為參考,然後對資料進行分時間的處理。

Flink 必知必會經典課程2:Stream Processing with Apache Flink

Processing Time處理起來相對簡單,因為它不需要考慮亂序等問題;而Event Time處理起來相對複雜。而由于Processing Time在使用時是直接調取系統的時間,考慮到多線程或分布式系統的不确定性,是以它每次運作的結果可能是不确定的;相反,因為Event Time時間戳是被寫入每一條資料裡的,是以在重放某個資料進行多次處理的時候,攜帶的這些時間戳不會改變,如果處理邏輯沒有改變的話,最後的結果也是比較确定的。

Processing Time和Event Time的差別。

Flink 必知必會經典課程2:Stream Processing with Apache Flink

以上圖的資料為例,按照1~7的時間來排列的。對于機器時間而言,每個機器的時間會單調增加。在這種情況下,用Processing Time獲得的時間是完美的按照時間從小到大排序的資料。對于Event Time而言,由于延遲或分布式的一些原因,資料到來的順序可能和它們真實産生的順序有一定的出入,資料可能存在着一定程度的亂序。這時就要充分利用資料裡邊攜帶的時間戳,對資料進行一個粗粒度的劃分。例如可以把資料分為三組,第一組裡最小的時間是1,第二組最小的時間是4,第三組最小的時間是7。這樣劃分之後,資料在組群組之間就是按從小到大的順序排列好的。

怎樣充分的把一定程度的亂序化解掉,讓整個的系統看上去資料進來基本上是有順序的?一種解決方案是在資料中間插入被稱為Watermark的meta資料。在上圖的例子中,前三個資料到來之後,假設再沒有小于等于3的資料進來了,這時就可以插入一條Watermark 3到整個資料裡,系統在看到Watermark 3時就知道,以後都不會有小于或等于3的資料過來了,這時它就可以放心大膽地進行自己的一些處理邏輯。

總結一下,Processing Time在使用時,是一個嚴格遞增的;而Event Time會存在一定的亂序,需要通過Watermark這種辦法對亂序進行一定緩解。

從API的角度來看,怎樣去配置設定Timestamp或生成Watermark也比較容易,有兩種方式:

第一種,在SourceFunction當中調用内部提供的 collectWithTimestamp方法,把包含時間戳的資料提取出來;還可以在SourceFunction中使用 emitWatermark方法去産生一個Watermark,然後插入到資料流中。

Flink 必知必會經典課程2:Stream Processing with Apache Flink

第二種,如果不在SourceFunction中可以調用DateStream.assignTimestampsAndWatermarks這個方法,同時傳入兩類Watermark生成器:

第一類是定期生成,相當在環境裡通過配置一個值,比如每隔多長時間(指真實時間)系統會自動調用Watermar生成政策。

第二類是根據特殊記錄生成,如果遇到一些特殊資料,可以采取AssignWithPunctuatedWatermarks這個方法來進行時間戳和Watermark的配置設定。

提醒:Flink 裡内置了一些常用的Assigner,即WatermarkAssigner。比如針對一個固定資料,它會把這個資料對應的時間戳減去固定的時間作為一個Watermark。關于Timestamp配置設定和Watermark生成接口,在後續的版本可能會有一定的改動。 注意,新版本的Flink裡面已經統一了上述兩類生成器。

時間相關API

Flink 在編寫邏輯時會用到的與時間相關的 API,下圖總結了Event Time和Processing Time相對應的API。

Flink 必知必會經典課程2:Stream Processing with Apache Flink

在應用邏輯裡通過接口支援可以完成三件事:

  • 第一,擷取記錄的時間。Event Time可以調context.getTimestamp,或在SQL算子内從資料字段中把對應的時間給提取出來。Processing Time可以直接調currentProcessingTime完成調取,它的内部是直接調用了擷取系統時間的靜态方法來傳回的值。
  • 第二,擷取Watermark。其實隻有在Event Time裡才有Watermark的概念,而Processing Time裡是沒有的。但在Processing Time中非要把某個東西當成Watermark,其實就是資料時間本身。也就是說第一次調用timerService.currentProcessingTime方法之後擷取的值。這個值既是目前記錄的這個時間,也是目前的Watermark值,因為時間總是往前流動的,第一次調用了這個值後,第二次調用時這個值肯定不會再比第一次值還小。
  • 第三,注冊定時器。定時器的作用是清理。比如需要對一個cache在未來某個時間進行清理工作。既然清理工作應該發生在未來的某個時間點,那麼可以調用 timerServicerEventTimeTimer或ProcessingTimeTimer方法注冊定時器,再在整個方法裡添加一個對定時器回調的處理邏輯。當對應的Event Time或者Processing Time的時間超過了定時器設定時間,它就會調用方法自己編寫定時器的毀掉邏輯。

以上就是關于StreamProcess with Apache Flink的介紹,下一篇内容将着重介紹Flink Runtime Architecture。

活動推薦:

僅需99元即可體驗阿裡雲基于 Apache Flink 建構的企業級産品-實時計算 Flink 版!點選下方連結了解活動詳情:

https://www.aliyun.com/product/bigdata/sc?utm_content=g_1000250506
Flink 必知必會經典課程2:Stream Processing with Apache Flink