1. Tumbling Windows — A tumbling window has a fixed size and slices the data stream into uniform chunks. Windows neither overlap nor leave gaps between them; they are laid out "end to end". Tumbling windows can be defined by time or by element count, and they need only one parameter: the window size.
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwVZnFWbp1zczV2YvJHctM3cv1Ces0zaHRGcWdUYuVzVa9GczoVdG1mWfVGc5RHLwIzX39GZhh2csATMflHLwEzX4xSZz91ZsAzMfRHLGZkRGZkRfJ3bs92YskmNhVTYykVNQJVMRhXVEF1X0hXZ0xCNx8VZ6l2cssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLyAjN3kjMygTNhRzMhVTMzYzXyQDMyUTM1AzLclDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
In Spark Streaming, a tumbling window is configured by giving `window()` a window duration and a slide duration. Both must be multiples of the StreamingContext batch interval, and for a tumbling window the two are equal, e.g.:
.window(Seconds(10), Seconds(10)) — a 10-second window with a 10-second slide, so windows do not overlap
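Both examples below map each socket line (e.g. `ws_001,1625544816,30`) to a `com.pojo.WaterSensor` bean, which the article does not show. A minimal sketch consistent with the constructor call `new WaterSensor(cols[0], Long.parseLong(cols[1]), Integer.parseInt(cols[2]))` and with `spark.createDataFrame(rdd, WaterSensor.class)` (which needs a no-arg constructor and getters) might look like this; the field names `id`, `ts`, `vc` are assumptions:

```java
import java.io.Serializable;

// Hypothetical sketch of the WaterSensor bean used in the examples.
// Field names (id, ts, vc) are assumptions, not shown in the article.
public class WaterSensor implements Serializable {
    private String id;  // sensor id
    private long ts;    // event timestamp
    private int vc;     // water level reading

    public WaterSensor() { }  // no-arg constructor required by createDataFrame

    public WaterSensor(String id, long ts, int vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public long getTs() { return ts; }
    public void setTs(long ts) { this.ts = ts; }
    public int getVc() { return vc; }
    public void setVc(int vc) { this.vc = vc; }
}
```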
package com.examples;

import com.pojo.WaterSensor;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Created by lj on 2022-07-12.
 */
public class SparkSql_Socket_Tumble {
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String host = "localhost";
    private static int port = 9999;

    public static void main(String[] args) {
        // initialize the SparkConf
        SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);
        // create the JavaStreamingContext with a 1-minute batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));
        // raise the log level so output is not drowned in log noise
        ssc.sparkContext().setLogLevel("ERROR");
        // read data from the socket source
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);
        JavaDStream<WaterSensor> mapDStream = lines.map(new Function<String, WaterSensor>() {
            private static final long serialVersionUID = 1L;

            public WaterSensor call(String s) throws Exception {
                String[] cols = s.split(",");
                return new WaterSensor(cols[0], Long.parseLong(cols[1]), Integer.parseInt(cols[2]));
            }
        }).window(Durations.minutes(3), Durations.minutes(3)); // tumbling window: window duration and slide duration must both be multiples of the batch interval, and here they are equal
        mapDStream.foreachRDD(new VoidFunction2<JavaRDD<WaterSensor>, Time>() {
            @Override
            public void call(JavaRDD<WaterSensor> waterSensorJavaRDD, Time time) throws Exception {
                SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());
                Dataset<Row> dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);
                // register a temporary view over this window's data
                dataFrame.createOrReplaceTempView("log");
                Dataset<Row> result = spark.sql("select * from log");
                System.out.println("========= " + time + " =========");
                // show() prints the first 20 rows
                result.show();
            }
        });
        // start the job and wait for termination
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}
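The `foreachRDD` body calls `JavaSparkSessionSingleton.getInstance(...)`, a helper class not shown in the article. In Spark's own streaming examples (e.g. `JavaSqlNetworkWordCount`) this is a lazily instantiated singleton that reuses a single `SparkSession` across batches; a version following that pattern would be:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

/** Lazily instantiated singleton of SparkSession, shared by every batch. */
class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            // create the session once, on first use, from the streaming job's conf
            instance = SparkSession.builder()
                    .config(sparkConf)
                    .getOrCreate();
        }
        return instance;
    }
}
```

Without this helper (or an equivalent), neither listing in this article compiles.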
The code defines a 3-minute window with a 3-minute slide. The output shows that no data appears in more than one window, i.e. the stream exhibits tumbling-window behavior:
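The windowing arithmetic itself can be checked outside Spark. The sketch below (plain Java; the batch timestamps in minutes are illustrative, not Spark output) shows that with a 3-minute window and a 3-minute slide, each window trigger covers a disjoint set of batches:

```java
import java.util.ArrayList;
import java.util.List;

public class TumblingWindowSketch {
    // Batch times (in minutes) covered by the window ending at 'trigger':
    // the half-open interval (trigger - windowSize, trigger].
    static List<Integer> batchesInWindow(int trigger, int windowSize, int batchInterval) {
        List<Integer> batches = new ArrayList<>();
        for (int t = trigger - windowSize + batchInterval; t <= trigger; t += batchInterval) {
            batches.add(t);
        }
        return batches;
    }

    public static void main(String[] args) {
        int batchInterval = 1, windowSize = 3, slide = 3;
        // Triggers fire every 'slide' minutes; because slide == windowSize,
        // the covered batch sets are disjoint (tumbling behavior).
        for (int trigger = 3; trigger <= 9; trigger += slide) {
            System.out.println("window @" + trigger + " -> "
                    + batchesInWindow(trigger, windowSize, batchInterval));
        }
        // prints [1, 2, 3], then [4, 5, 6], then [7, 8, 9]: no batch repeats
    }
}
```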
2. Sliding Windows — Like tumbling windows, sliding windows have a fixed size. The difference is that consecutive windows are not laid end to end; each is offset from the previous one, as if the window were "sliding" forward in small steps. A sliding window is defined by two parameters: besides the window size, a slide step (window slide) that determines how often the window is evaluated.
In Spark Streaming, a sliding window is likewise configured with a window duration and a slide duration, both multiples of the StreamingContext batch interval, but with the two values different, e.g.:
.window(Seconds(10), Seconds(5)) — a 10-second window with a 5-second slide, so consecutive windows overlap
package com.examples;

import com.pojo.WaterSensor;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * Created by lj on 2022-07-12.
 */
public class SparkSql_Socket {
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String host = "localhost";
    private static int port = 9999;

    public static void main(String[] args) {
        // initialize the SparkConf
        SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);
        // create the JavaStreamingContext with a 1-minute batch interval
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));
        // raise the log level so output is not drowned in log noise
        ssc.sparkContext().setLogLevel("ERROR");
        // read data from the socket source
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, port);
        JavaDStream<WaterSensor> mapDStream = lines.map(new Function<String, WaterSensor>() {
            private static final long serialVersionUID = 1L;

            public WaterSensor call(String s) throws Exception {
                String[] cols = s.split(",");
                return new WaterSensor(cols[0], Long.parseLong(cols[1]), Integer.parseInt(cols[2]));
            }
        }).window(Durations.minutes(4), Durations.minutes(2)); // sliding window: window duration and slide duration must be integer multiples of the batch interval; here a 4-minute window slides every 2 minutes
        mapDStream.foreachRDD(new VoidFunction2<JavaRDD<WaterSensor>, Time>() {
            @Override
            public void call(JavaRDD<WaterSensor> waterSensorJavaRDD, Time time) throws Exception {
                SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());
                Dataset<Row> dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);
                // register a temporary view over this window's data
                dataFrame.createOrReplaceTempView("log");
                Dataset<Row> result = spark.sql("select * from log");
                System.out.println("========= " + time + " =========");
                // show() prints the first 20 rows
                result.show();
            }
        });
        // start the job and wait for termination
        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ssc.close();
        }
    }
}
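As with the tumbling case, the overlap can be checked with plain-Java window arithmetic (the batch timestamps in minutes are illustrative, not Spark output): with a 4-minute window sliding every 2 minutes, consecutive windows share half of their batches.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    // Batch times (in minutes) covered by the window ending at 'trigger':
    // the half-open interval (trigger - windowSize, trigger].
    static List<Integer> batchesInWindow(int trigger, int windowSize, int batchInterval) {
        List<Integer> batches = new ArrayList<>();
        for (int t = trigger - windowSize + batchInterval; t <= trigger; t += batchInterval) {
            batches.add(t);
        }
        return batches;
    }

    public static void main(String[] args) {
        int batchInterval = 1, windowSize = 4, slide = 2;
        // Triggers fire every 'slide' minutes; because slide < windowSize,
        // consecutive windows overlap (sliding behavior).
        for (int trigger = 4; trigger <= 8; trigger += slide) {
            System.out.println("window @" + trigger + " -> "
                    + batchesInWindow(trigger, windowSize, batchInterval));
        }
        // prints [1, 2, 3, 4], then [3, 4, 5, 6]: batches 3 and 4 appear in both
    }
}
```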