在 build.gradle 的 dependencies 中添加 Spark Streaming 依赖
implementation "org.apache.spark:spark-streaming_$scalaVersion:$sparkVersion"
main方法
/** Entry point of a streaming word count.
  *
  * Reads lines of text from a TCP socket on localhost:9999, splits every line
  * into words, counts the occurrences of each word within a 10-second batch
  * and prints the per-batch counts. Runs until the streaming context is
  * terminated.
  *
  * @param args command-line arguments (currently unused)
  */
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCountStream")
  val sparkContext = SparkContext.getOrCreate(sparkConf)
  // Each micro-batch covers 10 seconds of input.
  val streamingContext: StreamingContext = new StreamingContext(sparkContext, Seconds(10))

  val lines: ReceiverInputDStream[String] = streamingContext.socketTextStream("localhost", 9999)

  // Split on runs of whitespace and drop empty tokens: a plain split(" ")
  // turns blank lines and repeated/leading spaces into "" entries, which
  // would then be counted as a word.
  val words: DStream[String] = lines.flatMap(_.split("\\s+")).filter(_.nonEmpty)
  val wordCounts: DStream[(String, Int)] = words.map(word => (word, 1)).reduceByKey(_ + _)

  wordCounts.print()

  streamingContext.start()
  // Block the main thread so the application keeps processing batches.
  streamingContext.awaitTermination()
}
啟動程式前,先使用以下指令監聽本機 9999 埠,之後在該終端輸入的每一行文字即為資料流
nc -lk 9999