window操作就是視窗函數。Spark Streaming提供了滑動視窗操作的支援,進而讓我們可以對一個滑動視窗内的資料執行計算操作。每次掉落在視窗内的RDD的資料,會被聚合起來執行計算操作,然後生成的RDD,會作為window DStream的一個RDD。比如下圖中,就是對每三秒鐘的資料執行一次滑動視窗計算,這3秒内的3個RDD會被聚合起來進行處理,然後過了兩秒鐘,又會對最近三秒内的資料執行滑動視窗計算。是以每個滑動視窗操作,都必須指定兩個參數,視窗長度以及滑動間隔,而且這兩個參數值都必須是batch間隔的整數倍。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL9UFVPBTRU10MBRVT3V1MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL0EzM3EDMzkDM4AjMxAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
案例示範
以nc作為源頭進行測試
nc -lk mypc01 10087
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
object WindowDemo1 extends App {
private val conf = new SparkConf().setAppName("test").setMaster("local[*]")
private val duration: Duration = Seconds(10)
//建構StreamingContext
private val ssc: StreamingContext = new StreamingContext(conf, duration)
//以socket作為源
private val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("mypc01", 10087)
reduceByKeyAndWindowDemo().print()
ssc.start()
ssc.awaitTermination()
def reduceByKeyAndWindowDemo() = {
val value: DStream[(String, Int)] = dstream.map((_, 1)).reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(30), Seconds(20))
val value2: DStream[(String, Int)] = value.transform((rdd: RDD[(String, Int)]) => {
//降序排序并取前三
val tuples: Array[(String, Int)] = rdd.sortBy((_._2), ascending = false).take(3)
//Array轉為RDD,因為transform要傳回一個RDD
val value1: RDD[(String, Int)] = ssc.sparkContext.makeRDD(tuples)
value1
})
value2
}
}
方法解析
通過在此DStream的滑動視窗上應用reduceByKey來傳回新的DStream。 與DStream.reduceByKey()類似,但将其應用于滑動視窗。 新的DStream生成與該DStream具有相同間隔的RDD。 哈希分區用于生成具有Spark預設分區數的RDD。
參數:
reduceFunc –關聯和交換的reduce函數
windowDuration –視窗的寬度; 必須是此DStream批處理間隔的倍數