一、視窗概述
流式計算是一種被設計用于處理無限資料集的資料處理引擎,而無限資料集是指一種不斷增長的本質上無限的資料集,而Window視窗是一種切割無限資料為有限塊進行處理的手段。
Window是無限資料流處理的核心,Window将一個無限的stream拆分成有限大小的”buckets”桶,我們可以在這些桶上做計算操作。
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIyVGduV2YfNWawNyZuBnL1ETN4ITMwkTMwITOwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
二、視窗類型
Window可以分成兩類:
1、時間視窗(TimeWindow):按照時間生成Window,根據視窗實作原理可以分成三類:
-
滾動視窗(Tumbling Window)
将資料依據固定的視窗長度對資料進行切片。滾動視窗配置設定器将每個元素配置設定到一個指 定視窗大小的視窗中,滾動視窗有一個固定的大小,并且不會出現重疊。
适用場景:适合做BI統計等(做每個時間段的聚合計算)Flink 之Window介紹與使用一、視窗概述二、視窗類型三、視窗使用API
env.setParallelism(1)
val fileDS = env.socketTextStream("localhost", 9999)
val wordDS = fileDS.flatMap(_.split(" "))
val word2OneDS = wordDS.map((_,1))
val wordKS = word2OneDS.keyBy(_._1)
val wordWS = wordKS.timeWindow(Time.seconds(3))
val sumDS = wordWS.sum(1)
sumDS.print("window>>>")
env.execute()
-
滑動視窗(Sliding Window)
滑動視窗是固定視窗的更廣義的一種形式,滑動視窗由固定的視窗長度和滑動間隔組 成。滑動視窗配置設定器将元素配置設定到固定長度的視窗中,與滾動視窗類似,視窗的大小由 視窗大小參數來配置,另一個視窗滑動參數控制滑動視窗開始的頻率。是以,滑動視窗 如果滑動參數小于視窗大小的話,視窗是可以重疊的,在這種情況下元素會被配置設定到多 個視窗中。
适用場景:對最近一個時間段内的統計, 比如求最近1小時内每5分鐘的水位變化Flink 之Window介紹與使用一、視窗概述二、視窗類型三、視窗使用API
env.setParallelism(1)
val fileDS = env.socketTextStream("localhost", 9999)
val wordDS = fileDS.flatMap(_.split(" "))
val word2OneDS = wordDS.map((_,1))
val wordKS = word2OneDS.keyBy(_._1)
val wordWS = wordKS.timeWindow(Time.seconds(3), Time.seconds(1))
val sumDS = wordWS.sum(1)
sumDS.print("window>>>")
env.execute()
-
會話視窗(Session Window)
由一系列事件組合一個指定時間長度的timeout間隙組成,類似于web應用的session, 也就是一段時間沒有接收到新資料就會生成新的視窗。
Flink 之Window介紹與使用一、視窗概述二、視窗類型三、視窗使用API
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val fileDS = env.socketTextStream("localhost", 9999)
val wordDS = fileDS.flatMap(_.split(" "))
val word2OneDS = wordDS.map((_,1))
val wordKS = word2OneDS.keyBy(_._1)
val wordWS = wordKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)))
val sumDS = wordWS.sum(1)
sumDS.print("window>>>")
env.execute()
2、計數視窗(CountWindow):按照指定的資料條數生成一個Window,與時間無關。根據視窗實作原理可以分成兩類:
-
滾動視窗
預設的CountWindow是一個滾動視窗,隻需要指定視窗大小即可,當元素數量達到窗 口大小時,就會觸發視窗的執行。
val dataDS: DataStream[String] = env.socketTextStream("hadoop01", 9999)
val mapDS = dataDS.map(data=>{
val datas = data.split(",")
(datas(0),1)
})
val reduceDS = mapDS.keyBy(_._1)
.countWindow(3).reduce(
(t1, t2) => {
(t1._1, t1._2 + t2._2)
}
)
reduceDS.print()
-
滑動視窗
滑動視窗和滾動視窗的函數名是完全一緻的,隻是在傳參數時需要傳入兩個參數,一個 是window_size,一個是sliding_size。下面代碼中的sliding_size設定為了2,也就是說, 每收到兩個相同key的資料就計算一次,每一次計算的window範圍是3個元素。
val dataDS = env.socketTextStream("hadoop02", 9999)
val mapDS = dataDS.map(data=>{
val datas = data.split(",")
(datas(0),datas(2).toInt)
})
val reduceDS = mapDS.keyBy(_._1)
.countWindow(3,2).reduce(
(t1, t2) => {
(t1._1, t1._2 + t2._2)
}
)
reduceDS.print()
三、視窗使用API
window function 定義了要對視窗中收集資料後所做的計算操作,主要可以分為兩類:
1、增量聚合函數(incremental aggregation functions)
每條資料到來就進行計算,保持一個簡單的狀态。典型的增量聚合函數有:
1)ReduceFunction
val dataDS: DataStream[String] =
env.socketTextStream("hadoop02", 9999)
val mapDS = dataDS.map(data=>{
val datas = data.split(",")
(datas(0),datas(2).toInt)
})
val reduceDS: DataStream[(String, Int)] = mapDS.keyBy(_._1)
.timeWindow(Time.seconds(3)).reduce(
new ReduceFunction[(String, Int)] {
override def reduce(t: (String, Int), t1: (String, Int)): (String, Int) = {
(t._1, t._2 + t1._2)
}
}
)
reduceDS.print()
2)AggregateFunction
val dataDS: DataStream[String] =
env.socketTextStream("hadoop02", 9999)
val mapDS = dataDS.map(data=>{
val datas = data.split(",")
(datas(0),datas(2).toInt)
})
val aggregateDS: DataStream[(String, Int)] = mapDS.keyBy(_._1)
.countWindow(3).aggregate(
// TODO 此處聚合函數類似于Spark中的累加器
new AggregateFunction[(String, Int), (Int, Int), (String, Int)] {
override def createAccumulator(): (Int, Int) = {
(0,0)
}
override def add(in: (String, Int), acc: (Int, Int)): (Int, Int) = {
(in._2 + acc._1, acc._2 + 1)
}
override def getResult(acc: (Int, Int)): (String, Int) = {
("sensor", (acc._1 / acc._2))
}
override def merge(acc: (Int, Int), acc1: (Int, Int)): (Int, Int) = {
(acc._1 + acc1._1, acc._2 + acc1._2)
}
}
)
aggregateDS.print()
2、全視窗函數(full window functions)
先把視窗所有資料收集起來,等到計算的時候會周遊所有資料。
ProcessWindowFunction
就是一個對整個視窗中資料處理的函數。
val dataDS: DataStream[String] =
env.socketTextStream("hadoop02", 9999)
val mapDS = dataDS.map(data=>{
val datas = data.split(",")
(datas(0),datas(2).toInt)
})
val processDS: DataStream[String] = mapDS.keyBy(_._1)
.countWindow(3)
.process(new ProcessWindowFunction[(String, Int), String, String, GlobalWindow] {
override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[String]): Unit = {
//println(elements.mkString(","))
out.collect(elements.mkString(","))
}
})
processDS.print()
3、其它可選API
- .trigger() —— 觸發器:定義 window 什麼時候關閉,觸發計算并輸出結果
- .evitor() —— 移除器:定義移除某些資料的邏輯
- .allowedLateness() —— 允許處理遲到的資料
- .sideOutputLateData()—— 将遲到的資料放入側輸出流
- .getSideOutput() —— 擷取側輸出流