https://github.com/Wasabi1234/Flink-Tutorial
掌握Flink中三種常用的Time處理方式,掌握Flink中滾動視窗以及滑動視窗的使用,了解Flink中的watermark。
Flink 在流處理工程中支援不同的時間概念。
1 處理時間(Processing time)
執行相應算子操作的機器的系統時間.
當流程式在處理時間運作時,所有基于時間的 算子操作(如時間視窗)将使用運作相應算子的機器的系統時鐘。每小時處理時間視窗将包括在系統時鐘訓示整個小時之間到達特定算子的所有記錄。
例如,如果應用程式在上午9:15開始運作,則第一個每小時處理時間視窗将包括在上午9:15到上午10:00之間處理的事件,下一個視窗将包括在上午10:00到11:00之間處理的事件
處理時間是最簡單的時間概念,不需要流和機器之間的協調
它提供最佳性能和最低延遲。但是,在分布式和異步環境中,處理時間不提供确定性,因為它容易受到記錄到達系統的速度(例如從消息隊列)到記錄在系統内的算子之間流動的速度的影響。和停電(排程或其他)。
2 事件時間(Event time)
每個單獨的事件在其生産裝置上發生的時間.
此時間通常在進入Flink之前内置在記錄中,并且可以從每個記錄中提取該事件時間戳。
在事件時間,時間的進展取決于資料,而不是任何挂鐘。
事件時間程式必須指定如何生成事件時間水印,這是表示事件時間進度的機制.
在一個完美的世界中,事件時間處理将産生完全一緻和确定的結果,無論事件何時到達,或者順序.
但是,除非事件已知按順序到達(按時間戳),否則事件時間處理會在等待無序事件時産生一些延遲。由于隻能等待一段有限的時間,是以限制了确定性事件時間應用程式的可能性。
假設所有資料都已到達,算子操作将按預期運作,即使在處理無序或延遲事件或重新處理曆史資料時也會産生正确且一緻的結果。
例如,每小時事件時間視窗将包含帶有落入該小時的事件時間戳的所有記錄,無論它們到達的順序如何,或者何時處理它們。(有關更多資訊,請參閱有關遲發事件的部分。)
請注意,有時當事件時間程式實時處理實時資料時,它們将使用一些處理時間 算子操作,以確定它們及時進行。
3 攝取時間(Ingestion time)
事件進入Flink的時間.
在源算子處,每個記錄将源的目前時間作為時間戳,并且基于時間的算子操作(如時間視窗)引用該時間戳。
在概念上位于事件時間和處理時間之間。
與處理時間相比 ,它成本稍微高一些,但可以提供更可預測的結果。因為使用穩定的時間戳(在源處配置設定一次),是以對記錄的不同視窗 算子操作将引用相同的時間戳,而在處理時間中,每個視窗算子可以将記錄配置設定給不同的視窗(基于本地系統時鐘和任何運輸延誤)
與事件時間相比,無法處理任何無序事件或後期資料,但程式不必指定如何生成水印。
在内部,攝取時間與事件時間非常相似,但具有自動時間戳配置設定和自動水印生成函數
4 設定時間特性
Flink DataStream程式的第一部分通常設定基本時間特性
顯然,在Flink的流式處理環境中,預設使用處理時間
該設定定義了資料流源的行為方式(例如,它們是否将配置設定時間戳),以及視窗 算子操作應該使用的時間概念,比如
KeyedStream.timeWindow(Time.seconds(30))。
以下示例顯示了一個Flink程式,該程式在每小時時間視窗中聚合事件。視窗的行為适應時間特征。
- Java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// 可選的:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
stream
.keyBy( (event) -> event.getUser() )
.timeWindow(Time.hours(1))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
- Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))
stream
.keyBy( _.getUser )
.timeWindow(Time.hours(1))
.reduce( (a, b) => a.add(b) )
.addSink(...)
請注意,為了在事件時間運作此示例,程式需要使用直接為資料定義事件時間的源并自行發出水印,或者程式必須在源之後注入時間戳配置設定器和水印生成器。這些函數描述了如何通路事件時間戳,以及事件流表現出的無序程度。
5 Windows
5.1 簡介
Windows是處理無限流的核心。Windows将流拆分為有限大小的“桶”,我們可以在其上應用計算。我們重點介紹如何在Flink中執行視窗,以及程式員如何從其提供的函數中獲益最大化。
視窗Flink程式的一般結構如下所示
第一個片段指的是被Keys化流
而第二個片段指的是非被Keys化流
正如所看到的,唯一的差別是keyBy(…)呼籲Keys流和window(…)成為windowAll(…)非被Key化的資料流。這也将作為頁面其餘部分的路線圖。
Keyed Windows
Non-Keyed Windows
在上面,方括号([…])中的指令是可選的。這表明Flink允許您以多種不同方式自定義視窗邏輯,以便最适合您的需求。
5.2 視窗生命周期
簡而言之,隻要應該屬于此視窗的第一個資料元到達,就會建立一個視窗,當時間(事件或處理時間)超過其結束時間戳加上使用者指定 時,視窗将被完全删除allowed lateness(請參閱允許的延遲))。Flink保證僅删除基于時間的視窗而不是其他類型,例如全局視窗(請參閱視窗配置設定器)。例如,使用基于事件時間的視窗政策,每5分鐘建立一個非重疊(或翻滾)的視窗,并允許延遲1分鐘,Flink将建立一個新視窗,用于間隔12:00和12:05當具有落入此間隔的時間戳的第一個資料元到達時,當水印通過12:06 時間戳時它将删除它。
此外,每個視窗将具有Trigger和一個函數(ProcessWindowFunction,ReduceFunction, AggregateFunction或FoldFunction)連接配接到它。該函數将包含要應用于視窗内容的計算,而Trigger指定視窗被認為準備好應用該函數的條件。
觸發政策可能類似于“當視窗中的資料元數量大于4”時,或“當水印通過視窗結束時”。
觸發器還可以決定在建立和删除之間的任何時間清除視窗的内容。在這種情況下,清除僅指視窗中的資料元,而不是視窗中繼資料。這意味着仍然可以将新資料添加到該視窗。
除了上述内容之外,您還可以指定一個Evictor,它可以在觸發器觸發後以及應用函數之前和/或之後從視窗中删除資料元。
5.3 被Keys化與非被Keys化Windows
要指定的第一件事是您的流是否應該鍵入。必須在定義視窗之前完成此 算子操作。使用the keyBy(…)将您的無限流分成邏輯被Key化的資料流。如果keyBy(…)未調用,則表示您的流不是被Keys化的。
對于被Key化的資料流,可以将傳入事件的任何屬性用作鍵(此處有更多詳細資訊)。擁有被Key化的資料流将允許您的視窗計算由多個任務并行執行,因為每個邏輯被Key化的資料流可以獨立于其餘任務進行處理。引用相同Keys的所有資料元将被發送到同一個并行任務。
在非被Key化的資料流的情況下,您的原始流将不會被拆分為多個邏輯流,并且所有視窗邏輯将由單個任務執行,即并行度為1。
6 視窗配置設定器
指定流是否已鍵入後,下一步是定義一個視窗配置設定器.
視窗配置設定器定義如何将資料元配置設定給視窗,這是通過WindowAssigner 在window(…)(對于被Keys化流)或windowAll()(對于非被Keys化流)調用中指定您的選擇來完成的
WindowAssigner負責将每個傳入資料元配置設定給一個或多個視窗
Flink帶有預定義的視窗配置設定器,用于最常見的用例,即
滾動視窗
滑動視窗
會話視窗
全局視窗
還可以通過擴充WindowAssigner類來實作自定義視窗配置設定器。所有内置視窗配置設定器(全局視窗除外)都根據時間為視窗配置設定資料元,這可以是處理時間或事件時間。請檢視我們關于活動時間的部分,了解處理時間和事件時間之間的差異以及時間戳和水印的生成方式。
基于時間的視窗具有開始時間戳(包括)和結束時間戳(不包括),它們一起描述視窗的大小。
在代碼中,Flink在使用TimeWindow基于時間的視窗時使用,該視窗具有查詢開始和結束時間戳的方法maxTimestamp()傳回給定視窗的最大允許時間戳
下圖顯示了每個配置設定者的工作情況。紫色圓圈表示流的資料元,這些資料元由某個鍵(在這種情況下是使用者1,使用者2和使用者3)劃分。x軸顯示時間的進度。
6.1 滾動視窗
一個滾動視窗配置設定器的每個資料元配置設定給指定的視窗的視窗大小。滾動視窗具有固定的尺寸,不重疊.
例如,如果指定大小為5分鐘的翻滾視窗,則将評估目前視窗,并且每五分鐘将啟動一個新視窗,如下圖所示
以下代碼段顯示了如何使用滾動視窗。
DataStream<T> input = ...;
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
val input: DataStream[T] = ...
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
6.2 滑動視窗
該滑動視窗配置設定器配置設定元件以固定長度的視窗。與滾動視窗配置設定器類似,視窗大小由視窗大小參數配置
附加的視窗滑動參數控制滑動視窗的啟動頻率。是以,如果幻燈片小于視窗大小,則滑動視窗可以重疊。在這種情況下,資料元被配置設定給多個視窗。
例如,您可以将大小為10分鐘的視窗滑動5分鐘。有了這個,你每隔5分鐘就會得到一個視窗,其中包含過去10分鐘内到達的事件,如下圖所示。
以下代碼段顯示了如何使用滑動視窗
DataStream<T> input = ...;
// 滑動 事件時間 視窗
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// 滑動 處理時間 視窗
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
val input: DataStream[T] = ...
// tumbling event-time windows
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// tumbling processing-time windows
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)