天天看點

Spark Streaming的DStream轉換

目錄

​​DStream轉換​​

​​1、無狀态轉換​​

​​2、有狀态轉換​​

​​2-1、updateStateByKey​​

​​2-2、Window Operations​​

DStream轉換

    DStream上的原語與RDD的類似,分為Transformations(轉換)和Output Operations(輸出)兩種,此外轉換操作中還有一些比較特殊的原語,如:updateStateByKey()、transform()以及各種Window相關的原語。

1、無狀态轉換

    無狀态轉化操作就是把簡單的 RDD 轉化操作應用到每個批次上,也就是轉化 DStream 中的每一個 RDD。 注意,針對鍵值對的 DStream 轉化操作(比 如 reduceByKey())要添加import StreamingContext._ 才能在 Scala中使用。

     1、def map[U: ClassTag](mapFunc: T => U): DStream[U]   将源DStream中的每個元素通過一個函數func進而得到新的DStreams。

     2、def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U]   和map類似,但是每個輸入的項可以被映射為0或更多項。

     3、def filter(filterFunc: T => Boolean): DStream[T]  選擇源DStream中函數func判為true的記錄作為新DStream

     4、def repartition(numPartitions: Int): DStream[T]   通過建立更多或者更少的partition來改變此DStream的并行級别。

     5、def union(that: DStream[T]): DStream[T]   将一個具有相同slideDuration(時間視窗)新的DStream和目前DStream進行合并,傳回新的DStream

     6、def count(): DStream[Long]   統計源DStreams中每個RDD所含元素的個數得到單元素RDD的新DStreams。

     7、def reduce(reduceFunc: (T, T) => T): DStream[T]   

            通過函數func(兩個參數一個輸出)來整合源DStreams中每個RDD元素得到單元素RDD的DStream。

     8、def countByValue(numPartitions: Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null): DStream[(T, Long)]   對于DStreams中元素類型為K調用此函數,得到包含(K,Long)對的新DStream,其中Long值表明相應的K在源DStream中每個RDD出現的次數。

     9、def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)]   

            對(K,V)對的DStream調用此函數,傳回同樣(K,V)對的新DStream,但是新DStream中的對應V為使用reduce函數整合而來

    10、def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))]   兩DStream分别為(K,V)和(K,W)對,傳回(K,(V,W))對的新DStream。

    11、def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))]   

            兩DStream分别為(K,V)和(K,W)對,傳回(K,(Seq[V],Seq[W])對新DStream

    12、def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U]   将RDD到RDD映射的函數func作用于源DStream中每個RDD上得到新DStream。這個可用于在DStream的RDD上做任意操作。注意的是,在這個轉換函數裡面能夠應用所有RDD的轉換操作。

2、有狀态轉換

2-1、updateStateByKey

def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]

// Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values of each key.Hash partitioning is used to generate the RDDs with Spark's default number of partitions.

    1、S是你需要儲存的狀态的類型。

    2、updateFunc 是定義了每一批次RDD如何來更新的狀态值。Seq[V] 是目前批次相同key的值的集合。Option[S] 是架構自動提供的,上一次儲存的狀态的值。

    3、updateStateByKey會傳回一個新的DStream,該DStream中儲存了(Key,State)的序列。

2-2、Window Operations

    Window Operations有點類似于Storm中的State,可以設定視窗的大小和滑動視窗的間隔來動态的擷取目前Steaming的允許狀态。基于視窗的操作會在一個比 StreamingContext 的批次間隔更長的時間範圍内,通過整合多個批次的結果,計算出整個視窗的結果。

Spark Streaming的DStream轉換

    所有基于視窗的操作都需要兩個參數,分别為視窗時長以及滑動步長,兩者都必須是 StreamContext 的批次間隔的整數倍。視窗時長控制每次計算最近的多少個批次的資料,其實就是最近的 windowDuration/batchInterval 個批次。如果有一個以 10 秒為批次間隔的源 DStream,要建立一個最近 30 秒的時間視窗(即最近 3 個批次),就應當把 windowDuration 設為 30 秒。而滑動步長的預設值與批次間隔相等,用來控制對新的 DStream 進行計算的間隔。如果源 DStream 批次間隔為 10 秒,并且我們隻希望每兩個批次計算一次視窗結果, 就應該把滑動步長設定為 20 秒。

Spark Streaming的DStream轉換

1、def window(windowDuration: Duration, slideDuration: Duration): DStream[T]   

    基于對源DStream窗化的批次進行計算傳回一個新的DStream,windowDuration是視窗大小,slideDuration滑動步長。

2、def countByWindow(windowDuration: Duration,slideDuration: Duration): DStream[Long]   注意,傳回的是window中記錄的條數。

3、def reduceByWindow(reduceFunc: (T, T) => T,windowDuration: Duration,slideDuration: Duration): DStream[T]

     通過使用自定義函數整合滑動區間流元素來建立一個新的單元素流。

4、def reduceByKeyAndWindow(reduceFunc: (V, V) => V,windowDuration: Duration,slideDuration: Duration): DStream[(K, V)]

    通過給定的視窗大小以滑動步長來應用reduceFunc函數,傳回DStream[(K, V)], K就是DStream中相應的K,V是window應用了reduce之後産生的最終值。

5、def reduceByKeyAndWindow(reduceFunc: (V, V) => V,invReduceFunc: (V, V) => V,windowDuration: Duration,slideDuration: Duration = self.slideDuration,numPartitions: Int = ssc.sc.defaultParallelism,filterFunc: ((K, V)) => Boolean = null): DStream[(K, V)]

    這個版本是4版本的優化版本,計算方式不同,采用增量計算的模式,每次計算隻是在上一次計算的基礎上減去丢失的值,加上增加的值。