天天看點

Flink——WaterMark

WaterMark(水位線):

  對由于網絡、分布式等問題造成的亂序資料,事件不是嚴格地按照事件的 EventTime 順序排列。一旦出現亂序,隻根據 EventTime 決定 window 運作,不能明確定證資料是否到位。WaterMark 就是保證一個特定的事件後,必須出發 window 進行計算。

  可以将 WaterMark 了解成一個延時觸發機制,設定 WaterMark 的延時時長為 t,每次系統都會校驗已經到達的資料中最大的 maxEventTime, 然後認定 EventTime 小于 maxEventTime - t 的所有資料到達,如果有視窗的停止時間等于 maxEventTime - t ,那麼視窗觸發執行。

WaterMark的引入:

// 引入時間
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//配置 WaterMark
dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SenorReading>(Time.seconds(1)) {
    @Override
    public long extractTimestamp(SenorReading sensor) {
        return sensor.getTimestamp();
    }
});      

在1.12版本後,建議使用 WatermarkStrategy

dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.<SensorReaing>forBoundedOutOfOrderness(Duration.ofSeconds(1)));      

除了上述配置設定時間戳,Flink 還可以通過實作 TimestampAssigner 接口自定義地從事件中抽取時間戳。

  TimestampAssigner 有兩個子接口:

    AssignerWithPeriodicWaterMarks:顧名思義,周期性地生成 WaterMark(系統會周期性地将watermark插入到流中),預設周期200毫秒。Flink 會定期調用 AssignerWithPeriodicWaterMarks 的 getCurrentTimestamp 方法,隻有方法傳回一個大于之前水位的時間戳,新的 watermark 就會插入流中。保證 水位線 單調遞增。

// 設定 每隔5s 生成一個 WaterMark
env.getConfig().setAutoWatermarkInterval(5000);      
1     public static class MyPunctuatedAssigner implements AssignerWithPunctuatedWatermarks<SensorReaing> {
 2 
 3         private static final Long bound = 60 * 1000L;
 4 
 5         @org.jetbrains.annotations.Nullable
 6         @Override
 7         public Watermark checkAndGetNextWatermark(SensorReaing sensorReaing, long l) {
 8             if (sensorReaing.getId().equals("sensor_1")){
 9                 return new Watermark(l - bound);
10             }
11             return null;
12         }
13 
14         @Override
15         public long extractTimestamp(SensorReaing sensorReaing, long l) {
16             return sensorReaing.getTimestamp() * 1000L;
17         }
18     }