天天看點

Using Flink countWindow

1. Overview

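countWindow comes in two flavors: tumbling count windows and sliding count windows. The code and output below illustrate how countWindow() behaves. The data source used by both examples: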

// Nested static class: place it inside FlinkCountWindowDemo, whose imports
// (Tuple2, RichParallelSourceFunction) it relies on.
// Emits ten (key, value) pairs, one per second, then finishes (bounded source).
// Every parallel instance emits all ten elements, so the demos use parallelism 1.
public static class StreamDataSource extends RichParallelSourceFunction<Tuple2<String, String>> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Tuple2<String, String>> ctx) throws InterruptedException {
        // Six elements for key "a", then four for key "b".
        String[][] elements = {
            {"a", "1"}, {"a", "2"}, {"a", "3"}, {"a", "4"}, {"a", "5"},
            {"a", "6"}, {"b", "7"}, {"b", "8"}, {"b", "9"}, {"b", "0"}
        };

        int count = 0;
        while (running && count < elements.length) {
            ctx.collect(Tuple2.of(elements[count][0], elements[count][1]));
            count++;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

2. Tumbling count window

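Set windowSize=3, so each key's window is evaluated once every 3 elements of that key and then cleared. This case is fairly straightforward, so it needs little extra explanation.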
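As a rough mental model, the rule "per key, fire and clear every windowSize elements" can be sketched in plain Java without Flink. The class and method names here are made up for illustration; in Flink itself, countWindow(size) is, as far as I know, built from GlobalWindows plus a purging CountTrigger, and this sketch only imitates that behavior on the same ten input pairs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a keyed tumbling count window (no Flink dependency).
final class TumblingCountWindowSketch {

    // The ten (key, value) pairs emitted by StreamDataSource, in order.
    static final String[][] INPUT = {
        {"a", "1"}, {"a", "2"}, {"a", "3"}, {"a", "4"}, {"a", "5"},
        {"a", "6"}, {"b", "7"}, {"b", "8"}, {"b", "9"}, {"b", "0"}
    };

    // Buffers values per key; when a key's buffer reaches `size`, emits the
    // concatenated values (like the reduce in 2.1) and clears that buffer.
    static List<String> run(String[][] input, int size) {
        Map<String, List<String>> buffers = new HashMap<>();
        List<String> fired = new ArrayList<>();
        for (String[] record : input) {
            List<String> buf = buffers.computeIfAbsent(record[0], k -> new ArrayList<>());
            buf.add(record[1]);
            if (buf.size() == size) {
                fired.add("(" + record[0] + "," + String.join("", buf) + ")");
                buf.clear(); // tumbling: the next element starts a fresh window
            }
        }
        // Buffers that never reached `size` are simply never emitted.
        return fired;
    }

    public static void main(String[] args) {
        run(INPUT, 3).forEach(System.out::println);
    }
}
```

Running main prints (a,123), (a,456) and (b,789), one per line, matching the output in 2.2.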

2.1 Code

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class FlinkCountWindowDemo {

    // StreamDataSource (section 1) goes here as a nested static class.

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        // One source instance, so each element is emitted exactly once.
        env.setParallelism(1);
        // Window size in elements; override with --window N.
        final int windowSize = params.getInt("window", 3);

        // read source data
        DataStreamSource<Tuple2<String, String>> inStream = env.addSource(new StreamDataSource());

        // Tumbling count window per key: fire (and clear) every windowSize elements.
        DataStream<Tuple2<String, String>> outStream = inStream
                .keyBy(value -> value.f0) // key by f0; replaces the deprecated keyBy(0)
                .countWindow(windowSize)
                .reduce(new ReduceFunction<Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> reduce(Tuple2<String, String> value1, Tuple2<String, String> value2) throws Exception {
                        // Concatenate the values of all elements in the window.
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                });
        outStream.print();
        env.execute("WindowWordCount");
    }
}

2.2 Output

(a,123)
(a,456)
(b,789)

2.3 Explanation

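The output shows that one record, Tuple2.of("b", "0"), was dropped. It is the only element in key b's second window, which never reaches 3 elements and therefore never fires before the bounded source finishes. This is the expected behavior.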
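The arithmetic behind the lost record: with a tumbling window of size 3, each key leaves count % 3 elements in a window that never fires once the input ends. Key a has 6 elements (6 % 3 = 0) and key b has 4 (4 % 3 = 1), so exactly one record is stranded. A small plain-Java check (the class and method names are hypothetical, not Flink API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Per key, how many trailing elements sit in a tumbling count window that
// never fills up before a bounded input ends (plain Java, no Flink).
final class LeftoverSketch {

    static Map<String, Integer> leftovers(String[] keys, int size) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String key : keys) {
            counts.merge(key, 1, Integer::sum); // elements seen per key
        }
        Map<String, Integer> pending = new LinkedHashMap<>();
        counts.forEach((key, n) -> pending.put(key, n % size)); // never fired
        return pending;
    }

    public static void main(String[] args) {
        // Keys of the ten source elements, in arrival order.
        String[] keys = {"a", "a", "a", "a", "a", "a", "b", "b", "b", "b"};
        System.out.println(leftovers(keys, 3));
    }
}
```

Running main prints {a=0, b=1}.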

3. Sliding count window

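Borrowing a figure from "Flink 原理與實作:Window 機制" (Flink Principles and Implementation: The Window Mechanism): suppose a sliding count window computes the sum of the most recent 4 elements once every 2 elements. The window then works as illustrated below: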

[Figure: sliding count window that sums the most recent 4 elements every 2 elements]
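To make the figure's rule concrete, here is a plain-Java sketch of a single, unkeyed sliding count window with size 4 and slide 2 that sums values. The input 1 to 8 is hypothetical, since the figure's own values are not reproduced here, and the names are illustrative only. Eviction happens only when the window fires, which mirrors how, as far as I know, Flink combines a CountTrigger (slide) with a CountEvictor (size).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Plain-Java model of a sliding count window on a single (unkeyed) stream:
// fire every `slide` elements and aggregate only the newest `size` elements.
final class SlidingCountSumSketch {

    static List<Integer> run(int[] input, int size, int slide) {
        Deque<Integer> window = new ArrayDeque<>();
        List<Integer> sums = new ArrayList<>();
        int sinceLastFire = 0;
        for (int x : input) {
            window.addLast(x);
            sinceLastFire++;
            if (sinceLastFire == slide) {
                // Evict the oldest elements until at most `size` remain.
                while (window.size() > size) {
                    window.removeFirst();
                }
                int sum = 0;
                for (int v : window) {
                    sum += v;
                }
                sums.add(sum);
                sinceLastFire = 0;
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        int[] input = {1, 2, 3, 4, 5, 6, 7, 8}; // hypothetical input
        System.out.println(run(input, 4, 2));
    }
}
```

For the input 1 to 8 this prints [3, 10, 18, 26]: 1+2, then 1+2+3+4, then 3+4+5+6, then 5+6+7+8. The first firing covers only two elements because fewer than four have arrived.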

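The code below concatenates strings instead of summing numbers and uses a window of 3 with a slide of 2, but otherwise follows almost the same logic as the figure. Its output confirms the behavior shown there.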

3.1 Code

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class FlinkCountWindowDemo {

    // StreamDataSource (section 1) goes here as a nested static class.

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        // One source instance, so each element is emitted exactly once.
        env.setParallelism(1);
        // Window size and slide in elements; override with --window N --slide M.
        final int windowSize = params.getInt("window", 3);
        final int slideSize = params.getInt("slide", 2);

        // read source data
        DataStreamSource<Tuple2<String, String>> inStream = env.addSource(new StreamDataSource());

        // Sliding count window per key: every slideSize elements,
        // evaluate the most recent windowSize elements of that key.
        DataStream<Tuple2<String, String>> outStream = inStream
                .keyBy(value -> value.f0) // key by f0; replaces the deprecated keyBy(0)
                .countWindow(windowSize, slideSize)
                .reduce(new ReduceFunction<Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> reduce(Tuple2<String, String> value1, Tuple2<String, String> value2) throws Exception {
                        // Concatenate the values of all elements in the window.
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                });
        outStream.print();
        env.execute("WindowWordCount");
    }
}

3.2 Output

(a,12)
(a,234)
(a,456)
(b,78)
(b,890)

3.3 Explanation

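In other words, for each key, every time two new elements arrive, the window is evaluated over the most recent three elements of that key (or fewer, if fewer have arrived so far, as in (a,12) and (b,78)). The result is as expected.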
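The same rule applied per key with string concatenation reproduces the output in 3.2. A plain-Java sketch (hypothetical names, not Flink API): each key counts its arrivals, fires on every second one, and first trims its buffer to the newest three values, the job an evictor does in Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a keyed sliding count window: per key, fire every
// `slide` elements and concatenate only the newest `size` values.
final class KeyedSlidingCountWindowSketch {

    // The ten (key, value) pairs emitted by StreamDataSource, in order.
    static final String[][] INPUT = {
        {"a", "1"}, {"a", "2"}, {"a", "3"}, {"a", "4"}, {"a", "5"},
        {"a", "6"}, {"b", "7"}, {"b", "8"}, {"b", "9"}, {"b", "0"}
    };

    static List<String> run(String[][] input, int size, int slide) {
        Map<String, List<String>> buffers = new HashMap<>();
        Map<String, Integer> arrivals = new HashMap<>();
        List<String> fired = new ArrayList<>();
        for (String[] record : input) {
            String key = record[0];
            List<String> buf = buffers.computeIfAbsent(key, k -> new ArrayList<>());
            buf.add(record[1]);
            int n = arrivals.merge(key, 1, Integer::sum); // arrivals for this key
            if (n % slide == 0) {
                // Keep only the newest `size` values before evaluating.
                while (buf.size() > size) {
                    buf.remove(0);
                }
                fired.add("(" + key + "," + String.join("", buf) + ")");
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        run(INPUT, 3, 2).forEach(System.out::println);
    }
}
```

Running main prints (a,12), (a,234), (a,456), (b,78) and (b,890), one per line, the same as section 3.2. Unlike the tumbling sketch, the buffer is not cleared after firing, which is why consecutive windows overlap.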
