天天看點

Flink 之Window介紹與使用一、視窗概述二、視窗類型三、視窗使用API

一、視窗概述

流式計算是一種被設計用于處理無限資料集的資料處理引擎,而無限資料集是指一種不斷增長的本質上無限的資料集,而Window視窗是一種切割無限資料為有限塊進行處理的手段。

Window是無限資料流處理的核心,Window将一個無限的stream拆分成有限大小的”buckets”桶,我們可以在這些桶上做計算操作。

Flink 之Window介紹與使用一、視窗概述二、視窗類型三、視窗使用API

二、視窗類型

Window可以分成兩類:

1、時間視窗(TimeWindow):按照時間生成Window,根據視窗實作原理可以分成三類:

  1. 滾動視窗(Tumbling Window)

    将資料依據固定的視窗長度對資料進行切片。滾動視窗配置設定器将每個元素配置設定到一個指 定視窗大小的視窗中,滾動視窗有一個固定的大小,并且不會出現重疊。

    Flink 之Window介紹與使用一、視窗概述二、視窗類型三、視窗使用API
    适用場景:适合做BI統計等(做每個時間段的聚合計算)
env.setParallelism(1)
val fileDS = env.socketTextStream("localhost", 9999)
val wordDS = fileDS.flatMap(_.split(" "))
val word2OneDS = wordDS.map((_,1))
val wordKS = word2OneDS.keyBy(_._1)
val wordWS = wordKS.timeWindow(Time.seconds(3))
val sumDS = wordWS.sum(1)
sumDS.print("window>>>")
env.execute()
           
  1. 滑動視窗(Sliding Window)

    滑動視窗是固定視窗的更廣義的一種形式,滑動視窗由固定的視窗長度和滑動間隔組 成。滑動視窗配置設定器将元素配置設定到固定長度的視窗中,與滾動視窗類似,視窗的大小由 視窗大小參數來配置,另一個視窗滑動參數控制滑動視窗開始的頻率。是以,滑動視窗 如果滑動參數小于視窗大小的話,視窗是可以重疊的,在這種情況下元素會被配置設定到多 個視窗中。

    Flink 之Window介紹與使用一、視窗概述二、視窗類型三、視窗使用API
    适用場景:對最近一個時間段内的統計, 比如求最近1小時内每5分鐘的水位變化
env.setParallelism(1)
val fileDS = env.socketTextStream("localhost", 9999)
val wordDS = fileDS.flatMap(_.split(" "))
val word2OneDS = wordDS.map((_,1))
val wordKS = word2OneDS.keyBy(_._1)
val wordWS = wordKS.timeWindow(Time.seconds(3), Time.seconds(1))
val sumDS = wordWS.sum(1)
sumDS.print("window>>>")
env.execute()
           
  1. 會話視窗(Session Window)

    由一系列事件組合一個指定時間長度的timeout間隙組成,類似于web應用的session, 也就是一段時間沒有接收到新資料就會生成新的視窗。

    Flink 之Window介紹與使用一、視窗概述二、視窗類型三、視窗使用API
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val fileDS = env.socketTextStream("localhost", 9999)
val wordDS = fileDS.flatMap(_.split(" "))
val word2OneDS = wordDS.map((_,1))
val wordKS = word2OneDS.keyBy(_._1)
val wordWS = wordKS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)))
val sumDS = wordWS.sum(1)
sumDS.print("window>>>")
env.execute()
           

2、計數視窗(CountWindow):按照指定的資料條數生成一個Window,與時間無關。根據視窗實作原理可以分成兩類:

  1. 滾動視窗

    預設的CountWindow是一個滾動視窗,隻需要指定視窗大小即可,當元素數量達到窗 口大小時,就會觸發視窗的執行。

val dataDS: DataStream[String] = env.socketTextStream("hadoop01", 9999)

val mapDS = dataDS.map(data=>{
    val datas = data.split(",")
    (datas(0),1)
})

val reduceDS = mapDS.keyBy(_._1)
    .countWindow(3).reduce(
    (t1, t2) => {
        (t1._1, t1._2 + t2._2)
    }
)

reduceDS.print()
           
  1. 滑動視窗

    滑動視窗和滾動視窗的函數名是完全一緻的,隻是在傳參數時需要傳入兩個參數,一個 是window_size,一個是sliding_size。下面代碼中的sliding_size設定為了2,也就是說, 每收到兩個相同key的資料就計算一次,每一次計算的window範圍是3個元素。

val dataDS = env.socketTextStream("hadoop02", 9999)

val mapDS = dataDS.map(data=>{
    val datas = data.split(",")
    (datas(0),datas(2).toInt)
})

val reduceDS = mapDS.keyBy(_._1)
    .countWindow(3,2).reduce(
    (t1, t2) => {
        (t1._1, t1._2 + t2._2)
    }
)

reduceDS.print()
           

三、視窗使用API

window function 定義了要對視窗中收集資料後所做的計算操作,主要可以分為兩類:

1、增量聚合函數(incremental aggregation functions)

每條資料到來就進行計算,保持一個簡單的狀态。典型的增量聚合函數有:

1)ReduceFunction

val dataDS: DataStream[String] =
 env.socketTextStream("hadoop02", 9999)

val mapDS = dataDS.map(data=>{
    val datas = data.split(",")
    (datas(0),datas(2).toInt)
})

val reduceDS: DataStream[(String, Int)] = mapDS.keyBy(_._1)
    .timeWindow(Time.seconds(3)).reduce(
        new ReduceFunction[(String, Int)] {
            override def reduce(t: (String, Int), t1: (String, Int)): (String, Int) = {
                (t._1, t._2 + t1._2)
            }
        }
    )
reduceDS.print()
           

2)AggregateFunction

val dataDS: DataStream[String] =
 env.socketTextStream("hadoop02", 9999)

val mapDS = dataDS.map(data=>{
    val datas = data.split(",")
    (datas(0),datas(2).toInt)
})

val aggregateDS: DataStream[(String, Int)] = mapDS.keyBy(_._1)
    .countWindow(3).aggregate(
        // TODO 此處聚合函數類似于Spark中的累加器
        new AggregateFunction[(String, Int), (Int, Int), (String, Int)] {
            override def createAccumulator(): (Int, Int) = {
                (0,0)
            }

            override def add(in: (String, Int), acc: (Int, Int)): (Int, Int) = {
                (in._2 + acc._1, acc._2 + 1)
            }

            override def getResult(acc: (Int, Int)): (String, Int) = {
                ("sensor", (acc._1 / acc._2))
            }

            override def merge(acc: (Int, Int), acc1: (Int, Int)): (Int, Int) = {
                (acc._1 + acc1._1, acc._2 + acc1._2)
            }
        }
    )

aggregateDS.print()
           

2、全視窗函數(full window functions)

先把視窗所有資料收集起來,等到計算的時候會周遊所有資料。

ProcessWindowFunction

就是一個對整個視窗中資料處理的函數。

val dataDS: DataStream[String] =
 env.socketTextStream("hadoop02", 9999)

val mapDS = dataDS.map(data=>{
    val datas = data.split(",")
    (datas(0),datas(2).toInt)
})

val processDS: DataStream[String] = mapDS.keyBy(_._1)
    .countWindow(3)
    .process(new ProcessWindowFunction[(String, Int), String, String, GlobalWindow] {
        override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[String]): Unit = {
            //println(elements.mkString(","))
            out.collect(elements.mkString(","))
        }
    })
processDS.print()
           

3、其它可選API

  • .trigger() —— 觸發器:定義 window 什麼時候關閉,觸發計算并輸出結果
  • .evitor() —— 移除器:定義移除某些資料的邏輯
  • .allowedLateness() —— 允許處理遲到的資料
  • .sideOutputLateData()—— 将遲到的資料放入側輸出流
  • .getSideOutput() —— 擷取側輸出流

繼續閱讀