Flink from Beginner to Giving Up (9): Understanding the Window and Time Concepts

We have now reached windows. First, let's review how a window was defined in the introductory concepts post:

Definition of Window

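  • window: a mechanism that carves a finite set out of an infinite stream so that operations can run on a bounded dataset. Windows can be time-based or count-based. KeyedStream → WindowedStream (note that the stream type changes): windows can be defined on an already-partitioned KeyedStream. Windows group the data of each key according to some characteristic (for example, the data that arrived within the last 5 seconds). For a complete description of windows, see https://flink.xskoo.com/dev/stream/operators/windows.html.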
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
           
  • tumbling time windows: windows never overlap, so each element belongs to exactly one window
  • sliding time windows: windows can overlap, so an element can belong to several windows
    data.keyBy(1)
            .timeWindow(Time.minutes(1)) // tumbling time window: sum the counts once per minute
            .sum(1);

    data.keyBy(1)
            .timeWindow(Time.minutes(1), Time.seconds(30)) // sliding time window: every 30 s, sum the counts over the past minute
            .sum(1);
           
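  • timeWindow: as described above, aggregates stream data by time. For example, a one-minute tumbling time window collects one minute's worth of elements and, once that minute has passed, applies a function to all the elements in the window.
  • windowAll: DataStream → AllWindowedStream. windowAll can be applied directly to non-partitioned data, so windows can be defined on a regular DataStream. Windows group all stream events according to some characteristic (for example, the data that arrived within the last 5 seconds). For a complete description of windows, see https://flink.xskoo.com/dev/stream/operators/windows.html.
**Warning:** In many cases this is a **non-parallel** transformation. All records are gathered in a single task of the windowAll operator.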
           
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
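The following plain-Java sketch makes the tumbling/sliding difference concrete by showing how one timestamp maps to window start times. It has no Flink dependency. The class `WindowAssignmentSketch` and its methods are hypothetical. The start-time arithmetic and the backwards loop are modeled on what Flink's built-in time window assigners do with a zero offset, but this is an illustration and not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stdlib-only illustration; not Flink's API.
class WindowAssignmentSketch {

    // Start of the aligned window of length `size` containing `ts`.
    // Mirrors ts - (ts - offset + size) % size with offset = 0.
    static long windowStart(long ts, long size) {
        return ts - (ts + size) % size;
    }

    // Tumbling: the element belongs to exactly one window [start, start + size).
    static List<Long> tumbling(long ts, long size) {
        List<Long> starts = new ArrayList<>();
        starts.add(windowStart(ts, size));
        return starts;
    }

    // Sliding: start from the latest window start at or before ts and step back one
    // slide at a time, collecting every window [start, start + size) that contains ts.
    static List<Long> sliding(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        for (long start = windowStart(ts, slide); start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 95_000L; // an element with timestamp 1 min 35 s
        System.out.println("tumbling 1 min:       " + tumbling(ts, 60_000L));         // [60000]
        System.out.println("sliding 1 min / 30 s: " + sliding(ts, 60_000L, 30_000L)); // [90000, 60000]
    }
}
```

With a one-minute size and a 30-second slide, each element falls into size / slide = 2 windows. A tumbling window is the special case slide = size, which yields exactly one window.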
           

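What the official documentation says about windows:

Windows are at the heart of processing infinite streams. Windows split the stream into "buckets" of finite size, over which we can apply computations. The documentation focuses on how windowing is performed in Flink and how programmers can get the most out of the functionality it offers.

The general structure of a windowed Flink program is shown below. The first snippet refers to keyed streams and the second to non-keyed streams. As you can see, the only difference is that window(...) is applied to the KeyedStream produced by keyBy(...), whereas windowAll(...) is applied to non-keyed streams.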

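Keyed Windows

    stream
           .keyBy(...)               <-  keyed versus non-keyed windows
           .window(...)              <-  required: "assigner"
          [.trigger(...)]            <-  optional: "trigger" (else default trigger)
          [.evictor(...)]            <-  optional: "evictor" (else no evictor)
          [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
           .reduce/aggregate/fold/apply()      <-  required: "function"
          [.getSideOutput(...)]      <-  optional: "output tag"

Non-Keyed Windows

    stream
           .windowAll(...)           <-  required: "assigner"
          [.trigger(...)]            <-  optional: "trigger" (else default trigger)
          [.evictor(...)]            <-  optional: "evictor" (else no evictor)
          [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
           .reduce/aggregate/fold/apply()      <-  required: "function"
          [.getSideOutput(...)]      <-  optional: "output tag"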

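In the snippets above, the calls in square brackets ([...]) are optional. This shows that Flink lets you customize the windowing logic in many different ways to best fit your needs.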
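The difference between window(...) after keyBy(...) and windowAll(...) can be illustrated without Flink. A keyed window produces one result per key and window. windowAll produces one result per window over all records, which is also why it runs in a single task. The sketch below is hypothetical plain Java (`KeyedWindowSumSketch` and `Event` are made-up names). It roughly mimics a tumbling window followed by a sum over in-memory data and ignores triggers, evictors, and lateness.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, stdlib-only illustration; Event and the method names are made up.
class KeyedWindowSumSketch {

    static final class Event {
        final String key;
        final long count;
        final long timestamp;

        Event(String key, long count, long timestamp) {
            this.key = key;
            this.count = count;
            this.timestamp = timestamp;
        }
    }

    static long windowStart(long ts, long size) {
        return ts - (ts + size) % size;
    }

    // Keyed tumbling window + sum: one result per (key, window), keyed as "key@windowStart".
    static Map<String, Long> keyedSums(List<Event> events, long size) {
        Map<String, Long> sums = new TreeMap<>();
        for (Event e : events) {
            sums.merge(e.key + "@" + windowStart(e.timestamp, size), e.count, Long::sum);
        }
        return sums;
    }

    // Non-keyed (windowAll) tumbling window + sum: one result per window, all keys together.
    static Map<Long, Long> allSums(List<Event> events, long size) {
        Map<Long, Long> sums = new TreeMap<>();
        for (Event e : events) {
            sums.merge(windowStart(e.timestamp, size), e.count, Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("a", 1, 1_000), new Event("b", 2, 2_000),
                new Event("a", 3, 4_000), new Event("a", 4, 6_000));
        System.out.println(keyedSums(events, 5_000)); // {a@0=4, a@5000=4, b@0=2}
        System.out.println(allSums(events, 5_000));   // {0=6, 5000=4}
    }
}
```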

window VS timeWindow

KeyedStream in Flink also has a timeWindow method, which is a thin wrapper around window. Here is its implementation:

    /**
     * Windows this {@code KeyedStream} into tumbling time windows.
     *
     * <p>This is a shortcut for either {@code .window(TumblingEventTimeWindows.of(size))} or
     * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
     * set using
     * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
     *
     * @param size The size of the window.
     */
    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return window(TumblingProcessingTimeWindows.of(size));
        } else {
            return window(TumblingEventTimeWindows.of(size));
        }
    }

    public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return window(SlidingProcessingTimeWindows.of(size, slide));
        } else {
            return window(SlidingEventTimeWindows.of(size, slide));
        }
    }

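As you can see, both tumbling and sliding time windows are built on the window method. timeWindow only chooses the processing-time or event-time assigner based on the configured time characteristic. In most cases, therefore, you can use timeWindow directly instead of window.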

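Time Types in Flink

The code above mentions Processing Time and Event Time. There is also a third type, Ingestion Time. The explanations below come from the official Flink documentation and from the post "A Detailed Look at the Time Types in Flink" in the "Learning Flink from 0 to 1" series (http://www.54tianzhisheng.cn/):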

Processing Time:

Processing time refers to the system time of the machine that is executing the respective operation.

When a streaming program runs on processing time, all time-based operations (like time windows) will use the system clock of the machines that run the respective operator. An hourly processing time window will include all records that arrived at a specific operator between the times when the system clock indicated the full hour. For example, if an application begins running at 9:15am, the first hourly processing time window will include events processed between 9:15am and 10:00am, the next window will include events processed between 10:00am and 11:00am, and so on.

Processing time is the simplest notion of time and requires no coordination between streams and machines. It provides the best performance and the lowest latency. However, in distributed and asynchronous environments processing time does not provide determinism, because it is susceptible to the speed at which records arrive in the system (for example from the message queue), to the speed at which the records flow between operators inside the system, and to outages (scheduled, or otherwise).

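In short, processing time is the time at which a record is being processed on the Flink cluster, as measured by the cluster's current clock.

As noted above, however, PT cannot guarantee in distributed and asynchronous settings that the time a record is processed matches the time it actually occurred. Messages from an MQ may arrive out of order or only after retries. During parallel processing in Flink, differences in thread speed can also make a record that was sent later arrive earlier.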
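The same window-start arithmetic can be used to check the 9:15 example from the documentation. This hypothetical plain-Java sketch uses a made-up class name, `ProcessingTimeSketch`, and simplifies clock readings to milliseconds since midnight. It shows that hourly windows are aligned to the full hour. It also shows that the same record can land in different windows depending only on when an operator happens to process it.

```java
import java.time.LocalTime;

// Hypothetical, stdlib-only illustration; clock readings are simplified to millis since midnight.
class ProcessingTimeSketch {
    static final long HOUR = 3_600_000L;

    static long windowStart(long ts, long size) {
        return ts - (ts + size) % size;
    }

    // A simplified "system clock" reading at hh:mm, in milliseconds since midnight.
    static long clock(int hour, int minute) {
        return (hour * 60L + minute) * 60_000L;
    }

    static String label(long millisSinceMidnight) {
        return LocalTime.ofSecondOfDay(millisSinceMidnight / 1000).toString();
    }

    public static void main(String[] args) {
        // The job starts at 9:15. Hourly windows are aligned to the full hour, so the first
        // record lands in [09:00, 10:00), which therefore only sees data processed from 9:15 on.
        System.out.println(label(windowStart(clock(9, 15), HOUR))); // 09:00
        // The same record processed at 9:58, or at 10:02 after a queue delay,
        // falls into different hourly windows: the result depends on timing.
        System.out.println(label(windowStart(clock(9, 58), HOUR))); // 09:00
        System.out.println(label(windowStart(clock(10, 2), HOUR))); // 10:00
    }
}
```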

Event Time:

Event time is the time that each individual event occurred on its producing device. This time is typically embedded within the records before they enter Flink, and that event timestamp can be extracted from each record. In event time, the progress of time depends on the data, not on any wall clocks. Event time programs must specify how to generate Event Time Watermarks, which is the mechanism that signals progress in event time. This watermarking mechanism is described in a later section, https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#event-time-and-watermarks.

In a perfect world, event time processing would yield completely consistent and deterministic results, regardless of when events arrive, or their ordering. However, unless the events are known to arrive in-order (by timestamp), event time processing incurs some latency while waiting for out-of-order events. As it is only possible to wait for a finite period of time, this places a limit on how deterministic event time applications can be.

Assuming all of the data has arrived, event time operations will behave as expected, and produce correct and consistent results even when working with out-of-order or late events, or when reprocessing historic data. For example, an hourly event time window will contain all records that carry an event timestamp that falls into that hour, regardless of the order in which they arrive, or when they are processed. (See the section on https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_time.html#late-elements for more information.)

Note that sometimes when event time programs are processing live data in real-time, they will use some processing time operations in order to guarantee that they are progressing in a timely fashion.
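Why does an hourly event-time window hold the same records regardless of arrival order? Because the window is chosen from each record's own timestamp. The plain-Java sketch below uses hypothetical names and no Flink dependency. It counts records per event-time window for an in-order list and for a shuffled copy. It assumes all data has already arrived, so watermarks and late data are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Hypothetical, stdlib-only illustration; assumes every record has already arrived.
class EventTimeOrderSketch {

    static long windowStart(long ts, long size) {
        return ts - (ts + size) % size;
    }

    // Each record's own event timestamp decides its window; arrival order plays no role.
    static Map<Long, Integer> countPerWindow(List<Long> eventTimestamps, long size) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : eventTimestamps) {
            counts.merge(windowStart(ts, size), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Long> inOrder = List.of(1_000L, 2_500L, 4_900L, 5_100L, 9_999L, 10_000L);
        List<Long> arrived = new ArrayList<>(inOrder);
        Collections.shuffle(arrived, new Random(42)); // simulate out-of-order arrival
        System.out.println(countPerWindow(inOrder, 5_000)); // {0=3, 5000=2, 10000=1}
        System.out.println(countPerWindow(arrived, 5_000).equals(countPerWindow(inOrder, 5_000))); // true
    }
}
```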

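In short, event time is the time at which a record was produced on its own business server, independent of Flink.

ET data may arrive out of order, however. Flink therefore has to wait for a while during processing so that out-of-order events can still be handled, and this waiting introduces latency.

In addition, ET processing does not depend on Flink's clocks, so a watermark must be specified to indicate how far data processing has progressed.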
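The plain-Java simulation below shows how a watermark lets out-of-order events be handled and why the waiting costs latency. It is a hypothetical simplification, and `WatermarkSketch` is a made-up name. It assumes a bounded-out-of-orderness strategy: the watermark is the largest timestamp seen so far minus a fixed bound, updated after every record instead of periodically. A tumbling window [start, end) fires once the watermark reaches end - 1. A record whose window has already fired is dropped as late, which corresponds to an allowed lateness of zero.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, stdlib-only simulation; a simplification, not Flink's implementation.
class WatermarkSketch {

    static long windowStart(long ts, long size) {
        return ts - (ts + size) % size;
    }

    // Feeds event timestamps in arrival order through tumbling event-time windows and
    // returns a log of fired windows and dropped late records.
    static List<String> run(long[] arrivals, long size, long maxOutOfOrderness) {
        List<String> log = new ArrayList<>();
        TreeMap<Long, Integer> open = new TreeMap<>(); // window start -> number of records
        long maxTs = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;                // no progress yet
        for (long ts : arrivals) {
            long start = windowStart(ts, size);
            if (start + size - 1 <= watermark) {        // its window has already fired
                log.add("late " + ts);
                continue;
            }
            open.merge(start, 1, Integer::sum);
            maxTs = Math.max(maxTs, ts);
            watermark = maxTs - maxOutOfOrderness;      // never moves backwards
            // Fire every open window whose last timestamp (end - 1) the watermark has reached.
            while (!open.isEmpty() && open.firstKey() + size - 1 <= watermark) {
                long first = open.firstKey();
                log.add("fire [" + first + ", " + (first + size) + ") count=" + open.remove(first));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        long[] arrivals = {1_000, 4_000, 3_000, 6_000, 7_500, 2_000, 11_000, 12_500};
        run(arrivals, 5_000, 2_000).forEach(System.out::println);
        // fire [0, 5000) count=3
        // late 2000
        // fire [5000, 10000) count=2
    }
}
```

With a bound of 2 s, the record at 3 s that arrives after the one at 4 s is still counted. The record at 2 s arrives after the first window has already fired, so it is dropped. A larger bound tolerates more disorder, but windows fire later.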

Ingestion Time:

Ingestion time is the time that events enter Flink. At the source operator each record gets the source’s current time as a timestamp, and time-based operations (like time windows) refer to that timestamp.

Ingestion time sits conceptually in between event time and processing time. Compared to processing time, it is slightly more expensive, but gives more predictable results. Because ingestion time uses stable timestamps (assigned once at the source), different window operations over the records will refer to the same timestamp, whereas in processing time each window operator may assign the record to a different window (based on the local system clock and any transport delay).

Compared to event time, ingestion time programs cannot handle any out-of-order events or late data, but the programs don’t have to specify how to generate watermarks.

Internally, ingestion time is treated much like event time, but with automatic timestamp assignment and automatic watermark generation.

Ingestion time sits between PT and ET: it is the time at which a record enters Flink.
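Take one record that passes through two window operators. This hypothetical plain-Java sketch (made-up class name and clock values) contrasts what ingestion time and processing time do with it downstream. With ingestion time, the timestamp is stamped once at the source and every operator reuses it. With processing time, each operator consults its own clock, so a small transport delay can push the record into a different window.

```java
import java.util.Arrays;

// Hypothetical, stdlib-only illustration; clocks are made-up millisecond values.
class IngestionVsProcessingSketch {

    static long windowStart(long ts, long size) {
        return ts - (ts + size) % size;
    }

    // Ingestion time: every operator windows the record by the timestamp assigned at the source.
    static long[] ingestionTimeWindows(long sourceClock, long[] operatorClocks, long size) {
        long[] starts = new long[operatorClocks.length];
        for (int i = 0; i < starts.length; i++) {
            starts[i] = windowStart(sourceClock, size);
        }
        return starts;
    }

    // Processing time: every operator uses its own clock at the moment it sees the record.
    static long[] processingTimeWindows(long[] operatorClocks, long size) {
        long[] starts = new long[operatorClocks.length];
        for (int i = 0; i < starts.length; i++) {
            starts[i] = windowStart(operatorClocks[i], size);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 60_000L;                      // one-minute tumbling windows
        long sourceClock = 59_900L;               // source clock when the record enters Flink
        long[] operatorClocks = {59_950L, 60_050L}; // clocks of window operators A and B
        System.out.println(Arrays.toString(ingestionTimeWindows(sourceClock, operatorClocks, size))); // [0, 0]
        System.out.println(Arrays.toString(processingTimeWindows(operatorClocks, size)));             // [0, 60000]
    }
}
```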

Time圖

The figure below (from the official documentation) shows where each kind of time is produced:

[Figure: Event Time, Ingestion Time and Processing Time (image from the official Flink documentation)]

The figure is very clear: it shows where each type of time is produced.

References:

http://www.54tianzhisheng.cn/2018/12/11/Flink-time/

http://www.54tianzhisheng.cn/2018/12/08/Flink-Stream-Windows/
