1. Overview
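Count windows come in two types: tumbling and sliding. The code and output below illustrate how countWindow() behaves. Both examples use the same data source. It is a static nested class of FlinkCountWindowDemo. It emits ten elements, one per second: six with key "a", followed by four with key "b".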
// Static nested class of FlinkCountWindowDemo
public static class StreamDataSource extends RichParallelSourceFunction<Tuple2<String, String>> {
    private volatile boolean running = true;
    @Override
    public void run(SourceContext<Tuple2<String, String>> ctx) throws InterruptedException {
        // Six elements for key "a", then four for key "b"
        @SuppressWarnings("unchecked")
        Tuple2<String, String>[] elements = new Tuple2[]{
            Tuple2.of("a", "1"),
            Tuple2.of("a", "2"),
            Tuple2.of("a", "3"),
            Tuple2.of("a", "4"),
            Tuple2.of("a", "5"),
            Tuple2.of("a", "6"),
            Tuple2.of("b", "7"),
            Tuple2.of("b", "8"),
            Tuple2.of("b", "9"),
            Tuple2.of("b", "0")
        };
        int count = 0;
        while (running && count < elements.length) {
            // Emit a fresh copy of the next element, then pause for one second
            ctx.collect(Tuple2.of(elements[count].f0, elements[count].f1));
            count++;
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        // Called by Flink when the job is cancelled; stops the emit loop
        running = false;
    }
}
2. Tumbling Count Window
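With windowSize=3, the window fires once for every 3 elements of the same key. This case is straightforward and needs little further explanation.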
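The following sketch makes the per-key firing rule concrete. It is a plain-Java model with no Flink dependency. It assumes a tumbling count window behaves like a per-key buffer that fires and is then cleared each time it holds windowSize elements. In Flink 1.x, `KeyedStream.countWindow(size)` is built from `GlobalWindows` with a `PurgingTrigger` wrapped around a `CountTrigger`, and this model only imitates that behavior. The class `TumblingCountWindowSim` is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a keyed tumbling count window (not Flink code).
class TumblingCountWindowSim {

    // Feeds (key, value) pairs in arrival order and returns one "(key,concat)" string per firing.
    static List<String> run(String[][] input, int windowSize) {
        Map<String, List<String>> buffers = new LinkedHashMap<>();
        List<String> output = new ArrayList<>();
        for (String[] element : input) {
            String key = element[0];
            List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
            buffer.add(element[1]);
            // Fire once this key's buffer holds windowSize elements...
            if (buffer.size() == windowSize) {
                output.add("(" + key + "," + String.join("", buffer) + ")");
                // ...then purge it, so consecutive windows never overlap.
                buffer.clear();
            }
        }
        return output;
    }

    public static void main(String[] args) {
        String[][] input = {
            {"a", "1"}, {"a", "2"}, {"a", "3"}, {"a", "4"}, {"a", "5"},
            {"a", "6"}, {"b", "7"}, {"b", "8"}, {"b", "9"}, {"b", "0"}
        };
        // Prints (a,123), (a,456), (b,789) on separate lines
        run(input, 3).forEach(System.out::println);
    }
}
```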
2.1 Code
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
public class FlinkCountWindowDemo {
    // StreamDataSource from section 1 goes here as a static nested class
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);
        // Window size, overridable with --window <n>
        final int windowSize = params.getInt("window", 3);
        // read source data
        DataStreamSource<Tuple2<String, String>> inStream = env.addSource(new StreamDataSource());
        // Key by f0, fire every windowSize elements per key, concatenate the f1 values
        DataStream<Tuple2<String, String>> outStream = inStream
                .keyBy(value -> value.f0)
                .countWindow(windowSize)
                .reduce(
                        new ReduceFunction<Tuple2<String, String>>() {
                            @Override
                            public Tuple2<String, String> reduce(Tuple2<String, String> value1, Tuple2<String, String> value2) {
                                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                            }
                        }
                );
        outStream.print();
        env.execute("TumblingCountWindowDemo");
    }
}
2.2 Output
(a,123)
(a,456)
(b,789)
2.3 Explanation
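The output is missing one record, Tuple2.of("b", "0"). After (b,789) fires, key "b" receives only one more element before the source finishes. That window therefore never reaches 3 elements and is never triggered. This is the expected behavior.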
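The same reasoning can be stated as arithmetic. With windowSize = 3, a key that receives n elements fires ⌊n / 3⌋ times, and n mod 3 elements stay buffered and never fire. The class and method names below are hypothetical. The helper computes those two numbers for key "a" (6 elements) and key "b" (4 elements).

```java
// Hypothetical helper: how many tumbling count windows fire for a key,
// and how many elements are left over when the source ends.
class CountWindowLeftover {

    // Number of complete windows for a key that receives n elements.
    static long firings(long n, long windowSize) {
        return n / windowSize;
    }

    // Elements still waiting in the key's window when the input ends.
    static long leftover(long n, long windowSize) {
        return n % windowSize;
    }

    public static void main(String[] args) {
        // Key "a" receives 6 elements, key "b" receives 4; windowSize = 3.
        System.out.println("a: firings=" + firings(6, 3) + ", leftover=" + leftover(6, 3)); // a: firings=2, leftover=0
        System.out.println("b: firings=" + firings(4, 3) + ", leftover=" + leftover(4, 3)); // b: firings=1, leftover=1
    }
}
```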
3. Sliding Count Window
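The figure below is borrowed from the article "Flink 原理與實作:Window 機制" (Flink Principles and Implementation: the Window Mechanism). It shows a sliding count window that computes the sum of the most recent 4 elements every time 2 new elements arrive: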
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIwczLcVmds92czlGZvwVP9EUTDZ0aRJkSwk0LcxGbpZ2LcBDM08CXlpXazRnbvZ2LcRlMMVDT2EWNvwFdu9mZvwleohkW6h2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2LcRHelR3LcJzLctmch1mclRXY39DO3MzM0ETN2EjNxkDM4EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
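The following sketch models the figure's scenario for a single key. It assumes the input is the integers 1 to 8; these values are hypothetical and may differ from the figure's. Every 2 arrivals, it keeps only the newest 4 elements and sums them. Trimming happens at firing time, which loosely mirrors how Flink 1.x builds `countWindow(size, slide)` from a `CountTrigger` plus a `CountEvictor` that runs before the window function. The class `SlidingCountSumSim` is hypothetical and is not Flink code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-key model: every `slide` arrivals, sum the newest `size` elements.
class SlidingCountSumSim {

    static List<Integer> run(int[] input, int size, int slide) {
        Deque<Integer> window = new ArrayDeque<>();
        List<Integer> sums = new ArrayList<>();
        int sinceLastFire = 0;
        for (int x : input) {
            window.addLast(x);
            sinceLastFire++;
            if (sinceLastFire == slide) {
                // Evict the oldest elements until at most `size` remain, then compute.
                while (window.size() > size) {
                    window.removeFirst();
                }
                int sum = 0;
                for (int v : window) {
                    sum += v;
                }
                sums.add(sum);
                sinceLastFire = 0;
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        // Hypothetical input 1..8 for one key, size 4, slide 2
        System.out.println(run(new int[]{1, 2, 3, 4, 5, 6, 7, 8}, 4, 2)); // [3, 10, 18, 26]
    }
}
```

The first firing sums only 1 + 2 because just two elements have arrived. From the third firing on, each sum covers exactly four elements.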
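The code in 3.1 concatenates strings instead of summing numbers. It also uses a window of 3 elements instead of 4. Otherwise it follows the same logic as the figure, and its output in 3.2 confirms what the figure shows.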
3.1 Code
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
public class FlinkCountWindowDemo {
    // StreamDataSource from section 1 goes here as a static nested class
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(1);
        // Window size (--window <n>) and slide size (--slide <n>)
        final int windowSize = params.getInt("window", 3);
        final int slideSize = params.getInt("slide", 2);
        // read source data
        DataStreamSource<Tuple2<String, String>> inStream = env.addSource(new StreamDataSource());
        // Every slideSize elements per key, concatenate the f1 values of the last windowSize elements
        DataStream<Tuple2<String, String>> outStream = inStream
                .keyBy(value -> value.f0)
                .countWindow(windowSize, slideSize)
                .reduce(
                        new ReduceFunction<Tuple2<String, String>>() {
                            @Override
                            public Tuple2<String, String> reduce(Tuple2<String, String> value1, Tuple2<String, String> value2) {
                                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                            }
                        }
                );
        outStream.print();
        env.execute("SlidingCountWindowDemo");
    }
}
3.2 Output
(a,12)
(a,234)
(a,456)
(b,78)
(b,890)
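3.3 Explanation
Each time two more elements arrive for a key, the most recent three elements of that key are concatenated. If the key has not yet seen three elements, fewer are used. That is why the first result for each key, (a,12) and (b,78), contains only two values. The output is as expected.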
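The following plain-Java model reproduces the output in 3.2 without Flink. It assumes the sliding count window acts as a per-key counter that fires every `slide` elements, combined with an eviction step that keeps only the newest `size` elements before concatenating. This is how Flink 1.x describes `countWindow(size, slide)`: a `CountTrigger.of(slide)` plus a `CountEvictor.of(size)`. The class `SlidingCountWindowSim` itself is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of a keyed sliding count window (not Flink code).
class SlidingCountWindowSim {

    static List<String> run(String[][] input, int size, int slide) {
        Map<String, List<String>> buffers = new HashMap<>();
        Map<String, Integer> counters = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (String[] element : input) {
            String key = element[0];
            List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
            buffer.add(element[1]);
            // Trigger: count arrivals per key and fire on every `slide`-th one
            int arrivals = counters.merge(key, 1, Integer::sum);
            if (arrivals == slide) {
                counters.put(key, 0);
                // Evictor: drop the oldest elements until at most `size` remain
                while (buffer.size() > size) {
                    buffer.remove(0);
                }
                output.add("(" + key + "," + String.join("", buffer) + ")");
            }
        }
        return output;
    }

    public static void main(String[] args) {
        String[][] input = {
            {"a", "1"}, {"a", "2"}, {"a", "3"}, {"a", "4"}, {"a", "5"},
            {"a", "6"}, {"b", "7"}, {"b", "8"}, {"b", "9"}, {"b", "0"}
        };
        // Prints (a,12), (a,234), (a,456), (b,78), (b,890) on separate lines
        run(input, 3, 2).forEach(System.out::println);
    }
}
```

Unlike the tumbling model, the buffer is never cleared. Elements stay until the eviction step pushes them out, so consecutive results overlap.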