天天看點

通過Spark Streaming從TCP協定中擷取實時流資料(scala測試代碼)

scala的簡單實作:

package lab2012

import org.apache.spark._
import org.apache.spark.streaming._

object WaitToGetWordFromTCP {

  def main(args: Array[String]): Unit = {
    // Spark Streaming程式以StreamingContext為起點,其内部維持了一個SparkContext的執行個體。
    // 這裡我們建立一個帶有兩個本地線程的StreamingContext,并設定批處理間隔為8秒。
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    // 在一個Spark應用中預設隻允許有一個SparkContext,預設地spark-shell已經為我們建立好了
    // SparkContext,名為sc。是以在spark-shell中應該以下述方式建立StreamingContext,以
    // 避免建立再次建立SparkContext而引起錯誤:
    // val ssc = new StreamingContext(sc, Seconds(8))
    // 建立一個從TCP連接配接擷取流資料的DStream,其每條記錄是一行文本
    val lines = ssc.socketTextStream("192.168.88.80", 8048)
    // 對DStream進行轉換,最終得到計算結果
    //val res = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    val res = lines.flatMap(word=>word.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // 列印該DStream中每個RDD中的前十個元素
    res.print()
    // 執行完上面代碼,Spark Streaming并沒有真正開始處理資料,而隻是記錄需在資料上執行的操作。
    // 當我們設定好所有需要在資料上執行的操作以後,我們就可以開始真正地處理資料了。如下:
    ssc.start() // 開始計算
    ssc.awaitTermination() // 等待計算終止
    //啟動後,通過:nc -lk 8048,可以向端口發送字元串,spark可以接受到字元串,并處理
  }
}
           

繼續閱讀