Flink-Watermark

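I. Introduction

A watermark is a mechanism for measuring the progress of event time; it is a hidden attribute carried with the data itself. Data that is processed by event time normally carries its own timestamp. Watermarks are used to handle out-of-order events, and handling them correctly usually means combining the watermark mechanism with windows.

In stream processing, an event is produced, flows through the source and then reaches an operator, and this takes time. In most cases data reaches the operator in the order in which the events occurred, but network delays or backpressure (a short load peak during which the system receives data much faster than it can process it) can put events out of order (out-of-order, or late, elements).

We cannot wait indefinitely for late elements, though. There has to be a mechanism that guarantees the window is triggered once a certain point is reached. This is where the watermark comes in: when a watermark arrives, all data with timestamps earlier than the watermark is assumed to have arrived (even though delayed data may still show up afterwards).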

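II. Understanding

Consider a stream of timestamped events that, for some reason, do not arrive in order. Each number is the timestamp at which the event occurred: the first event to arrive occurred at time 4, and it is followed by an earlier event (time 2).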

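Point 1

The first element in the stream has time 4, but a sorter cannot emit it straight away as its first output: the data arrives out of order, and earlier data may not have arrived yet. In fact, we can see a little of this stream's future, so we know the sort operator must wait at least until element 2 arrives before emitting a result.

Where there is buffering, there is latency.

Point 2

The application sees the element with time 4 and then the element with time 2. Will anything earlier than 2 arrive? Maybe, maybe not. We could keep waiting, and the application might block forever.

At some point the operator has to stop waiting and emit output.

Point 3

This is the role of watermarks: they define when to stop waiting for earlier data.

Point 4

There are different strategies for generating watermarks.

Every event arrives after some delay, and these delays vary considerably, so some events are delayed more than others. A simple approach is to assume that the delay never exceeds some maximum. Flink calls this strategy bounded-out-of-orderness. There are more sophisticated ways to generate watermarks, but for most applications a fixed delay is good enough.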
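The fixed-delay idea can be sketched in plain Scala with no Flink dependency. `BoundedWatermark`, `observe`, `current` and `watermarks` are hypothetical names used only for illustration. The sketch follows the formula used in the example in section III (largest timestamp seen so far minus a fixed delay) and is not Flink's internal implementation.

```scala
object BoundedWatermarkSketch {
  // Hypothetical helper, not a Flink class: tracks the largest timestamp
  // seen so far and reports a watermark that lags it by a fixed delay.
  final class BoundedWatermark(maxDelay: Long) {
    private var maxTimestamp = Long.MinValue

    def observe(ts: Long): Unit = maxTimestamp = math.max(maxTimestamp, ts)

    // No watermark is available until at least one event has been seen.
    def current: Option[Long] =
      if (maxTimestamp == Long.MinValue) None else Some(maxTimestamp - maxDelay)
  }

  // Watermark value after each arrival, in arrival order.
  def watermarks(arrivals: Seq[Long], maxDelay: Long): Seq[Long] = {
    val gen = new BoundedWatermark(maxDelay)
    arrivals.map { ts => gen.observe(ts); gen.current.get }
  }

  def main(args: Array[String]): Unit =
    println(watermarks(Seq(4L, 2L, 7L, 5L, 9L), maxDelay = 2L))
}
```

Note that the watermark never moves backwards: the out-of-order arrivals 2 and 5 leave it unchanged, because only the maximum timestamp drives it.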

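To build a sorting-style streaming application, you can use Flink's ProcessFunction. It gives access to event-time timers (callbacks triggered by watermarks) and to a managed state interface that can be used to buffer data.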
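The buffering logic behind such a sorter can be sketched in plain Scala. `SortBuffer` is a hypothetical stand-in: in a real job the buffer would presumably live in managed state and the release step would run from an event-time timer callback, but here both are simulated with an in-memory priority queue and an explicit `onWatermark` call.

```scala
import scala.collection.mutable

object SortBufferSketch {
  // Hypothetical in-memory stand-in for "state + event-time timer".
  final class SortBuffer {
    // Reverse ordering turns the max-heap into a min-heap on timestamps.
    private val buffer =
      mutable.PriorityQueue.empty[Long](Ordering[Long].reverse)

    // Buffer an event (represented here by its timestamp only).
    def add(ts: Long): Unit = buffer.enqueue(ts)

    // Release, in timestamp order, every buffered event whose timestamp
    // is at or below the watermark; later events stay buffered.
    def onWatermark(watermark: Long): List[Long] = {
      val out = mutable.ListBuffer.empty[Long]
      while (buffer.nonEmpty && buffer.head <= watermark) out += buffer.dequeue()
      out.toList
    }
  }

  def main(args: Array[String]): Unit = {
    val sorter = new SortBuffer
    Seq(4L, 2L, 7L, 5L).foreach(sorter.add)
    println(sorter.onWatermark(4L))  // events with timestamp <= 4, sorted
    println(sorter.onWatermark(10L)) // the rest, sorted
  }
}
```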

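Point 5

Processing a stream by event time gives more accurate results. Timestamps and watermarks can be obtained in two ways: inside the data stream source, or through a timestamp assigner / watermark generator.

To define them inside the source, use the collectWithTimestamp and emitWatermark methods of the SourceContext interface defined in SourceFunction. The former assigns the event timestamp; the latter emits a watermark.

To define them outside the source, call assignTimestampsAndWatermarks on the DataStream and pass it a timestamp and watermark assigner, which comes in two types:

AssignerWithPeriodicWatermarks defines the getCurrentWatermark method, which returns the current watermark. The period is set with env.getConfig().setAutoWatermarkInterval(1000).

The system checks the progress of event time at this fixed interval.

AssignerWithPunctuatedWatermarks defines the checkAndGetNextWatermark method, which is called after extractTimestamp and receives the timestamp that was just extracted (extractedTimestamp) as a parameter.

It decides, per element, whether to produce a new watermark. Watermarks are not generated periodically; they are updated only in response to events.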
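The difference between the two styles can be illustrated with a plain Scala sketch. `PeriodicGenerator`, `PunctuatedGenerator`, `Event` and the `isMarker` flag are hypothetical names, not Flink types; they only mimic the call pattern described above (a periodic tick versus a per-event decision).

```scala
object WatermarkStylesSketch {
  // Periodic style: events only update internal state; a watermark is
  // read when an external timer ticks (compare getCurrentWatermark).
  final class PeriodicGenerator(maxOutOfOrderness: Long) {
    private var maxTimestamp = Long.MinValue

    def onEvent(ts: Long): Unit = maxTimestamp = math.max(maxTimestamp, ts)

    def onTick(): Option[Long] =
      if (maxTimestamp == Long.MinValue) None
      else Some(maxTimestamp - maxOutOfOrderness)
  }

  // Hypothetical event carrying a flag that marks "safe to advance" points.
  final case class Event(ts: Long, isMarker: Boolean)

  // Punctuated style: each event decides whether a watermark is emitted
  // (compare checkAndGetNextWatermark); there is no timer involved.
  final class PunctuatedGenerator {
    def onEvent(e: Event): Option[Long] = if (e.isMarker) Some(e.ts) else None
  }

  def main(args: Array[String]): Unit = {
    val periodic = new PeriodicGenerator(maxOutOfOrderness = 1L)
    Seq(4L, 2L).foreach(periodic.onEvent)
    println(periodic.onTick())

    val punctuated = new PunctuatedGenerator
    println(Seq(Event(4L, isMarker = false), Event(5L, isMarker = true)).map(punctuated.onEvent))
  }
}
```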

III. Example

Streaming word count:
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.StringUtils

object DataStreamDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Process the stream by event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Set the parallelism
    env.setParallelism(1)
    // Emit a watermark every 9 seconds
    env.getConfig.setAutoWatermarkInterval(9000)
    val line = env.socketTextStream("localhost",9999)
    import org.apache.flink.api.scala._
    val counts  = line
      .filter(f=> !StringUtils.isNullOrWhitespaceOnly(f))
      .map(new LineSplitter)
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Tuple3[String, Long, Integer]](){
        var currentMaxTimestamp = 0L
        // Maximum out-of-orderness (lateness) to tolerate, in the same unit as the timestamps
        val maxOutOfOrderness = 10000L
        // Return the current watermark (called once per watermark interval)
        override def getCurrentWatermark: Watermark = {
          val tmpTimestamp = currentMaxTimestamp - maxOutOfOrderness
          println(s"wall clock is ${System.currentTimeMillis()}  new watermark ${tmpTimestamp}")
          new Watermark(tmpTimestamp)
        }
        // Extract the event time; the value is used as-is, so Flink reads it as milliseconds
        override def extractTimestamp(element: (String, Long, Integer), previousElementTimestamp: Long): Long = {
          val timestamp  = element._2
          currentMaxTimestamp = Math.max(timestamp,currentMaxTimestamp)
          println(s"get timestamp is $timestamp currentMaxTimestamp $currentMaxTimestamp")
          timestamp
        }
      }).keyBy(0)
      // Tumbling event-time window of 20 seconds (20000 ms)
      .timeWindow(Time.seconds(20))
      .sum(2)

    counts.print()
    env.execute("WordCount")
  }

}
// Build a (word, event time, count) element from each input line, with the count set to 1
class LineSplitter extends MapFunction[String,Tuple3[String, Long, Integer]]{
  override def map(value: String): (String, Long, Integer) = {
    val arrays = value.toLowerCase.split("\\W+")
    new Tuple3[String, Long, Integer](arrays(0), arrays(1).toLong, 1)
  }
}
           

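The value of maxOutOfOrderness is usually chosen from experience.

If maxOutOfOrderness is too small while the data is heavily out of order or late (because of the network, for example), windows fire before most of their data has arrived: many windows end up firing with only a single record in them, and the correctness of the results suffers badly. If it is too large, the watermark lags far behind the data and is of little use: all the data for a window may have arrived within a short time, yet the window cannot fire until the watermark slowly catches up.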
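This trade-off can be explored with a small plain Scala simulation. It is a simplified model under stated assumptions, not the job above: the watermark is recomputed after every event rather than on a 9-second timer, windows are tumbling and aligned to multiples of the window size, and allowed lateness is 0. `windowEnd` and `countDropped` are hypothetical helper names.

```scala
object LatenessTradeoffSketch {
  // End (exclusive) of the tumbling window containing ts; assumes ts >= 0
  // and windows aligned to multiples of `size`.
  def windowEnd(ts: Long, size: Long): Long = ts - (ts % size) + size

  // Counts events that arrive after the watermark has already passed the
  // last timestamp of their window (windowEnd - 1), i.e. events that a
  // window with zero allowed lateness would no longer accept.
  def countDropped(arrivals: Seq[Long], size: Long, maxOutOfOrderness: Long): Int = {
    var maxTs = Long.MinValue
    var dropped = 0
    for (ts <- arrivals) {
      // The watermark in effect is derived from the events seen before this one.
      val late = maxTs != Long.MinValue &&
        windowEnd(ts, size) - 1 <= maxTs - maxOutOfOrderness
      if (late) dropped += 1
      maxTs = math.max(maxTs, ts)
    }
    dropped
  }

  def main(args: Array[String]): Unit = {
    // Timestamps from the example input in this section, in arrival order.
    val arrivals = Seq(
      1601365080L, 1601465080L, 1601365080L, 1601365080L, 1601365080L,
      1601466080L, 1601466080L, 1601467080L, 1601468080L, 1601469080L,
      1601479080L, 1601489080L, 1601589080L)
    println(countDropped(arrivals, size = 20000L, maxOutOfOrderness = 10000L))
    println(countDropped(arrivals, size = 20000L, maxOutOfOrderness = 200000L))
  }
}
```

A larger bound keeps more late records, at the price of every window waiting longer before it can fire.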

Input
aa 1601365080
bb 1601465080
aa 1601365080
aa 1601365080
aa 1601365080
aa 1601466080
bb 1601466080
bb 1601467080
bb 1601468080
ee 1601469080
ee 1601479080
aa 1601489080
cc 1601589080
           
Server output
get timestamp is 1601365080 currentMaxTimestamp 1601365080
wall clock is 1601365690607  new watermark 1601355080
wall clock is 1601365699610  new watermark 1601355080
get timestamp is 1601465080 currentMaxTimestamp 1601465080
wall clock is 1601365708612  new watermark 1601455080
(aa,1601365080,1)
wall clock is 1601365717615  new watermark 1601455080
wall clock is 1601365726616  new watermark 1601455080
wall clock is 1601365735619  new watermark 1601455080
wall clock is 1601365744621  new watermark 1601455080
wall clock is 1601365753624  new watermark 1601455080
wall clock is 1601365762627  new watermark 1601455080
wall clock is 1601365771629  new watermark 1601455080
wall clock is 1601365780631  new watermark 1601455080
wall clock is 1601365789634  new watermark 1601455080
wall clock is 1601365798636  new watermark 1601455080
wall clock is 1601365807638  new watermark 1601455080
wall clock is 1601365816641  new watermark 1601455080
wall clock is 1601365825643  new watermark 1601455080
wall clock is 1601365834645  new watermark 1601455080
wall clock is 1601365843647  new watermark 1601455080
wall clock is 1601365852648  new watermark 1601455080
wall clock is 1601365861650  new watermark 1601455080
get timestamp is 1601365080 currentMaxTimestamp 1601465080
wall clock is 1601365870652  new watermark 1601455080
wall clock is 1601365879654  new watermark 1601455080
wall clock is 1601365888657  new watermark 1601455080
wall clock is 1601365897659  new watermark 1601455080
get timestamp is 1601365080 currentMaxTimestamp 1601465080
get timestamp is 1601365080 currentMaxTimestamp 1601465080
wall clock is 1601365906661  new watermark 1601455080
wall clock is 1601365915662  new watermark 1601455080
wall clock is 1601365924664  new watermark 1601455080
wall clock is 1601365933667  new watermark 1601455080
wall clock is 1601365942669  new watermark 1601455080
wall clock is 1601365951671  new watermark 1601455080
wall clock is 1601365960674  new watermark 1601455080
get timestamp is 1601466080 currentMaxTimestamp 1601466080
wall clock is 1601365969676  new watermark 1601456080
wall clock is 1601365978678  new watermark 1601456080
wall clock is 1601365987679  new watermark 1601456080
wall clock is 1601365996682  new watermark 1601456080
wall clock is 1601366005685  new watermark 1601456080
get timestamp is 1601466080 currentMaxTimestamp 1601466080
wall clock is 1601366014687  new watermark 1601456080
get timestamp is 1601467080 currentMaxTimestamp 1601467080
wall clock is 1601366023690  new watermark 1601457080
wall clock is 1601366032692  new watermark 1601457080
wall clock is 1601366041693  new watermark 1601457080
get timestamp is 1601468080 currentMaxTimestamp 1601468080
wall clock is 1601366050695  new watermark 1601458080
get timestamp is 1601469080 currentMaxTimestamp 1601469080
wall clock is 1601366059696  new watermark 1601459080
wall clock is 1601366068698  new watermark 1601459080
wall clock is 1601366077701  new watermark 1601459080
wall clock is 1601366086703  new watermark 1601459080
wall clock is 1601366095704  new watermark 1601459080
wall clock is 1601366104707  new watermark 1601459080
wall clock is 1601366113710  new watermark 1601459080
wall clock is 1601366122712  new watermark 1601459080
wall clock is 1601366131714  new watermark 1601459080
wall clock is 1601366140717  new watermark 1601459080
wall clock is 1601366149720  new watermark 1601459080
wall clock is 1601366158722  new watermark 1601459080
wall clock is 1601366167724  new watermark 1601459080
get timestamp is 1601479080 currentMaxTimestamp 1601479080
wall clock is 1601366176727  new watermark 1601469080
wall clock is 1601366185729  new watermark 1601469080
wall clock is 1601366194732  new watermark 1601469080
get timestamp is 1601489080 currentMaxTimestamp 1601489080
wall clock is 1601366203733  new watermark 1601479080
wall clock is 1601366212735  new watermark 1601479080
wall clock is 1601366221736  new watermark 1601479080
wall clock is 1601366230739  new watermark 1601479080
wall clock is 1601366239741  new watermark 1601479080
wall clock is 1601366248744  new watermark 1601479080
wall clock is 1601366257746  new watermark 1601479080
wall clock is 1601366266749  new watermark 1601479080
wall clock is 1601366275750  new watermark 1601479080
wall clock is 1601366284752  new watermark 1601479080
wall clock is 1601366293754  new watermark 1601479080
wall clock is 1601366302755  new watermark 1601479080
wall clock is 1601366311757  new watermark 1601479080
wall clock is 1601366320759  new watermark 1601479080
wall clock is 1601366329760  new watermark 1601479080
wall clock is 1601366338763  new watermark 1601479080
wall clock is 1601366347765  new watermark 1601479080
get timestamp is 1601589080 currentMaxTimestamp 1601589080
wall clock is 1601366356768  new watermark 1601579080
(bb,1601465080,4)
(aa,1601466080,1)
(ee,1601469080,2)
(aa,1601489080,1)
wall clock is 1601366365771  new watermark 1601579080
wall clock is 1601366374773  new watermark 1601579080
           

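With the job listening on port 9999, first enter aa 1601365080. The job prints (get timestamp is 1601365080 currentMaxTimestamp 1601365080), and every 9 seconds it prints a watermark line such as (wall clock is 1601365690607 new watermark 1601355080).

Next, enter bb 1601465080. The watermark becomes 1601455080, which is past the end of the window that contains aa. Tumbling windows are aligned to multiples of the window size and the job reads the timestamps as milliseconds, so that window is [1601360000, 1601380000) rather than 1601365080 + 20s. The window therefore fires and prints (aa,1601365080,1). A window fires when both of the following hold:

  • The watermark has passed the end time of the window.
  • The window contains data.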
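The two conditions can be checked by hand with a plain Scala sketch. `Window`, `windowFor` and `shouldFire` are hypothetical names; the sketch assumes tumbling windows aligned to multiples of the window size with no offset, and treats a window as complete once the watermark reaches its last timestamp (end - 1).

```scala
object WindowFiringSketch {
  // Half-open window [start, end); maxTimestamp is the last timestamp inside it.
  final case class Window(start: Long, end: Long) {
    def maxTimestamp: Long = end - 1
  }

  // Tumbling window containing ts; assumes ts >= 0 and no window offset.
  def windowFor(ts: Long, size: Long): Window = {
    val start = ts - (ts % size)
    Window(start, start + size)
  }

  // Both conditions from the list above: the watermark has passed the
  // window, and the window actually holds data.
  def shouldFire(w: Window, watermark: Long, elementCount: Int): Boolean =
    elementCount > 0 && watermark >= w.maxTimestamp

  def main(args: Array[String]): Unit = {
    val w = windowFor(1601365080L, size = 20000L)
    println(w)
    println(shouldFire(w, watermark = 1601355080L, elementCount = 1)) // before bb arrives
    println(shouldFire(w, watermark = 1601455080L, elementCount = 1)) // after bb arrives
  }
}
```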

Then enter the remaining lines:

aa 1601365080
aa 1601365080
aa 1601365080
aa 1601466080
bb 1601466080
bb 1601467080
bb 1601468080
ee 1601469080
ee 1601479080
aa 1601489080
cc 1601589080
           

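The final input, cc 1601589080, advances the watermark to 1601579080, which fires the remaining windows and prints the results for bb, aa and ee. The three later aa 1601365080 records are never counted: their window had already fired when they arrived, and because this job does not set allowedLateness (it defaults to 0), they were dropped.

When allowedLateness is set, a window's contents are not discarded as soon as the window fires; they are kept for a while longer. While the watermark is below end_time + allowedLateness, late elements that belong to the window are added to it and trigger the computation again. Once watermark >= end_time + allowedLateness, any further elements for that window are dropped. The system cannot wait indefinitely, since that would grow the window buffer and degrade performance unnecessarily.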
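The three cases can be written down as a small plain Scala sketch. `Outcome` and `classify` are hypothetical names. As an assumption, the comparison is made against the last timestamp inside the window (end - 1), so it differs from the end_time formula above by one millisecond.

```scala
object AllowedLatenessSketch {
  sealed trait Outcome
  case object OnTime extends Outcome          // window has not fired yet
  case object LateButAccepted extends Outcome // window fired, still retained
  case object Dropped extends Outcome         // window already cleaned up

  // Classifies an element arriving for a window that ends at windowEnd,
  // given the watermark at the moment of arrival.
  def classify(windowEnd: Long, allowedLateness: Long, watermark: Long): Outcome = {
    val maxTimestamp = windowEnd - 1
    if (watermark < maxTimestamp) OnTime
    else if (watermark < maxTimestamp + allowedLateness) LateButAccepted
    else Dropped
  }

  def main(args: Array[String]): Unit = {
    // The late aa 1601365080 records belong to the window ending at 1601380000
    // and arrive when the watermark is already 1601455080.
    println(classify(1601380000L, allowedLateness = 0L, watermark = 1601455080L))
    println(classify(1601380000L, allowedLateness = 100000L, watermark = 1601455080L))
  }
}
```

With an allowed lateness of 100000 ms the same records would have been merged into their window instead of being dropped, at the cost of keeping that window's state around for longer.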

References

http://wuchong.me/blog/2018/11/18/flink-tips-watermarks-in-apache-flink-made-easy/