天天看點

無限滾動執行個體_Flink1.10入門:Watermark機制及執行個體講解

無限滾動執行個體_Flink1.10入門:Watermark機制及執行個體講解

一、Watermark簡介

    Watermark是一種衡量Event Time進展的機制,它是資料本身的一個隐藏屬性。通常基于Event Time的資料,自身都包含一個timestamp.watermark是用于處理亂序事件的,而正确的處理亂序事件,通常用watermark機制結合window來實作。簡單來說,我們可以把他了解為一個水位線,這個Watermarks在不斷的變化,一旦Watermarks大于了某個window的end_time,就會觸發此window的計算,Watermarks就是用來觸發window計算的

    在實際的生産中,由于業務系統背壓或網絡延遲導緻事件的建立時間和處理時間不一緻,導緻流處理的結果跟實際結果有較大的差異。但是對于延遲資料,我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間後,必須觸發window去進行計算了。這個特别的機制,就是watermark。其作用定義一個最大亂序時間,舉個例子,比如某條消息記錄時間為2020-04-06 10:00:10,如果亂序最大允許時間為10s,那麼就認為2020-04-06 10:00:00之前産生的所有消息記錄都到齊了,可以進行計算。

二、Watermark生成方式

    一般情況下,Flink在接收到Source資料後,應該立即生成Watermark,但是我們可以在經過了簡單的operator後再生成Watermark,需要注意的是,如果多次生成了Watermark,後面的會覆寫前面的;生成watermark的方式主要有2大類:

1).Periodic - 一定時間間隔或者達到一定的記錄條數會産生一個watermark,通常這種用的比較多。

2).Punctuated – 基于event time通過一定的邏輯産生watermark,比如收到一個資料就産生一個WaterMark,時間是event time + 10秒。

    以上兩種産生方式,都有機制來保證産生的watermark是單調遞增的。即使有了watermark,如果現實中,資料沒有滿足watermark所保證的條件怎麼辦?比如Flink處理了08:01的watermark,但是之後遇到了event time是07:00~08:00之間的資料怎麼辦?首先如果這種事情出現的機率非常小,不影響所要求的準确度,可以直接把資料丢棄;如果這種事情出現的機率比較大,就要調整産生water mark的機制了。

    除了把違反watermark機制的資料丢棄,也有不丢棄的處理方法,比如通過一些機制來更新之前統計的結果,這種方式會有一定的性能開銷。

三、Watermark代碼執行個體

    下面我們通過一個執行個體來示範下Periodic Watermark,我們從socket接收資料,然後講過簡單的operator之後,立刻抽取timetamp并生成Watermark,之後應用window來看看watermark和event time如何變化,才導緻window被觸發計算的。

    先把主函數代碼貼一下,仔細看代碼中的注釋:

package com.hadoop.ljs.flink110.window;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;import org.apache.flink.streaming.api.watermark.Watermark;import org.apache.flink.streaming.api.windowing.time.Time;import javax.annotation.Nullable;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-04-06 15:54 * @version: v1.0 * @description: com.hadoop.ljs.flink110.window * 在實作滾動視窗 設定EventTime為時間處理标準,統計每個視窗單詞出現次數 視窗時間是30秒,消息的最大延遲時間是5秒 */public class TumblingWindowWatermarkWordCount {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();        /*設定使用EventTime作為Flink的時間處理标準,不指定預設是ProcessTime*/        senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        //這裡為了便于了解,設定并行度為1,預設并行度是目前機器的cpu數量        senv.setParallelism(1);        /*指定資料源 從socket的9000端口接收資料,先進行了不合法資料的過濾*/        DataStream sourceDS = senv.socketTextStream("localhost", 9000)            .filter(new FilterFunction() {                @Override                public boolean filter(String line) throws Exception {                    if(null==line||"".equals(line)) {                        return false;                    }                    String[] lines = line.split(",");                    if(lines.length!=2){                        return false;                    }                    return true;                }            });        /*做了一個簡單的map轉換,将資料轉換成Tuple2格式,第一個字段代表是時間 第二個字段代表的是單詞,第三個字段固定值出現了1次*/        DataStream> wordDS = sourceDS.map(new MapFunction>() {            @Override            public Tuple3map(String line) throws Exception {                String[] lines = line.split(",");                return new Tuple3(Long.valueOf(lines[0]), lines[1],1);            }        });        /*設定Watermark的生成方式為Periodic Watermark,并實作他的兩個函數getCurrentWatermark和extractTimestamp*/        DataStream> wordCount = wordDS.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks>() {            private Long currentMaxTimestamp = 0L;            /*最大允許的消息延遲是5秒*/            private final Long maxOutOfOrderness = 5000L;            @Nullable            @Override            public Watermark getCurrentWatermark() {                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);            }            @Override            public long extractTimestamp(Tuple3 element, long previousElementTimestamp) {                long timestamp = element.f0;                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);                return timestamp;            }        /*這裡根據第二個元素  單詞進行統計 時間視窗是30秒  最大延時是5秒,統計每個視窗單詞出現的次數*/        }).keyBy(1)        /*時間視窗是30秒*/        .timeWindow(Time.seconds(30))        .sum(2);        wordCount.print("\n單詞統計:");        senv.execute("Window WordCount");    }}
           

下面我簡單示範下計算步驟:

  1.這裡時間視窗是30秒,延遲時間是5秒,我這裡先後輸入一下資料,第一個字段代表是EventTime,第二個是出現的單詞,

10000,a20000,b29999,c30000,a34998,a
           

   這裡時間視窗是30秒,Flink的時間視窗是左閉右開的[0,30000),如果這裡我們設定消息延遲是0秒,輸入29999,c 就應該觸發,視窗計算, 可是這裡我們設計的最大延遲是5秒,那什麼時候觸發第一次視窗計算呢?應該是29999+5000=34999,如下圖所示,輸入34998,a沒有觸發計算,跟我們的推測是一緻的,如下圖所示:

無限滾動執行個體_Flink1.10入門:Watermark機制及執行個體講解

2.接下來我們繼續輸入,34999,b應該會第一次觸發視窗計算進行資料的輸出,至此我們應該輸入了6行資料,那輸出的計算結果是什麼呢?這裡應該是輸出[0,30000)時間段之間的資料,如下圖所示:

無限滾動執行個體_Flink1.10入門:Watermark機制及執行個體講解

3.接下來我們還要驗證兩件事,一是第二次視窗觸發時間,按照這樣計算,那下次觸發計算的的時間應該是30000+29999+5000=64999,另一個是:第一次視窗觸發計算完成後,又來了一條25000,a資料,該如何處理呢,由于觸發了計算之後watermark應該更新成了30000,比他小的資料會被丢棄,我們驗證下:

    輸入了25000,a和64998,c沒有觸發計算,如圖所示:

無限滾動執行個體_Flink1.10入門:Watermark機制及執行個體講解

    繼續輸入64999,b,觸發第二次視窗計算,在[3000,59999]視窗,截圖寫錯了,應該都是閉區間,這裡最30000,a和34998,a兩次出現a進行了單詞求和結果為2,對于b隻在34999,b出現了一次,是以求和為1,而在觸發第一次視窗計算之後,出現了資料25000,a,這裡由于超過了最大的延遲5秒,Flink直接丢棄資料不做處理,如圖所示:

無限滾動執行個體_Flink1.10入門:Watermark機制及執行個體講解

    至此,Watermark機制相關知識講解完畢,大家可通過我的代碼執行個體,自己去示範一遍,不然不太好了解。感謝關注!!

   如果覺得我的文章能幫到您,請關注微信公衆号“大資料開發運維架構”,并轉發朋友圈,謝謝支援!!!

無限滾動執行個體_Flink1.10入門:Watermark機制及執行個體講解