概覽
spark streaming是spark api的一個可橫向擴容,高吞吐量,容錯的實時資料流處理引擎,spark能夠從kafka、flume、kinesis或者tcp等等輸入擷取資料,然後能夠使用複雜的計算表達式如map,reduce,join和window對資料進行計算。計算完後的資料能夠被推送到檔案系統,資料庫,和實時的儀表盤。另外,你也可以使用spark ml和圖計算處理實時資料流。
spark streaming接受到了實時資料後,把它們分批進行切割,然後再交給spark進行資料的批量處理。
spark streaming對離散化的資料流提供了進階别的抽象dstream,所有進入的資料流都會被處理為dstreams,在内部,dstream是一個順序排列的rdd。
快速起步
第一個執行個體是如何從tcp輸入中計算單詞出現的次數
首先,我們建立一個javastreamingcontext對象,它是所有streaming函數的主入口,再建立一個帶有2個線程的streamingcontext對象,每1秒進行一次批處理。
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.tuple2;
sparkconf conf = new sparkconf().setmaster("local[2]").setappname("networkwordcount");
javastreamingcontext jssc = new javastreamingcontext(conf, durations.seconds(1));
建立一個偵聽本地9999的tcp資料源
javareceiverinputdstream<string> lines = jssc.sockettextstream("localhost", 9999);
我們把接受到的資料按照空格進行切割
javadstream<string> words = lines.flatmap(x -> arrays.aslist(x.split(" ")).iterator());
對單詞進行統計
javapairdstream<string, integer> pairs = words.maptopair(s -> new tuple2<>(s, 1));
javapairdstream<string, integer> wordcounts = pairs.reducebykey((i1, i2) -> i1 + i2);
wordcounts.print();
把字元串拍扁->映射->進行去重統計,最後調用print函數把資料列印到控制台中
jssc.start(); // start the computation
jssc.awaittermination(); // wait for the computation to terminate
最後,啟動整個計算過程
為了完成這次實驗,還需要使用nc作為server進行配合
nc -lk 9999
spark提供了示例,可以使用 ./bin/run-example streaming.javanetworkwordcount localhost 9999 來體驗wordcount
本文作者:小埋醬
來源:51cto