文章目錄
- 一、Spark Streaming入門
- 二、Streaming 和 Structured Streaming差別
- 2.1 流計算(Streaming)和批計算(Batch)
- 2.2 Spark Streaming 和 Spark Structured Streaming
- 三、基于Spark Streaming統計文本數栗子
- 四、代碼實踐
- Reference
一、Spark Streaming入門
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwFciV2dsQXYtJ3bm9CX9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5SN4ADO4QDN0cTOmRzN3Y2YyYzXzIDN1kDM0EzLcdDMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
spark streaming可以接收實時的輸入資料流(如上圖的kafka、HDFS、TCP socket的資料流等)的高吞吐量、容錯性良好的資料處理,然後将處理完的資料推送到HDFS檔案系統、資料庫或dashboards上。在spark streaming中可以使用機器學習或者圖處理算法。spark streaming的大緻處理過程如下:
對于像在交易系統上,對流式資料進行實時計算,現在主流的流計算工具有三種:
- Storm的延遲最低,一般為幾毫秒到幾十毫秒,但資料吞吐量較低,每秒能夠處理的事件在幾十萬左右,建設成本高。
- Flink是目前國内網際網路廠商主要使用的流計算工具,延遲一般在幾十到幾百毫秒,資料吞吐量非常高,每秒能處理的事件可以達到幾百上千萬,建設成本低。
- Spark通過【Spark Streaming】或【Spark Structured Streaming】支援流計算。
- 但Spark的流計算是将流資料按照時間分割成一個一個的小批次(mini-batch)進行處理的,其延遲一般在1秒左右。吞吐量和Flink相當。
- Spark Structured Streaming 現在也支援了Continous Streaming 模式,即在資料到達時就進行計算,不過目前還處于測試階段,不是特别成熟。
二、Streaming 和 Structured Streaming差別
2.1 流計算(Streaming)和批計算(Batch)
批計算或批處理是處理離線資料。單個處理資料量大,處理速度比較慢。
流計算是處理線上實時産生的資料。單次處理的資料量小,但處理速度更快。
2.2 Spark Streaming 和 Spark Structured Streaming
- Spark在2.0之前,主要使用的Spark Streaming來支援流計算,其資料結構模型為
,其實就是一個個小批次資料構成的RDD隊列。DStream
- 目前,Spark主要推薦的流計算子產品是Structured Streaming,其資料結構模型是
,即沒有邊界的資料表。Unbounded DataFrame
- 相比于 Spark Streaming 建立在 RDD資料結構上面,Structured Streaming 是建立在 SparkSQL基礎上,DataFrame的絕大部分API也能夠用在流計算上,實作了流計算和批處理的一體化,并且由于SparkSQL的優化,具有更好的性能,容錯性也更好。
三、基于Spark Streaming統計文本數栗子
場景:在資料伺服器中,基于tcp socket接收到的資料中統計文本數。
(1)建立StreamingContext。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
(2)建立DStream,指定localhost的ip和port(即确定socket)。
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
DStream的每條記錄都是一行文本,現在需要根據句子中每個單詞之間的空格,分隔成一個個單詞:
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
以上隻是transformation操作,接下來是action部分:
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
完整代碼如下:
r"""
Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
Usage: network_wordcount.py <hostname> <port>
<hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
To run this on your local machine, you need to first run a Netcat server
`$ nc -lk 9999`
and then run the example
`$ bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999`
"""
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
sys.exit(-1)
sc = SparkContext(appName="PythonStreamingNetworkWordCount")
# 建立StreamingContext
ssc = StreamingContext(sc, 1)
# 建立DStream,确定localhost和port
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.reduceByKey(lambda a, b: a+b)
counts.pprint()
# action部分
ssc.start()
ssc.awaitTermination()
四、代碼實踐
- 讀取檔案https://cdn.coggle.club/Pokemon.csv為textFileStream
- 使用
篩選行不包含Grass的文本filter
- 使用
對文本行進行拆分flatmap