天天看點

Spark Streaming轉換操作

在流計算應用場景中,資料流會源源不斷到達,Spark Streaming會把連續的資料流切分成一個又一個分段,然後,對每個分段内的DStream資料進行處理,也就是對DStream進行各種轉換操作,包括無狀态轉換操作和有狀态轉換操作。

DStream上的原語與RDD的類似,分為Transformations(轉換)和Output Operations(輸出)兩種,此外轉換操作中還有一些比較特殊的原語,如:updateStateByKey()、transform()以及各種Window相關的原語。

Transformation Meaning
map(func) 将源DStream中的每個元素通過一個函數func進而得到新的DStreams。
flatMap(func) 和map類似,但是每個輸入的項可以被映射為0或更多項。
filter(func) 選擇源DStream中函數func判為true的記錄作為新DStreams 
repartition(numPartitions) 通過建立更多或者更少的partition來改變此DStream的并行級别。
union(otherStream) 聯合源DStreams和其他DStreams來得到新DStream 
count() 統計源DStreams中每個RDD所含元素的個數得到單元素RDD的新DStreams。
reduce(func) 通過函數func(兩個參數一個輸出)來整合源DStreams中每個RDD元素得到單元素RDD的DStreams。這個函數需要關聯進而可以被并行計算。
countByValue() 對于DStreams中元素類型為K調用此函數,得到包含(K,Long)對的新DStream,其中Long值表明相應的K在源DStream中每個RDD出現的頻率。
reduceByKey(func, [numTasks]) 對(K,V)對的DStream調用此函數,傳回同樣(K,V)對的新DStream,但是新DStream中的對應V為使用reduce函數整合而來。Note:預設情況下,這個操作使用Spark預設數量的并行任務(本地模式為2,叢集模式中的數量取決于配置參數spark.default.parallelism)。你也可以傳入可選的參數numTaska來設定不同數量的任務。 
join(otherStream, [numTasks]) 兩DStream分别為(K,V)和(K,W)對,傳回(K,(V,W))對的新DStream。 
cogroup(otherStream, [numTasks]) 兩DStream分别為(K,V)和(K,W)對,傳回(K,(Seq[V],Seq[W])對新DStreams 
transform(func) 将RDD到RDD映射的函數func作用于源DStream中每個RDD上得到新DStream。這個可用于在DStream的RDD上做任意操作。 
updateStateByKey(func) 得到”狀态”DStream,其中每個key狀态的更新是通過将給定函數用于此key的上一個狀态和新值而得到。這個可用于儲存每個key值的任意狀态資料。 

DStream 的轉化操作可以分為無狀态(stateless)和有狀态(stateful)兩種。

• 在無狀态轉化操作中,每個批次的處理不依賴于之前批次的資料。常見的 RDD 轉化操作,例如 map()、filter()、reduceByKey() 等,都是無狀态轉化操作。

• 相對地,有狀态轉化操作需要使用之前批次的資料或者是中間結果來計算目前批次的資料。有狀态轉化操作包括基于滑動視窗的轉化操作和追蹤狀态變化的轉化操作。

無狀态轉化操作

 無狀态轉化操作就是把簡單的 RDD 轉化操作應用到每個批次上,也就是轉化 DStream 中的每一個 RDD。部分無狀态轉化操作列在了下表中。 注意,針對鍵值對的 DStream 轉化操作(比如 reduceByKey())要添加import StreamingContext._ 才能在 Scala中使用。

需要記住的是,盡管這些函數看起來像作用在整個流上一樣,但事實上每個 DStream 在内部是由許多 RDD(批次)組成,且無狀态轉化操作是分别應用到每個 RDD 上的。例如, reduceByKey() 會歸約每個時間區間中的資料,但不會歸約不同區間之間的資料。

舉個例子,在之前的wordcount程式中,我們隻會統計1秒内接收到的資料的單詞個數,而不會累加。 

無狀态轉化操作也能在多個 DStream 間整合資料,不過也是在各個時間區間内。例如,鍵 值對 DStream 擁有和 RDD 一樣的與連接配接相關的轉化操作,也就是 cogroup()、join()、 leftOuterJoin() 等。我們可以在 DStream 上使用這些操作,這樣就對每個批次分别執行了對應的 RDD 操作。

我們還可以像在正常的 Spark 中一樣使用 DStream 的 union() 操作将它和另一個 DStream 的内容合并起來,也可以使用 StreamingContext.union() 來合并多個流。

有狀态轉化操作

DStream有狀态轉換操作包括滑動操作和updateStateByKey操作。

滑動視窗轉換操作

Window Operations有點類似于Storm中的State,可以設定視窗的大小和滑動視窗的間隔來動态的擷取目前Steaming的允許狀态。

基于視窗的操作會在一個比 StreamingContext 的批次間隔更長的時間範圍内,通過整合多個批次的結果,計算出整個視窗的結果。

所有基于視窗的操作都需要兩個參數,分别為視窗時長以及滑動步長,兩者都必須是 StreamContext 的批次間隔的整數倍。視窗時長控制每次計算最近的多少個批次的資料,其實就是最近的 windowDuration/batchInterval 個批次。如果有一個以 10 秒為批次間隔的源 DStream,要建立一個最近 30 秒的時間視窗(即最近 3 個批次),就應當把 windowDuration 設為 30 秒。而滑動步長的預設值與批次間隔相等,用來控制對新的 DStream 進行計算的間隔。如果源 DStream 批次間隔為 10 秒,并且我們隻希望每兩個批次計算一次視窗結果, 就應該把滑動步長設定為 20 秒。

window(windowLength, slideInterval) 基于對源DStream窗化的批次進行計算傳回一個新的DStream
countByWindow(windowLength, slideInterval) 傳回一個滑動視窗計數流中的元素。
reduceByWindow(func, windowLength, slideInterval) 通過使用自定義函數整合滑動區間流元素來建立一個新的單元素流。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 當在一個(K,V)對的DStream上調用此函數,會傳回一個新(K,V)對的DStream,此處通過對滑動視窗中批次資料使用reduce函數來整合每個key的value值。Note:預設情況下,這個操作使用Spark的預設數量并行任務(本地是2),在叢集模式中依據配置屬性(spark.default.parallelism)來做grouping。你可以通過設定可選參數numTasks來設定不同數量的tasks。 
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 這個函數是上述函數的更高效版本,每個視窗的reduce值都是通過用前一個窗的reduce值來遞增計算。通過reduce進入到滑動視窗資料并”反向reduce”離開視窗的舊資料來實作這個操作。一個例子是随着視窗滑動對keys的“加”“減”計數。通過前邊介紹可以想到,這個函數隻适用于”可逆的reduce函數”,也就是這些reduce函數有相應的”反reduce”函數(以參數invFunc形式傳入)。如前述函數,reduce任務的數量通過可選參數來配置。注意:為了使用這個操作,檢查點必須可用。 
countByValueAndWindow(windowLength,slideInterval, [numTasks]) 對(K,V)對的DStream調用,傳回(K,Long)對的新DStream,其中每個key的值是其在滑動視窗中頻率。如上,可配置reduce任務數量。

reduceByWindow() 和 reduceByKeyAndWindow() 讓我們可以對每個視窗更高效地進行歸約操作。它們接收一個歸約函數,在整個視窗上執行,比如 +。除此以外,它們還有一種特殊形式,通過隻考慮新進入視窗的資料和離開窗 口的資料,讓 Spark 增量計算歸約結果。這種特殊形式需要提供歸約函數的一個逆函數,比 如 + 對應的逆函數為 -。對于較大的視窗,提供逆函數可以大大提高執行效率  

val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
val ipCountDStream = ipDStream.reduceByKeyAndWindow(

  {(x, y) => x + y},
  {(x, y) => x - y},
  Seconds(30),
  Seconds(10))           

追蹤狀态變化UpdateStateByKey

UpdateStateByKey原語用于記錄曆史記錄,有時,我們需要在 DStream 中跨批次維護狀态(例如流計算中累加wordcount)。針對這種情況,updateStateByKey() 為我們提供了對一個狀态變量的通路,用于鍵值對形式的 DStream。給定一個由(鍵,事件)對構成的 DStream,并傳遞一個指定如何根據新的事件 更新每個鍵對應狀态的函數,它可以建構出一個新的 DStream,其内部資料為(鍵,狀态) 對。

updateStateByKey() 的結果會是一個新的 DStream,其内部的 RDD 序列是由每個時間區間對應的(鍵,狀态)對組成的。

updateStateByKey操作使得我們可以在用新資訊進行更新時保持任意的狀态。為使用這個功能,你需要做下面兩步: 

1. 定義狀态,狀态可以是一個任意的資料類型。 

2. 定義狀态更新函數,用此函數闡明如何使用之前的狀态和來自輸入流的新值對狀态進行更新。

使用updateStateByKey需要對檢查點目錄進行配置,會使用檢查點來儲存狀态。

下面就用一個案例來示範下

在"/usr/local/spark/mycode/streaming/stateful"目錄下建立一個代碼檔案NetworkWordCountStateful.scala,輸入代碼如下所示:

package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
object NetworkWordCountStateful {
  def main(args: Array[String]) {
    //定義狀态更新函數
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }
      StreamingExamples.setStreamingLogLevels()  //設定log4j日志級别
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCountStateful")
    val sc = new StreamingContext(conf, Seconds(5))
    sc.checkpoint("file:///usr/local/spark/mycode/streaming/stateful/")    //設定檢查點,檢查點具有容錯機制
    //定義了一個"套接字流"類型的資料源,這個資料源可以用nc程式産生。需要注意的是,在代碼中,已經确定了Socket用戶端會向主機名為localhost的9999端口号發起Socket通信請求
    val lines = sc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    sc.start()
    sc.awaitTermination()
  }
}

           

然後建立一個StreamingExamples.scala檔案,代碼如下所示:

package org.apache.spark.examples.streaming 
import org.apache.spark.internal.Logging 
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}
           

在"/usr/local/spark/mycode/streaming/stateful/"目錄下建立一個simple.sbt檔案,然後,使用sbt工具進行編譯打包。打包成功後,在Linux終端執行如下指令送出運作程式:

Spark Streaming轉換操作
/usr/local/spark/bin/spark-submit \
--class "org.apache.spark.examples.streaming.NetworkWordCountStateful" \
./target/scala-2.11/simple-project_2.11-1.0.jar           

 執行上述指令後,NetworkWordCountStateful程式就啟動了,它會向主機名為localhost的9999号發起socket通信請求。這裡我們讓nc程式扮演Socket伺服器端,也就是讓NetworkWordCountStateful程式和nc程式建立Socket連接配接。一旦Socket連接配接,NetworkWordCountStateful程式接收來自nc程式的資料,并進行詞頻統計。打開一個新的終端,輸入如下指令:

nc -lk 9999           

在這個視窗手動輸入一些單詞

Spark Streaming轉換操作

切換到剛才流計算終端,可以看到已經輸出了類似如下的詞頻統計資訊: 

繼續閱讀