天天看點

Flink之window函數詳解

1、官網:

 https://ci.apache.org/projects/flink/flink-docs-release-1.7/concepts/programming-model.html#windows      (建議大家多看看官網) 

2、什麼是Window

      Flink 認為 Batch 是 Streaming 的一個特例,是以 Flink 底層引擎是一個流式引擎,在上面實作了流處理和批處理。而視窗(window)就是從 Streaming 到 Batch 的一個橋梁。Flink 提供了 非常完善的視窗機制,這是我認為的 Flink 最大的亮點之一。

     在流處理應用中,資料是連續不斷的,是以我們不可能等到所有資料都到了才開始處理。當 然我們可以每來一個消息就處理一次,但是有時我們需要做一些聚合類的處理,例如:在過 去的 1 分鐘内有多少使用者點選了我們的網頁。在這種情況下,我們必須定義一個視窗,用來 收集最近一分鐘内的資料,并對這個視窗内的資料進行計算。

       聚合事件(例如 count、sum)在流上的工作方式與在批進行中不同。例如,不可能計算流中的 所有元素,因為流通常是無限的(無界的)。相反,流上的聚合(count、sum 等)由視窗限定範 圍,例如“過去 5 分鐘内的計數”或“最後 100 個元素的總和”。也就是說,流資料的計算 可以把連續不斷的資料按照一定的規則拆分成大量的片段,在片段内進行統計和計算。比如 可以把一小時内的資料儲存到一個小的資料庫表裡,然後對這部分資料進行計算和統計,這 時流計算是提供自動切割的一種機制-視窗。

Window 可以是時間驅動的(例如:每 30 秒),也可以是資料驅動的(例如:每 100 個元素):

  •     以時間為機關的 Time Window,例如:每 1 秒鐘、每 1 個小時等
  •     以資料的數量為機關的 Count Window,例如:每 100 個元素

Flink 給我們提供了一些通用的時間視窗模型:

  1. 翻滾視窗(tumbling window,沒有重疊)
  2. 滑動視窗(sliding window,有重疊)
  3. 會話視窗(session window,中間有一個不活動的間隙)

3、滾動視窗 Tumbling Window

      滾動視窗配置設定器将每個元素配置設定給固定視窗大小的視窗。滾動視窗大小固定的并且不重疊。 例如,如果指定大小為 5 分鐘的滾動視窗,則将執行目前視窗,并且每五分鐘将啟動一個新 視窗,如下圖所示:

Flink之window函數詳解

執行個體:

  1、需求:我們需要統計每一分鐘中使用者購買的商品的總數,需要将使用者的行為事件按每一分鐘進 行切分

Tumbling Time Window      使用 DataStream API

代碼:

package com.qyl.window
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * 作者: 不是猿是員
 * 時間: 2019/05/10
 * 描述:
 * wordcount time window 操作
 */
object Window_Time {
 def main(args: Array[String]): Unit = {
 val tool = ParameterTool.fromArgs(args)
 val port = tool.getInt("port")
 val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
 val dataStream: DataStream[String] = streamEnv.socketTextStream("hadoop02",
port)
 /**
 * 每隔 4s 統計一次詞頻
 */
 val resultDataStream: DataStream[(String, Int)] = dataStream.flatMap(x =>
x.split(" "))
 .map((_, 1))
 .keyBy(0)
 .timeWindow(Time.seconds(4))
 .sum(1)
 resultDataStream.print()
 streamEnv.execute("WordCount every 10 second")
 }
}
           

 2、需求: 我們想要每 100 個使用者購買行為事件統計購買總數,那麼每當視窗中填滿 100 個元素了, 就會對視窗進行計算

Tumbling Count Window         使用 DataStream API

代碼:

package com.mazh.window
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * 作者: 不是猿是員
 * 時間: 2019/05/10 
 * 描述:
 * Tumbling Count Window
 */
object Window_Count {
 def main(args: Array[String]): Unit = {
 val tool = ParameterTool.fromArgs(args)
 val port = tool.getInt("port")
 val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
 val dataStream: DataStream[String] = streamEnv.socketTextStream("hadoop02",
port)
/**
 * 每隔 10 個單詞統計一次詞頻
 */
 val resultDataStream: DataStream[(String, Int)] = dataStream.flatMap(x =>
x.split(" "))
 .map((_, 1))
 .keyBy(0)
 .countWindow(10)
 .sum(1)
 resultDataStream.print()
 streamEnv.execute("WordCount every 10 word")
 }
}
           

4、滑動視窗    Sliding Window

       滑動視窗配置設定器将每個元素配置設定給固定視窗大小的視窗。類似于滾動視窗配置設定器,視窗的大 小由視窗大小參數配置。另外一個視窗滑動參數控制滑動視窗的啟動頻率(how frequently a sliding window is started)。是以,如果滑動大小小于視窗大小,滑動窗可以重疊。在這種情 況下,元素被配置設定到多個視窗。例如,你可以使用視窗大小為 10 分鐘的視窗,滑動大小為 5 分鐘。這樣,每 5 分鐘會生成一個視窗,包含最後 10 分鐘内到達的事件,如下圖所示。

Flink之window函數詳解

執行個體:

1、 需求:每 30 秒計算一次最近一分鐘使用者購買的商品總數

Sliding Time Window

package com.qyl.window
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * 作者: 不是猿是員
 * 時間: 2019/5/10 
 * 描述:
 * wordcount time window 操作
 */
object Window_Time {
 def main(args: Array[String]): Unit = {
 val tool = ParameterTool.fromArgs(args)
 val port = tool.getInt("port")
 val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
 val dataStream: DataStream[String] = streamEnv.socketTextStream("hadoop02",
port)
 /**
 * 每隔 30s 統計過去 60s 的 wordcount
 */
 val resultDataStream1: DataStream[(String, Int)] = dataStream.flatMap(x =>
x.split(" "))
 .map((_, 1))
 .keyBy(0)
 .timeWindow(Time.seconds(30), Time.seconds(60))
 .sum(1)
 resultDataStream1.print()
 streamEnv.execute("WordCount every 10 second")
 }
}
           

2、需求: 每隔 10 個單詞統計過去 20 個單詞

Sliding Count Window

代碼:

package com.mazh.window
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * 作者: 不是猿是員
 * 時間: 2019/05/10 
 * 描述:
 * Tumbling Count Window
 */
object Window_Count {
 def main(args: Array[String]): Unit = {
 val tool = ParameterTool.fromArgs(args)
 val port = tool.getInt("port")
 val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
 val dataStream: DataStream[String] = streamEnv.socketTextStream("hadoop02",
port)
 /**
 * 每隔 10 個單詞統計過去 20 個單詞的 wordcount
 */
 val resultDataStream1: DataStream[(String, Int)] = dataStream.flatMap(x =>
x.split(" "))
 .map((_, 1))
 .keyBy(0)
 .countWindow(10, 20)
 .sum(1)
 resultDataStream1.print()
 streamEnv.execute("WordCount every 10 word")
 }
}
           

5、會話視窗  Session Window

         會話視窗配置設定器通過活動會話分組元素。與滾動視窗和滑動視窗相比,會話視窗不會重疊, 也沒有固定的開始和結束時間。相反,當會話視窗在一段時間内沒有接收到元素時會關閉, 例如,不活動的間隙時。會話視窗配置設定器配置會話間隙,定義所需的不活動時間長度(defineshow long is the required period of inactivity)。當此時間段到期時,目前會話關閉,後續元素被 配置設定到新的會話視窗。

Flink之window函數詳解

       在這種使用者互動事件流中,我們首先想到的是将事件聚合到會話視窗中(一段使用者持續活躍 的周期),由非活躍的間隙給分隔開。就是需要計算每個使用者在活躍期間總共購買的商品數 量,如果使用者 30 分鐘沒有活動則視為會話斷開(假設 raw data stream 是單個使用者的購買行 為流)。

5、對比

      一般而言,window 是在無限的流上定義了一個有限的元素集合。這個集合可以是基于時間 的,元素個數的,時間和個數結合的,會話間隙的,或者是自定義的。Flink 的 DataStream API 提供了簡潔的算子來滿足常用的視窗操作,同時提供了通用的視窗機制來允許使用者自己 定義視窗配置設定邏輯。

Flink之window函數詳解

6、自定義Window

     得益于 Flink Window API 松耦合設計,我們可以非常靈活地定義符合特定業務的視窗。Flink 中定義一個視窗主要需要以下三個元件。

  1. Window Assigner:用來決定某個元素被配置設定到哪個/哪些視窗中去。
  2. Trigger:觸發器。決定了一個視窗何時能夠被計算或清除,每個視窗都會擁有一個自 己的 Trigger。
  3. Evictor:可以譯為“驅逐者”。在 Trigger 觸發之後,在視窗被處理之前,Evictor(如果 有 Evictor 的話)會用來剔除視窗中不需要的元素,相當于一個 filter。

上述三個元件的不同實作的不同組合,可以定義出非常複雜的視窗。Flink 中内置的視窗也 都是基于這三個元件構成的,當然内置視窗有時候無法解決使用者特殊的需求,是以 Flink 也 暴露了這些視窗機制的内部接口供使用者實作自定義的視窗。

Flink之window函數詳解

繼續閱讀