scala的簡單實作:
package lab2012
import org.apache.spark._
import org.apache.spark.streaming._
object WaitToGetWordFromTCP {
def main(args: Array[String]): Unit = {
// Spark Streaming程式以StreamingContext為起點,其内部維持了一個SparkContext的執行個體。
// 這裡我們建立一個帶有兩個本地線程的StreamingContext,并設定批處理間隔為8秒。
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// 在一個Spark應用中預設隻允許有一個SparkContext,預設地spark-shell已經為我們建立好了
// SparkContext,名為sc。是以在spark-shell中應該以下述方式建立StreamingContext,以
// 避免建立再次建立SparkContext而引起錯誤:
// val ssc = new StreamingContext(sc, Seconds(8))
// 建立一個從TCP連接配接擷取流資料的DStream,其每條記錄是一行文本
val lines = ssc.socketTextStream("192.168.88.80", 8048)
// 對DStream進行轉換,最終得到計算結果
//val res = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
val res = lines.flatMap(word=>word.split(" ")).map((_, 1)).reduceByKey(_ + _)
// 列印該DStream中每個RDD中的前十個元素
res.print()
// 執行完上面代碼,Spark Streaming并沒有真正開始處理資料,而隻是記錄需在資料上執行的操作。
// 當我們設定好所有需要在資料上執行的操作以後,我們就可以開始真正地處理資料了。如下:
ssc.start() // 開始計算
ssc.awaitTermination() // 等待計算終止
//啟動後,通過:nc -lk 8048,可以向端口發送字元串,spark可以接受到字元串,并處理
}
}