天天看點

SparkStreaming視窗入門

window操作就是視窗函數。Spark Streaming提供了滑動視窗操作的支援,進而讓我們可以對一個滑動視窗内的資料執行計算操作。每次掉落在視窗内的RDD的資料,會被聚合起來執行計算操作,然後生成的RDD,會作為window DStream的一個RDD。比如下圖中,就是對每三秒鐘的資料執行一次滑動視窗計算,這3秒内的3個RDD會被聚合起來進行處理,然後過了兩秒鐘,又會對最近三秒内的資料執行滑動視窗計算。是以每個滑動視窗操作,都必須指定兩個參數,視窗長度以及滑動間隔,而且這兩個參數值都必須是batch間隔的整數倍。

SparkStreaming視窗入門

案例示範

以nc作為源頭進行測試

nc -lk mypc01 10087
           
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

object WindowDemo1 extends App {
  private val conf = new SparkConf().setAppName("test").setMaster("local[*]")
  private val duration: Duration = Seconds(10)
  //建構StreamingContext
  private val ssc: StreamingContext = new StreamingContext(conf, duration)
  //以socket作為源
  private val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("mypc01", 10087)
  reduceByKeyAndWindowDemo().print()
  ssc.start()
  ssc.awaitTermination()

  def reduceByKeyAndWindowDemo() = {
    val value: DStream[(String, Int)] = dstream.map((_, 1)).reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(30), Seconds(20))
    val value2: DStream[(String, Int)] = value.transform((rdd: RDD[(String, Int)]) => {
    //降序排序并取前三
      val tuples: Array[(String, Int)] = rdd.sortBy((_._2), ascending = false).take(3)
      //Array轉為RDD,因為transform要傳回一個RDD
      val value1: RDD[(String, Int)] = ssc.sparkContext.makeRDD(tuples)
      value1
    })
    value2
  }
}
           

方法解析

通過在此DStream的滑動視窗上應用reduceByKey來傳回新的DStream。 與DStream.reduceByKey()類似,但将其應用于滑動視窗。 新的DStream生成與該DStream具有相同間隔的RDD。 哈希分區用于生成具有Spark預設分區數的RDD。

參數:

reduceFunc –關聯和交換的reduce函數

windowDuration –視窗的寬度; 必須是此DStream批處理間隔的倍數

繼續閱讀