視窗
在Flink中資料是從開始一直流動的,隻有開始沒有結束,視窗就是一些資料的集合,根據視窗的劃分方式可以按照時間片段來劃分某一段時間内的資料劃分為一個視窗,也可以按照資料條數的個數來劃分,一定量的資料為一個視窗。對視窗的資料的研究有利于我們分析總結資料流。這裡的視窗如果是按照時間來劃分就比較像Spark Streaming中的一個微批的資料。
視窗的類别
- 滑動視窗
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL9UEWjZmVYF2c5cVWwh2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLwATNyIDNzEjM3AzNwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
上圖中window size就是視窗大到小,window slide就是滑動步長,紅色、藍色、綠色、紫色的框分别代表四個視窗
按照時間劃分視窗,劃分出來的視窗就是一個前閉後開的視窗區間,假如視窗大小為15分鐘滑動步長為5分鐘,視窗為[10:00,10:15)[10:05,10:20)[10:10,10:25)
視窗按照處理時間來看可以劃分為處理時間和事件時間此處以處理時間為例的代碼
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将使用者行為轉化為對象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); Time windowSize = Time.seconds(10); Time slide = Time.seconds(5); // 對events進行開窗 events.filter(event -> StringUtils.equals(event.getEventType(), "click")) .keyBy(UserBehaviorEvent::getUserId) .window(SlidingProcessingTimeWindows.of(windowSize, slide)) .sum("times") .map(event -> String.format("使用者%s在10秒内的點選次數為%d", event.getUserId(), event.getTimes())) .print("使用者點選次數統計"); env.execute(); |
滑動視窗以上是按照時間處理時間來劃分的視窗,也可以按照個數來劃分。
假如按照元素個數來劃分,視窗大小為10個,滑動步長為5個那麼
就是這樣一個狀态,按照元素個數來劃分視窗大小
- 滾動視窗
滾動視窗和滑動視窗類似也是按照時間(包括事件時間和處理時間)來劃分,隻不過就是滾動視窗沒有步長就是一個挨着一個的視窗,視窗之間沒有重疊。
假如視窗大小為15分鐘那麼劃分出來的視窗就是[10:00,10:15)[10:15,10:30)
按照處理時間舉例API操作為
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将使用者行為轉化為對象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); Time windowSize = Time.seconds(10); // 對events進行開窗 events.filter(event -> StringUtils.equals(event.getEventType(), "click")) .keyBy(UserBehaviorEvent::getUserId) .window(TumblingProcessingTimeWindows.of(windowSize)) .sum("times") .map(event -> String.format("使用者%s在10秒内的點選次數為%d", event.getUserId(), event.getTimes())) .print("使用者點選次數統計"); env.execute(); |
滾動視窗也是可以按照視窗個數來劃分視窗的,加入視窗大小為5,那麼就是下圖
- 會話視窗
會話視窗,也是按照時間來劃分的但是沒有固定的開始也沒有固定的結束,視窗的開啟和結束完全和資料有關系。
會話視窗中有一個東西叫會話視窗的間隔gap就是決定視窗開啟和結束的關鍵。
假如使用者有這樣一系列的行為且會話視窗的時間間隔為10s
{"event_type":"sf","times":1,"ts":1622887747000,"user_id":"10033"} {"event_type":"sf","times":1,"ts":1622887751000,"user_id":"10033"} {"event_type":"sf","times":1,"ts":1622887758000,"user_id":"10033"} {"event_type":"sf","times":1,"ts":1622887769000,"user_id":"10033"} {"event_type":"sf","times":1,"ts":1622887780000,"user_id":"10033"} |
同樣的一個使用者,浏覽事件前三個事件會落在第一個視窗,第四個事件第二個視窗,第五個事件第三個視窗。劃分依據就是時間間隔,前三個事件的時間間隔都在10s以内,是以視窗不會關閉,前三個一個視窗,第四個事件的時間對于第三個來說已經間隔超過10s是以此時第一個視窗已經關上了,第四個事件隻能再開一個視窗,第五個同理。
操作API
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将使用者行為轉化為對象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); Time gapSize = Time.seconds(10); // 固定時間間隔 events.filter(event -> StringUtils.equals(event.getEventType(), "click")) .keyBy(UserBehaviorEvent::getUserId) .window(ProcessingTimeSessionWindows.withGap(gapSize)) .sum("times") .map(event -> String.format("使用者%s在會話期間的點選次數為%d", event.getUserId(), event.getTimes())) .print("固定時間間隔使用者會話期間點選次數統計"); env.execute(); // 不固定時間間隔 events.filter(event -> StringUtils.equals(event.getEventType(), "click")) .keyBy(UserBehaviorEvent::getUserId) .window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<UserBehaviorEvent>() { @Override public long extract(UserBehaviorEvent element) { String userId = element.getUserId(); // 假設使用者id結尾是偶數說明是A類使用者這樣的使用者在本站的會話時間就給他長一點20s,其他使用者為B類使用者普通使用者會話時長是10s if (Integer.parseInt(userId) % 2 == 0) { return Time.seconds(20).toMilliseconds(); } else { return Time.seconds(10).toMilliseconds(); } } })) .sum("times") .map(event -> String.format("使用者%s在會話期間的點選次數為%d", event.getUserId(), event.getTimes())) .print("不固定時間間隔使用者會話期間點選次數統計"); env.execute(); |
這是是固定好了gap時間間隔,當然也可以不固定時間間隔。
- 全局視窗
全局視窗隻有開始沒有結束,想要中間打出點結果來得定義觸發器。
假如統計一個使用者累計的點選次數從任務開始以來,每間隔10次播報一次
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将使用者行為轉化為對象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); // Global全局視窗 events.filter(event -> StringUtils.equals(event.getEventType(), "click")) .keyBy(UserBehaviorEvent::getUserId) .window(GlobalWindows.create()) // 定義觸發器每滿5次就播報一次使用者的累計點選 .trigger(CountTrigger.of(5)) .sum("times") .map(event -> String.format("使用者%s累計點選次數為%d", event.getUserId(), event.getTimes())) .print("累計點選次數統計"); env.execute(); |
-
-
- 視窗函數
-
視窗函數,開窗之後是對視窗内的資料做統計分析和處理的,這種統計分析處理是由多種視窗函數來完成的。
- ReduceFunction(增量聚合)
增量聚合,來一條就累加到結果裡,函數輸入和輸出是一樣的類型。
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将使用者行為轉化為對象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); Time windowSize = Time.seconds(10); // 對events進行開窗 WindowedStream<UserBehaviorEvent, String, TimeWindow> windowedStream = events.filter(event -> StringUtils.equals(event.getEventType(), "click")) .keyBy(UserBehaviorEvent::getUserId) .window(TumblingProcessingTimeWindows.of(windowSize)); // reduce累加 windowedStream.reduce(new ReduceFunction<UserBehaviorEvent>() { @Override public UserBehaviorEvent reduce(UserBehaviorEvent value1, UserBehaviorEvent value2) throws Exception { value1.setTimes(value1.getTimes() + value2.getTimes()); return value1; } }) .map(event -> String.format("使用者%s在10s内的點選次數為%d", event.getUserId(), event.getTimes())) .print("使用者點選次數"); env.execute(); |
- AggregateFunction(增量聚合)
靈活度會比reduce高一些,可以自己定義輸出類型并且累加過程也可以自己定義。
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将使用者行為轉化為對象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); Time windowSize = Time.seconds(10); // 對events進行開窗 WindowedStream<UserBehaviorEvent, String, TimeWindow> windowedStream = events.filter(event -> StringUtils.equals(event.getEventType(), "click")) .keyBy(UserBehaviorEvent::getUserId) .window(TumblingProcessingTimeWindows.of(windowSize)); // reduce累加 windowedStream.aggregate(new AggregateFunction<UserBehaviorEvent, Tuple2<String, Long>, String>() { // 初始化累加 @Override public Tuple2<String, Long> createAccumulator() { Tuple2<String, Long> tuple2 = new Tuple2<>(); tuple2.f1 = 0L; return tuple2; } // 累計的方式 @Override public Tuple2<String, Long> add(UserBehaviorEvent value, Tuple2<String, Long> accumulator) { if (Objects.isNull(accumulator.f0)) { accumulator.f0 = value.getUserId(); } accumulator.f1 += value.getTimes(); return accumulator; } @Override public String getResult(Tuple2<String, Long> accumulator) { return String.format("使用者%s在10s内的點選次數為%d", accumulator.f0, accumulator.f1); } @Override public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) { return new Tuple2<>(a.f0, a.f1 + b.f1); } }).print("使用者點選次數"); env.execute(); |
- ProcessWindowFunction(全視窗資料處理)
自由度就更高了,就是簡單粗暴的把這一個視窗裡面收集到的元素都給你,自己去處理想累加或怎麼辦都可以。
加入我們使用Process來實作一個點選次數統計。
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将使用者行為轉化為對象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); Time windowSize = Time.seconds(10); // 對events進行開窗 WindowedStream<UserBehaviorEvent, String, TimeWindow> windowedStream = events.filter(event -> StringUtils.equals(event.getEventType(), "click")) .keyBy(UserBehaviorEvent::getUserId) .window(TumblingProcessingTimeWindows.of(windowSize)); // reduce累加 泛型分别為 輸入類型,輸出類型,key的類型,window類型 windowedStream.process(new ProcessWindowFunction<UserBehaviorEvent, String, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<UserBehaviorEvent> elements, Collector<String> out) throws Exception { // 所有視窗裡面的元素都被收集到了elements疊代器裡面 LongSummaryStatistics collect = ((List<UserBehaviorEvent>) IteratorUtils.toList(elements.iterator())) .stream().collect(Collectors.summarizingLong(UserBehaviorEvent::getTimes)); Long count = collect.getSum(); out.collect(String.format("使用者%s在10s内的點選次數為%d", key, count)); } }).print("使用者點選次數"); env.execute(); |
-
-
- Keyed Window和Non-Keyed Window
-
在做視窗處理之前需要确認是keyedStream還是沒有keyedStream如果進行了key分組後續的task是可以多個并行處理的key相同的都會分到一個task中進行處理。如果沒有設定key那麼所有的資料都會進入一個task中處理,那麼這個task的并行度也隻能是1,無論設定它有多少個并行度。
-
- 時間
在Flink裡面時間有多種時間,處理時間ProcessTime事件時間EventTime還有接收時間Ingestion Time。
時間描述了事件發生的時刻,如果是被記錄下來的描述像EventTime那就有可能亂序,如果是ProcessTime就不會亂序因為處理的時刻即為時間描述這個實體世界的時間順序是不會亂的。
- 處理時間
當流程式按處理時間運作時,所有基于時間的操作(如時間視窗)都将使用運作相應算子的機器的系統時鐘。每小時處理時間視窗将包括在系統時鐘訓示整小時之間到達特定操作員的所有記錄。例如,如果應用程式在上午 9:15 開始運作,則第一個每小時處理時間視窗将包括上午 9:15 至上午 10:00 之間處理的事件,下一個視窗将包括上午 10:00 至上午 11:00 之間處理的事件,依此類推上。
處理時間是最簡單的時間概念,不需要流和機器之間的協調。它提供最佳性能和最低延遲。然而,在分布式和異步環境中,處理時間不提供确定性,因為它容易受到記錄到達系統(例如來自消息隊列)的速度,以及記錄在系統内部操作員之間流動的速度的影響,以及中斷(計劃的或其他方式)。
- 事件時間
事件時間是每個單獨事件在其産生裝置上發生的時間。這個時間通常在記錄進入 Flink 之前嵌入在記錄中,并且可以從每條記錄中提取該事件時間戳。在事件時間中,時間的進度取決于資料,而不是任何挂鐘。事件時間程式必須指定如何生成事件時間水印,這是在事件時間中表示進度的機制。這個水印機制在後面的章節所描述的, 下面。
在一個完美的世界中,事件時間處理将産生完全一緻和确定性的結果,無論事件何時到達或它們的順序如何。但是,除非已知事件按順序到達(按時間戳),否則事件時間處理在等待亂序事件時會産生一些延遲。由于隻能等待一段有限的時間,這限制了事件時間應用程式的确定性。
假設所有資料都已到達,事件時間操作将按預期運作,即使在處理亂序或延遲事件,或重新處理曆史資料時,也會産生正确且一緻的結果。例如,每小時事件時間視窗将包含所有帶有屬于該小時的事件時間戳的記錄,無論它們到達的順序或處理時間如何。(有關更多資訊,請參閱有關遲到事件的部分。)
6.3 事件時間和水印
事件時間是和消息同在的也就是說成了一種标記,不會像處理時間一樣,處理時間是随着時間的流逝機器上的時鐘是一直按照順序往後走的,不會說一會10點一會又9點,是以不存在亂序的問題。而事件時間因為時間是被記錄下來的考慮到有順序不同或延遲,要對此做出處理,另一方面時間被記錄下來處理存量資料時不用真的等到某個時間再處理按照記錄下來的時間如果速度快的話也能很快就處理完存量幾天的資料按照視窗的方式來處理。
如果是簡單的ETL清洗資料隻關注目前處理的這一條資料可以不用記錄資料的狀态,但是如果處理目前資料需要前一條或者前幾條資料配合處理或者處于同一視窗的資料就需要記錄資料的狀态。
假如都是有序的那麼水印就如下
假如都存在無序那麼水印就如下
從上面兩張圖可以看出第二張圖是對資料有一定的包容性,遲到資料也可以包容一下進來處理。當然上面這個圖沒能看出它設定的允許最大延遲是多少。
假如一條資料進來就判斷以下目前的水印。
假如允許遲到時間是5,上面是時間戳,下面是對應生成的水印,這樣對嗎?
肯定是不對的,水印是描述事件處理時間進展的,這個時間進度能倒退嗎顯然是不能的,時間是不能倒退的,是以應該是下面這樣。
水印随着時間流動隻能越來越大,被動拔高不能降低。
水印對于基于事件時間處理的流程式來說是至關重要的,如果說我知道我處理的資料遲到時間最大不會超過某個值,如果超過了這個處理機制就是存在問題的,比如本身就是實時性非常高的資料延遲卻很高就說明資料流轉的過程出現了性能問題。
其實針對遲到資料Flink也是提供了遲到處理的機制,即使真的遲到了也是可以處理的,不會漏掉。
6.3.1 時間語義和水印的設定
時間語義給定和水印生成是通過時間戳配置設定器完成的
- 内置的時間戳配置設定器
// 1. 不設定水印,此時可以配合處理時間來處理資料 WatermarkStrategy<UserBehaviorEvent> watermarks = WatermarkStrategy.<UserBehaviorEvent>noWatermarks(); // 2. 不允許遲到,也就是設定了水印,目前來的資料的時間的最高點就是水印 watermarks = WatermarkStrategy.<UserBehaviorEvent>forMonotonousTimestamps(); // 3. 允許固定時間水印,包容固定時間的遲到,下面是設定水印固定在最高點-1分鐘 watermarks = WatermarkStrategy.<UserBehaviorEvent>forBoundedOutOfOrderness(Duration.ofMinutes(1L)); // 在設定好水印的處理方式後根據資料給定水印的依據也就是時間戳提取 watermarks = watermarks.withTimestampAssigner(new SerializableTimestampAssigner<UserBehaviorEvent>() { @Override public long extractTimestamp(UserBehaviorEvent element, long recordTimestamp) { return element.getTimestamp(); } }); |
第四種就是自定義水印生成
- 周期性水印生成
// 自定義水印生成器,這裡是周期性發射水印 WatermarkStrategy<UserBehaviorEvent> watermarks = WatermarkStrategy.<UserBehaviorEvent>forGenerator(new WatermarkStrategy<UserBehaviorEvent>() { @Override public WatermarkGenerator<UserBehaviorEvent> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { // 定義一個水印生成器 return new WatermarkGenerator<UserBehaviorEvent>() { // 允許最大的遲到時間 private final long lateTime = maxLateTime; // 目前最大的時間戳,這裡要在周期性發射裡面使用,裡面 - lateTime - 1, // 如果直接使用long最小值減去一個值就使結果成為一個最大值, // 這樣就違背了初始化的時候使maxTs為最小值的初衷是以在初始化的時候先提前加上 private long maxTs = Long.MIN_VALUE + lateTime + 1; // 當元素來的時候如何生成水印 @Override public void onEvent(UserBehaviorEvent event, long eventTimestamp, WatermarkOutput output) { maxTs = Math.max(maxTs, eventTimestamp); } // 周期性的發射水印,預設觸發器是200ms執行一次 @Override public void onPeriodicEmit(WatermarkOutput output) { Watermark watermark = new Watermark(maxTs - lateTime - 1); System.out.println("目前水印:" + watermark); output.emitWatermark(watermark); } }; } }); |
- 間歇式水印生成
// 自定義水印生成器,這裡是間歇性發射水印 WatermarkStrategy<UserBehaviorEvent> watermarks = WatermarkStrategy.<UserBehaviorEvent>forGenerator(new WatermarkStrategy<UserBehaviorEvent>() { @Override public WatermarkGenerator<UserBehaviorEvent> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { // 定義一個水印生成器 return new WatermarkGenerator<UserBehaviorEvent>() { // 允許最大的遲到時間 private final long lateTime = maxLateTime; // 目前最大的時間戳,這裡要在周期性發射裡面使用,裡面 - lateTime - 1, // 如果直接使用long最小值減去一個值就使結果成為一個最大值, // 這樣就違背了初始化的時候使maxTs為最小值的初衷是以在初始化的時候先提前加上 private long maxTs = Long.MIN_VALUE + lateTime + 1; // 當元素來的時候如何生成水印,間歇性在這裡來發射水印 @Override public void onEvent(UserBehaviorEvent event, long eventTimestamp, WatermarkOutput output) { maxTs = Math.max(maxTs, eventTimestamp); // 事件來臨時發射水印,不來就不發射 Watermark watermark = new Watermark(maxTs - lateTime - 1); System.out.println("目前水印:" + watermark); output.emitWatermark(watermark); } // 周期性的發射水印,預設觸發器是200ms執行一次,如果是間歇性就不需要寫該方法 @Override public void onPeriodicEmit(WatermarkOutput output) { } }; } }); |
6.3.2 并行情況下的水印傳遞情況
在同一task不同的分區下面這幾個分區的水印是怎麼管理的,是統一的是按照水印最低的那個發射給下一個task中的。總是以最低的那個水印為準。
6.3.3 處理遲到資料
當使用事件時間且後來的資料超過了水印容忍的限度,那就存在了遲到資料,在正常情況下應該是不會有遲到的,但是出現了遲到資料也是有處理的方式的。當然這種做法隻能使用在事件時間上。
處理遲到資料API
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 對source進行分流 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); // 設定好水印 WatermarkStrategy<UserBehaviorEvent> watermarkStrategy = WatermarkStrategy.<UserBehaviorEvent>forBoundedOutOfOrderness( Duration.ofSeconds(1)) .withTimestampAssigner((SerializableTimestampAssigner<UserBehaviorEvent>) (element, recordTimestamp) -> element.getTimestamp()); OutputTag<UserBehaviorEvent> lateDataTag = new OutputTag<UserBehaviorEvent>("lateDataTag") { }; // 先做開窗 WindowedStream<UserBehaviorEvent, String, TimeWindow> window = events.assignTimestampsAndWatermarks(watermarkStrategy) .keyBy(UserBehaviorEvent::getUserId) .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 對遲到給出聲明 .allowedLateness(Time.seconds(1)) .sideOutputLateData(lateDataTag); // 對正常開窗資料進行處理,注意下面得使用process對結果進行處理才能通過測輸出流得到遲到資料,使用reduce得不到遲到資料 SingleOutputStreamOperator<String> result = window // .reduce(new ReduceFunction<UserBehaviorEvent>() { // @Override // public UserBehaviorEvent reduce(UserBehaviorEvent value1, UserBehaviorEvent value2) throws Exception { // value1.setTimes(value1.getTimes() + value2.getTimes()); // return value1; // } // }) // .map(event -> String.format("%s使用者10s視窗pv統計%d", event.getUserId(), event.getTimes())); .process(new ProcessWindowFunction<UserBehaviorEvent, String, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable<UserBehaviorEvent> elements, Collector<String> out) throws Exception { int size = IteratorUtils.toList(elements.iterator()).size(); out.collect(String.format("%s使用者10s視窗pv統計%d", s, size)); } }); // 列印處理結果 result.print("pv統計結果"); // 對遲到資料進行處理 result.getSideOutput(lateDataTag) .print("遲到資料"); env.execute(); |
側輸出流除了可以處理遲到資料還可以用來分離流資料,怎麼分離,比如埋點事件,要分成一個全部事件的主流再分出一個隻有點選事件的點選流。
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 對source進行分流 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); // 設定好水印 WatermarkStrategy<UserBehaviorEvent> watermarkStrategy = WatermarkStrategy.<UserBehaviorEvent>forBoundedOutOfOrderness( Duration.ofSeconds(1)) .withTimestampAssigner((SerializableTimestampAssigner<UserBehaviorEvent>) (element, recordTimestamp) -> element.getTimestamp()); OutputTag<UserBehaviorEvent> clickDataTag = new OutputTag<UserBehaviorEvent>("clickDataTag") { }; KeyedStream<UserBehaviorEvent, String> keyedStream = events.assignTimestampsAndWatermarks(watermarkStrategy) .keyBy(UserBehaviorEvent::getUserId); SingleOutputStreamOperator<UserBehaviorEvent> splitOperator = keyedStream.process(new KeyedProcessFunction<String, UserBehaviorEvent, UserBehaviorEvent>() { @Override public void processElement(UserBehaviorEvent value, Context ctx, Collector<UserBehaviorEvent> out) throws Exception { out.collect(value); // 将點選事件單獨處理 if ("click".equals(value.getEventType())) { ctx.output(clickDataTag, value); } } }); splitOperator.print("全部事件"); splitOperator.getSideOutput(clickDataTag).print("點選事件"); env.execute(); |
-
- 底層API操作
底層API就是process,其内部根據process的不同也是分為多種的process,這一層的API相對較為自由,所有的上層API都是可以基于process封裝出來。
- ProcessFunction
ProcessFunction是作用在普通的DataStream上面的process方法,加入使用process完成map操作
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将使用者行為轉化為對象,使用process來完成 SingleOutputStreamOperator<UserBehaviorEvent> events = source.process(new ProcessFunction<String, UserBehaviorEvent>() { @Override public void processElement(String value, Context ctx, Collector<UserBehaviorEvent> out) throws Exception { // 自由處理給出了element元素,給出了ctx上下文環境,給出了結果收集器out UserBehaviorEvent event = JSON.parseObject(value, UserBehaviorEvent.class); out.collect(event); } }); events.print("對象流"); env.execute(); |
- KeyedProcessFuntion
KeyedProcessFunction是作用在keyedStream後的Process上面,主要針對分組後的流資料做處理。
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 使用KeyedProcessFunction需要先對流進行keyBy的處理 KeyedStream<String, String> keyedStream = source.keyBy(eventMessage -> JSON.parseObject(eventMessage).getString("user_id")); // 将分組之後的資料轉化為對象 SingleOutputStreamOperator<UserBehaviorEvent> events = keyedStream.process(new KeyedProcessFunction<String, String, UserBehaviorEvent>() { @Override public void processElement(String value, Context ctx, Collector<UserBehaviorEvent> out) throws Exception { // 自由處理給出了element元素,給出了ctx上下文環境,給出了結果收集器out UserBehaviorEvent event = JSON.parseObject(value, UserBehaviorEvent.class); out.collect(event); } }); events.print("對象流"); env.execute(); |
- CoProcessFunction
Co開頭就是作用在ConnectedStream後面的操作,是以CoProcessFunction是作用在兩個流連接配接後的ConnectedStream的process上面
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source1 = env.socketTextStream("hadoop01", 9999); DataStreamSource<String> source2 = env.socketTextStream("hadoop02", 9999); // Co開頭的都是作用在ConnectedStream上面多個流連接配接起來的流 ConnectedStreams<String, String> connectedStreams = source1.connect(source2); // 分别對兩個流進行處理 SingleOutputStreamOperator<UserBehaviorEvent> events = connectedStreams.process(new CoProcessFunction<String, String, UserBehaviorEvent>() { @Override public void processElement1(String value, Context ctx, Collector<UserBehaviorEvent> out) throws Exception { UserBehaviorEvent event = JSON.parseObject(value, UserBehaviorEvent.class); out.collect(event); } @Override public void processElement2(String value, Context ctx, Collector<UserBehaviorEvent> out) throws Exception { UserBehaviorEvent event = JSON.parseObject(value, UserBehaviorEvent.class); out.collect(event); } }); events.print("合并後的對象流"); env.execute(); |
- ProcessJoinFunction
兩個流之間的關聯後的處理函數
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); env.disableOperatorChaining(); DataStreamSource<String> source1 = env.socketTextStream("hadoop01", 9999); DataStreamSource<String> source2 = env.socketTextStream("hadoop02", 9999); SingleOutputStreamOperator<UserBehaviorEvent> left = source1.map(event -> JSON.parseObject(event, UserBehaviorEvent.class)); SingleOutputStreamOperator<UserBehaviorEvent> right = source2.map(event -> JSON.parseObject(event, UserBehaviorEvent.class)); WatermarkStrategy<UserBehaviorEvent> watermark1 = WatermarkStrategy.<UserBehaviorEvent>forBoundedOutOfOrderness(Duration.ofMillis(5)).withTimestampAssigner(new SerializableTimestampAssigner<UserBehaviorEvent>() { @Override public long extractTimestamp(UserBehaviorEvent element, long recordTimestamp) { return element.getTimestamp(); } }); WatermarkStrategy<UserBehaviorEvent> watermark2 = WatermarkStrategy.<UserBehaviorEvent>forBoundedOutOfOrderness(Duration.ofMillis(3)).withTimestampAssigner(new SerializableTimestampAssigner<UserBehaviorEvent>() { @Override public long extractTimestamp(UserBehaviorEvent element, long recordTimestamp) { return element.getTimestamp(); } }); left = left.assignTimestampsAndWatermarks(watermark1); right = right.assignTimestampsAndWatermarks(watermark2); DataStream<String> result = left.join(right).where(UserBehaviorEvent::getUserId).equalTo(UserBehaviorEvent::getUserId) .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) .apply(new JoinFunction<UserBehaviorEvent, UserBehaviorEvent, String>() { @Override public String join(UserBehaviorEvent first, UserBehaviorEvent second) throws Exception { return String.format("left message:%s\tright message:%s", first, second); } }); result.print("join結果"); env.execute(); |
這裡面兩個流之間的關聯如果有水位線的話,下遊水位線依然是按照最低的那個流的水位線為标準。
- ProcessWindowFunction
在window後使用,是在keyedWindow後面使用process
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); env.disableOperatorChaining(); DataStreamSource<String> source1 = env.socketTextStream("hadoop01", 9999); SingleOutputStreamOperator<UserBehaviorEvent> left = source1.map(event -> JSON.parseObject(event, UserBehaviorEvent.class)); SingleOutputStreamOperator<String> result = left.assignTimestampsAndWatermarks(WatermarkStrategy.<UserBehaviorEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<UserBehaviorEvent>() { @Override public long extractTimestamp(UserBehaviorEvent element, long recordTimestamp) { return element.getTimestamp(); } })).keyBy(UserBehaviorEvent::getUserId) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .process(new ProcessWindowFunction<UserBehaviorEvent, String, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<UserBehaviorEvent> elements, Collector<String> out) throws Exception { int size = IteratorUtils.toList(elements.iterator()).size(); String message = String.format("%s使用者10s内的浏覽次數為%d", key, size); out.collect(message); } }); result.print("join結果"); env.execute(); } |
- ProcessAlWindowFunction
在windowAll後處理函數process裡使用
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); env.disableOperatorChaining(); DataStreamSource<String> source1 = env.socketTextStream("hadoop01", 9999); SingleOutputStreamOperator<UserBehaviorEvent> left = source1.map(event -> JSON.parseObject(event, UserBehaviorEvent.class)); SingleOutputStreamOperator<String> result = left.assignTimestampsAndWatermarks(WatermarkStrategy.<UserBehaviorEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<UserBehaviorEvent>() { @Override public long extractTimestamp(UserBehaviorEvent element, long recordTimestamp) { return element.getTimestamp(); } })).windowAll(TumblingEventTimeWindows.of(Time.minutes(5))) .process(new ProcessAllWindowFunction<UserBehaviorEvent, String, TimeWindow>() { @Override public void process(Context context, Iterable<UserBehaviorEvent> elements, Collector<String> out) throws Exception { int size = IteratorUtils.toList(elements.iterator()).size(); String message = String.format("所有使用者5m内的浏覽次數為%d", size); out.collect(message); } }); result.print("join結果"); env.execute(); |
-
- 定時器Timer
定時器在接收到一個元素後可以注冊一個定時器,在指定的時間執行,可以依據處理時間來建立也可以依據事件時間建立。當然也可以删除定時器。
定時器是在TimeService上注冊或删除的,這個timeservice是在context或OnTimerContext中持有的,比如ProcessFunction中就有所有繼承自RichFunction的函數都持有這個上下文對象。
基于處理時間的定時器:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 對source進行分流 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); events.keyBy(UserBehaviorEvent::getUserId) .process(new KeyedProcessFunction<String, UserBehaviorEvent, String>() { @Override public void processElement(UserBehaviorEvent value, Context ctx, Collector<String> out) throws Exception { TimerService timerService = ctx.timerService(); long currentProcessingTime = timerService.currentProcessingTime(); System.out.println(currentProcessingTime); timerService.registerProcessingTimeTimer(currentProcessingTime + Time.seconds(5).toMilliseconds()); out.collect(value.toString()); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { System.out.println("定時執行:" + timestamp); } }).print("流資料"); env.execute(); |
基于事件時間的定時器:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); env.disableOperatorChaining(); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 對source進行分流 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); events = events.assignTimestampsAndWatermarks(WatermarkStrategy .<UserBehaviorEvent>forBoundedOutOfOrderness(Duration.ofMillis(1)) .withTimestampAssigner(new SerializableTimestampAssigner<UserBehaviorEvent>() { @Override public long extractTimestamp(UserBehaviorEvent element, long recordTimestamp) { return element.getTimestamp(); } })); events.keyBy(UserBehaviorEvent::getUserId) .process(new KeyedProcessFunction<String, UserBehaviorEvent, String>() { @Override public void processElement(UserBehaviorEvent value, Context ctx, Collector<String> out) throws Exception { TimerService timerService = ctx.timerService(); long watermark = timerService.currentWatermark(); System.out.println(watermark); timerService.registerEventTimeTimer(watermark + Time.milliseconds(5).toMilliseconds()); out.collect(value.toString()); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { System.out.println("定時執行:" + timestamp); } }).print("流資料"); env.execute(); |
-
- 狀态程式設計
雖然資料流中的許多操作一次隻檢視一個單獨的事件(例如事件解析器),但有些操作會記住多個事件的資訊(例如視窗操作符)。這些操作稱為有狀态的。
- 當應用程式搜尋某些事件模式時,狀态将存儲到目前為止遇到的事件序列。
- 當聚合每分鐘/小時/天的事件時,狀态持有待處理的聚合。
- 在資料點流上訓練機器學習模型時,狀态儲存模型參數的目前版本。
- 當需要管理曆史資料時,狀态允許有效通路過去發生的事件。
狀态還可以用來作為檢查點和儲存點的容錯依據
未來還可以做到外部可查詢的狀态
狀态的管理可以使用不同的狀态後端來進行管理
-
-
- 狀态的分類
-
Flink包括兩種基本類型的狀态Managed State和Raw State
Managed State | Raw State | |
狀态管理方式 | Flink Runtime托管, 自動存儲, 自動恢複, 自動伸縮 | 使用者自己管理 |
狀态資料結構 | Flink提供多種常用資料結構, 例如:ListState, MapState等 | 位元組數組: byte[] |
使用場景 | 絕大數Flink算子 | 所有算子 |
-
-
-
- ManagedState
-
-
分為兩類KeyedState監控狀态
用在監控流上可以根據Key來記錄流資料的狀态
OperatorState算子狀态
普通算子狀态,一個算子任務對應一種狀态
Operator State | Keyed State | |
适用用算子類型 | 可用于所有算子: 常用于source, 例如 FlinkKafkaConsumer | 隻适用于KeyedStream上的算子 |
狀态配置設定 | 一個算子的子任務對應一個狀态 | 一個Key對應一個State: 一個算子會處理多個Key, 則通路相應的多個State |
建立和通路方式 | 實作CheckpointedFunction或ListCheckpointed(已經過時)接口 | 重寫RichFunction, 通過裡面的RuntimeContext通路 |
橫向擴充 | 并發改變時有多重重寫配置設定方式可選: 均勻配置設定和合并後每個得到全量 | 并發改變, State随着Key在執行個體間遷移 |
支援的資料結構 | ListState和BroadCastState | ValueState, ListState,MapState ReduceState, AggregatingState |
6.5.1.1 鍵控流狀态操作
- K-V形式的流,指定Key來對資料進行分區可以保證相同Key的被分到同樣的分區裡,友善通路統一分區的資料狀态。
-
- 狀态的儲存
-
Flink使用流重放和檢查點組合來實作容錯,任務失敗允許從儲存的記錄中恢複過來繼續執行,對于處理過程的容錯可以保證Exactly-Once嚴格一次性。
鍵控流操作都可以通過RichFunction中RuntimeContext runtimeContext = getRuntimeContext();擷取到運作時上下文環境,來獲得state狀态。
- VaueState
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将使用者行為轉化為對象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); // 使用map來處理永遠記錄相同key情況下times最大的那個對象 events.keyBy(UserBehaviorEvent::getUserId).map(new RichMapFunction<UserBehaviorEvent, UserBehaviorEvent>() { // 記錄使用者times最大的那個值 ValueState<UserBehaviorEvent> maxTimesState; @Override public UserBehaviorEvent map(UserBehaviorEvent value) throws Exception { RuntimeContext runtimeContext = getRuntimeContext(); maxTimesState = runtimeContext.getState(new ValueStateDescriptor<UserBehaviorEvent>("maxTimes", UserBehaviorEvent.class)); if (maxTimesState.value() == null) { maxTimesState.update(value); } else { if (maxTimesState.value().getTimes() < value.getTimes()) { maxTimesState.update(value); } } return maxTimesState.value(); } }).print("最大次數"); env.execute(); |
- ListState
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将使用者行為轉化為對象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); // 使用map來處理永遠記錄top3的times的對象 events.keyBy(UserBehaviorEvent::getUserId).map(new RichMapFunction<UserBehaviorEvent, List<UserBehaviorEvent>>() { // 記錄使用者times最大的那個值 ListState<UserBehaviorEvent> top3TimesState; @Override public List<UserBehaviorEvent> map(UserBehaviorEvent value) throws Exception { RuntimeContext runtimeContext = getRuntimeContext(); top3TimesState = runtimeContext.<UserBehaviorEvent>getListState(new ListStateDescriptor<UserBehaviorEvent>("top3Times", UserBehaviorEvent.class)); top3TimesState.add(value); List<UserBehaviorEvent> top3Events = ((List<UserBehaviorEvent>) IteratorUtils.toList(top3TimesState.get().iterator())); top3Events.sort(new Comparator<UserBehaviorEvent>() { @Override public int compare(UserBehaviorEvent e1, UserBehaviorEvent e2) { return (int) (e2.getTimes() - e1.getTimes()); } }); top3TimesState.clear(); top3Events = top3Events.stream().limit(3).collect(Collectors.toList()); top3TimesState.addAll(top3Events); return top3Events; } }).print("最大3次"); env.execute(); |
- ReducingState
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将使用者行為轉化為對象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); // 使用map來處理永遠記錄top3的times的對象 events.keyBy(UserBehaviorEvent::getUserId).map(new RichMapFunction<UserBehaviorEvent, String>() { // 記錄使用者times最大的那個值 ReducingState<UserBehaviorEvent> reducingTimesState; @Override public String map(UserBehaviorEvent value) throws Exception { RuntimeContext runtimeContext = getRuntimeContext(); reducingTimesState = runtimeContext.<UserBehaviorEvent>getReducingState(new ReducingStateDescriptor<UserBehaviorEvent>("top3Times", new ReduceFunction<UserBehaviorEvent>() { @Override public UserBehaviorEvent reduce(UserBehaviorEvent value1, UserBehaviorEvent value2) throws Exception { value1.setTimes(value1.getTimes() + value2.getTimes()); return value1; } }, UserBehaviorEvent.class)); reducingTimesState.add(value); UserBehaviorEvent event = reducingTimesState.get(); return String.format("%s使用者累計通路次數%d", event.getUserId(), event.getTimes()); } }).print("累計通路次數"); env.execute(); |
- AggregateState
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将使用者行為轉化為對象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); // 使用map來處理永遠記錄top3的times的對象 events.keyBy(UserBehaviorEvent::getUserId).map(new RichMapFunction<UserBehaviorEvent, String>() { // 記錄使用者times最大的那個值 AggregatingState<UserBehaviorEvent, String> aggregatingState; @Override public String map(UserBehaviorEvent value) throws Exception { RuntimeContext runtimeContext = getRuntimeContext(); aggregatingState = runtimeContext.<UserBehaviorEvent, Long, String>getAggregatingState(new AggregatingStateDescriptor<UserBehaviorEvent, Long, String>("agg", new AggregateFunction<UserBehaviorEvent, Long, String>() { @Override public Long createAccumulator() { return 0L; } @Override public Long add(UserBehaviorEvent value, Long accumulator) { return value.getTimes() + accumulator; } @Override public String getResult(Long accumulator) { return accumulator.toString(); } @Override public Long merge(Long a, Long b) { return a + b; } }, Long.class)); aggregatingState.add(value); return String.format("%s使用者累計通路次數%s", value.getUserId(), aggregatingState.get()); } }).print("累計通路次數"); env.execute(); |
- MapState
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将使用者行為轉化為對象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); // 按照使用者id進行去重永遠取得最早的那個 events.keyBy(UserBehaviorEvent::getUserId).map(new RichMapFunction<UserBehaviorEvent, String>() { // 記錄使用者times最大的那個值 MapState<String, UserBehaviorEvent> mapState; @Override public String map(UserBehaviorEvent value) throws Exception { RuntimeContext runtimeContext = getRuntimeContext(); mapState = runtimeContext.<String, UserBehaviorEvent>getMapState(new MapStateDescriptor<String, UserBehaviorEvent>("mapState", String.class, UserBehaviorEvent.class)); if (mapState.get(value.getUserId()) == null) { mapState.put(value.getUserId(), value); } return String.format("%s使用者通路最早是在%d", value.getUserId(), mapState.get(value.getUserId()).getTimestamp()); } }).print("累計通路次數"); env.execute(); |
6.5.1.2 算子狀态操作
這兩種算子狀态可以不做keyBy就可以使用,隻要Function繼承自CheckpointedFunction就可以使用,需要實作一個初始化方法一個snapshot快照方法。
ListState
BroadState
-
-
- 狀态後端
-
狀态後端就是配合checkpoint完成狀态存儲的,将資料處理儲存在TaskManager之外再儲存在外部存儲一份。
三種狀态後端存儲方式,Memory、Fs、RocksDB
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 首先開啟使用checkpoint env.isUnalignedCheckpointsEnabled(); // 再設定開啟checkpoint後狀态資料儲存在什麼樣的後端 // 記憶體後端 env.setStateBackend(new MemoryStateBackend()); // 外部檔案後端 env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/user/flink/ck")); // rocksDB後端 env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop01:9000/user/flink/ck")); |
-
- 容錯機制
一緻性,三個級别至少一次、最多一次、精準一次。從Flink資料流的各個階段來看不同階段的一緻性Source、flink流處理、Sink,首先flink流處理可以基于checkpoint完成精準一次性,除此之外Source得符合重新送出偏移量能從執行偏移量開始接收資料才能完成source端的精準一次,其次Sink端要麼Sink到支援幂等性的存儲要麼實作兩段送出(也就是内部實作一個事務)。
Checkpoint檢查點如何容錯
資料處理過程中出錯如何恢複達到精準一次性,假設場景是這樣的
一個輸入資料流經過Source,Source記錄一下目前的偏移量,然後發送給後面的處理算子,兩個并行的處理算子上面用于sum偶數下面sum奇數,當5這條資料被後面的sum處理完後這三個位置一起記錄一個checkpoint可用于恢複的檢查點。
然後假設當下一條資料出現了錯誤
當Source目前記錄是7,正在處理7的過程中發生了錯誤,上一次資料處理完記錄的快照是5,6,9然後把source偏移量進行調整至6開始接收資料,sum_even位置是6,sum_odd是9,這樣就恢複了從6開始的消費然後正常進行下面的處理。
注意:檢查點不是每一條資料都會做檢查點,檢查點是周期性的,不是說7失敗了一定會從6個點記錄的檢查點開始恢複(因為周期性記錄的原因6完成後不一定記錄了快照)
這裡存在一個問題?
如果5這個偏移量的時候要記錄快照了,sum_even和sum_odd是如何知道5這個偏移量的資料已經處理完了呢,5是奇數sum_odd處理了這條資料很容易能夠感覺,但是sum_even是如何知道5已經從整個的資料流已經處理完了呢,不知道。這樣就得使用某種機制讓sum_even也知道5已經處理完了。
這裡就是插入一個Barrier(分界線),假如偏移量到了5,source接收到是5,将這個分界線5以廣播的方式發送到下遊的各個subtask,當然sum_even不處理這條資料但是已經知道偏移量到了5可以給出一個快照就是sum_even:5:6,sum_odd處理完5也可以給出一個快照sum_odd:5:9,這樣就在checkpoint裡面制作好了一個完整的基于5的一個快照,等處理後面的資料出現了差錯而且還沒來的及制作下一快照時就可以使用5這個快照來做狀态恢複。
這裡還有個問題就是對齊和非對齊的Barrier的問題,就是如果資料源是多份,同時存在兩個并行的Source來接收資料。
上圖中,數字流和字母流,如果中間圖執行比如CoProcessFunction雙流操作的時候,一個流比較快一個流比較慢需,在一個Operator算子處理的時候如果Barrier不齊,它會等到對齊之後再處理,如果先來的資料會怎麼辦呢,先儲存起來,當Barrier對齊了之後再處理緩存起來的資料。這樣是有可能引起反壓機制的,處理不過來資料緩存過大為了Barrier對齊,降低了處理速度。
當然也是可以不保證Barrier對齊再處理的,這樣就可能不會完成嚴格一次性的要求了。
SavePoint&Checkpoint操作
代碼中配置檢查點配置
System.setProperty("HADOOP_USER_NAME","hadoop"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 1. 設定狀态後端,也就是聲明資料儲存的位置 env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink/ck")); env.setParallelism(1); // 2. 配置檢查點 // 設定允許checkpoint env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); // 觸發下一個檢查點之前的最小暫停 checkpointConfig.setMinPauseBetweenCheckpoints(500); // 設定checkpoint檢查設定的時長,如果超過這個時長就會抛棄做該檢查點 checkpointConfig.setCheckpointTimeout(60000); // 設定同一時間檢查點的并發量 checkpointConfig.setMaxConcurrentCheckpoints(1); // 設定檢查點會不會被cancel後備删除,如果删除将無法從檢查點恢複 checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 啟用未對齊的檢查點這樣可以增大吞吐量但是精準一次性會被忽略 checkpointConfig.enableUnalignedCheckpoints(); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); SingleOutputStreamOperator<String> words = source.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { String[] s = value.split(" "); for (String word : s) { out.collect(word); } } }); SingleOutputStreamOperator<Tuple2<String, Long>> maps = words.map(word -> new Tuple2<String, Long>(word, 1L)).returns(TupleTypeInfo.getBasicTupleTypeInfo(String.class, Long.class)); SingleOutputStreamOperator<Tuple2<String, Long>> result = maps.keyBy(tuple -> tuple.f0).sum(1); result.print("result"); env.execute(); |
- 送出任務時設定savepoint
正常送出任務
bin/flink run -m yarn-cluster -ynm FlinkExample -ys 1 -p 1 -yjm 1g -ytm 2g -c com.example.flink.datastream.checkpoint.Example01Checkpoint ./jars/com.example.flink.datastream-0.0.1-SNAPSHOT.jar hadoop01 9999
設定對應任務的檢查點
bin/flink savepoint -m yarn-cluster -yid application_1623244631654_0003 69819b8ef328cd01cc04bd6d7e46791e hdfs://hadoop01:9000/flink/savepoint
- 從savepoint恢複
bin/flink run -s hdfs://hadoop01:9000/flink/savepoint/savepoint-69819b-7b0d45c8b11f -m yarn-cluster -ynm FlinkExample -ys 1 -p 1 -yj1g -ytm 2g -c com.example.flink.datastream.checkpoint.Example01Checkpoint ./jars/com.example.flink.datastream-0.0.1-SNAPSHOT.jar hadoop01 9999
如果是使用checkpoint
- 正常送出任務
bin/flink run -m yarn-cluster -ynm FlinkExample -ys 1 -p 1 -yjm 1g -ytm 2g -c com.example.flink.datastream.checkpoint.Example01Checkpoint ./jars/com.example.flink.datastream-0.0.1-SNAPSHOT.jar hadoop01 9999
- 從檢查點恢複
bin/flink run -s hdfs://hadoop01:9000/flink/ck/21fdd3943b2b5454480915cfc047395f/chk-5 -m yarn-cluster -ynm FlinkExample -ys 1 -p 1 -yjm 1g -ytm 2g -c com.example.flink.datastream.checkpoint.Example01Checkpoint ./jars/com.example.flink.datastream-0.0.1-SNAPSHOT.jar hadoop01 9999
兩階段送出
前面提到整個的過程如果都實作了Exactly-once才能真正的實作。
- Source端讀取資料需要支援指定位點擷取資料(Kafka支援)
- 處理的過程中支援狀态儲存(Checkpoint可以實作)
- Sink端支援事務(儲存資料要麼資料庫可以幂等寫入,要麼得支援事務先flush等checkpoint完成後再次确認commit動作)