
A Spark Streaming word count program

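Add the Spark Streaming dependency to build.gradle (scalaVersion is the Scala binary version used in the artifact name, for example 2.12; both variables are assumed to be defined elsewhere in the build)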

implementation "org.apache.spark:spark-streaming_$scalaVersion:$sparkVersion"
           

The main method

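import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object WordCountStream {
  def main(args: Array[String]): Unit = {
    // local[*] uses all local cores; the socket receiver occupies one, so at least two are needed
    val conf = new SparkConf().setMaster("local[*]").setAppName("WordCountStream")
    val sc = SparkContext.getOrCreate(conf)
    // Process the stream in 10-second micro-batches
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(10))
    // Read lines of text from a TCP socket on localhost:9999
    val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    // Split lines into words, pair each word with 1, and sum the counts per word within each batch
    val resultDstream: DStream[(String, Int)] = dstream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    resultDstream.print()
    ssc.start()            // start receiving and processing
    ssc.awaitTermination() // block until the job is stopped
  }
}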
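
Note that reduceByKey on a DStream counts words within each 10-second batch only; the totals are not carried over from one batch to the next. To see what a single batch computes without starting Spark, here is a minimal sketch of the same flatMap, map and per-key sum on an ordinary Scala collection. The object name BatchWordCount, the helper countWords and the sample input are made up for illustration and are not part of the Spark API.

```scala
object BatchWordCount {
  // Counts the words in one batch of lines, mirroring the
  // flatMap -> map -> reduceByKey pipeline of the streaming job.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))   // split each line into words
      .map((_, 1))             // pair each word with a count of 1
      .groupBy(_._1)           // group the pairs by word
      .map { case (word, pairs) => word -> pairs.map(_._2).sum } // sum per word

  def main(args: Array[String]): Unit = {
    // Hypothetical batch: the lines typed into nc during one interval
    val batch = Seq("hello spark", "hello streaming")
    countWords(batch).toSeq.sortBy(_._1).foreach(println)
    // prints:
    // (hello,2)
    // (spark,1)
    // (streaming,1)
  }
}
```

Typing the same two lines into nc within one batch interval should make the streaming job print the same pairs, though not necessarily in this order.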
           

Start a netcat server on port 9999 and type lines into it to feed the stream

nc -lk 9999
           
