天天看點

是時候放棄 Spark Streaming, 轉向 Structured Streaming 了

正如在之前的那篇文章中 Spark Streaming 設計原理 中說到 Spark 團隊之後對 Spark Streaming 的維護可能越來越少,Spark 2.4 版本的 Release Note

裡面果然一個 Spark Streaming 相關的 ticket 都沒有。相比之下,Structured Streaming 有将近十個 ticket 說明。是以各位同學,是時候舍棄 Spark Streaming 轉向 Structured Streaming 了,當然理由并不止于此。我們這篇文章就來分析一下 Spark Streaming 的不足,以及Structured Streaming 的設計初衷和思想是怎麼樣的。文章主要參考今年(2018 年)sigmod 上面的這篇論文: *Structured Streaming: A Declarative API for Real-Time

Applications in Apache Spark *。

首先可以注意到的了論文标題中的 Declarative API,中文一般叫做聲明式程式設計 API。一般直接看到這個詞可能不知道什麼意思,但是當我們列出他的對立單詞:Imperative API,中文一般叫指令式程式設計 API,仿佛一切都明了了。是的,沒錯,Declarative 隻是表達出我們想要什麼,而 Imperative 則是說為了得到什麼我們需要做哪些東西一個個說明。舉個例子,我們要一個糕點,去糕點店直接去定做告訴店員我們要什麼樣式的糕點,然後店員去給我們做出來,這就是 Declarative。而 Imperative 對應的就是面粉店了。

0. Spark Streaming 不足

在開始正式介紹 Structured Streaming 之前有一個問題還需要說清楚,就是 Spark Streaming 存在哪些不足?總結一下主要有下面幾點:

使用 Processing Time 而不是 Event Time。首先解釋一下,Processing Time 是資料到達 Spark 被處理的時間,而 Event Time 是資料自帶的屬性,一般表示資料産生于資料源的時間。比如 IoT 中,傳感器在 12:00:00 産生一條資料,然後在 12:00:05 資料傳送到 Spark,那麼 Event Time 就是 12:00:00,而 Processing Time 就是 12:00:05。我們知道 Spark Streaming 是基于 DStream 模型的 micro-batch 模式,簡單來說就是将一個微小時間段,比如說 1s,的流資料目前批資料來處理。如果我們要統計某個時間段的一些資料統計,毫無疑問應該使用 Event Time,但是因為 Spark Streaming 的資料切割是基于 Processing Time,這樣就導緻使用 Event Time 特别的困難。

Complex, low-level api。這點比較好了解,DStream (Spark Streaming 的資料模型)提供的 API 類似 RDD 的 API 的,非常的 low level。當我們編寫 Spark Streaming 程式的時候,本質上就是要去構造 RDD 的 DAG 執行圖,然後通過 Spark Engine 運作。這樣導緻一個問題是,DAG 可能會因為開發者的水準參差不齊而導緻執行效率上的天壤之别。這樣導緻開發者的體驗非常不好,也是任何一個基礎架構不想看到的(基礎架構的口号一般都是:你們專注于自己的業務邏輯就好,其他的交給我)。這也是很多基礎系統強調 Declarative 的一個原因。

reason about end-to-end application。這裡的 end-to-end 指的是直接 input 到 out,比如 Kafka 接入 Spark Streaming 然後再導出到 HDFS 中。DStream 隻能保證自己的一緻性語義是 exactly-once 的,而 input 接入 Spark Streaming 和 Spark Straming 輸出到外部存儲的語義往往需要使用者自己來保證。而這個語義保證寫起來也是非常有挑戰性,比如為了保證 output 的語義是 exactly-once 語義需要 output 的存儲系統具有幂等的特性,或者支援事務性寫入,這個對于開發者來說都不是一件容易的事情。

批流代碼不統一。盡管批流本是兩套系統,但是這兩套系統統一起來确實很有必要,我們有時候确實需要将我們的流處理邏輯運作到批資料上面。關于這一點,最早在 2014 年 Google 提出 Dataflow 計算服務的時候就批判了 streaming/batch 這種叫法,而是提出了 unbounded/bounded data 的說法。DStream 盡管是對 RDD 的封裝,但是我們要将 DStream 代碼完全轉換成 RDD 還是有一點工作量的,更何況現在 Spark 的批處理都用 DataSet/DataFrame API 了。

1. Structured Streaming 介紹

Structured Streaming 在 Spark 2.0 版本于 2016 年引入,設計思想參考很多其他系統的思想,比如區分 processing time 和 event time,使用 relational 執行引擎提高性能等。同時也考慮了和 Spark 其他元件更好的內建。Structured Streaming 和其他系統的顯著差別主要如下:

  • Incremental query model: Structured Streaming 将會在新增的流式資料上不斷執行增量查詢,同時代碼的寫法和批處理 API (基于 Dataframe 和 Dataset API)完全一樣,而且這些 API 非常的簡單。
  • Support for end-to-end application: Structured Streaming 和内置的 connector 使的 end-to-end 程式寫起來非常的簡單,而且 "correct by default"。資料源和 sink 滿足 "exactly-once" 語義,這樣我們就可以在此基礎上更好地和外部系統內建。
  • 複用 Spark SQL 執行引擎:我們知道 Spark SQL 執行引擎做了非常多的優化工作,比如執行計劃優化、codegen、記憶體管理等。這也是 Structured Streaming 取得高性能和高吞吐的一個原因。
是時候放棄 Spark Streaming, 轉向 Structured Streaming 了

2. Structured Streaming 核心設計

下面我們看一下 Structured Streaming 的核心設計。

  • Input and Output: Structured Streaming 内置了很多 connector 來保證 input 資料源和 output sink 保證 exactly-once 語義。而實作 exactly-once 語義的前提是:
    • Input 資料源必須是可以 replay 的,比如 Kafka,這樣節點 crash 的時候就可以重新讀取 input 資料。常見的資料源包括 Amazon Kinesis, Apache Kafka 和檔案系統。
    • Output sink 必須要支援寫入是幂等的。這個很好了解,如果 output 不支援幂等寫入,那麼一緻性語義就是 at-least-once 了。另外對于某些 sink, Structured Streaming 還提供了原子寫入來保證 exactly-once 語義。
  • API: Structured Streaming 代碼編寫完全複用 Spark SQL 的 batch API,也就是對一個或者多個 stream 或者 table 進行 query。query 的結果是 result table,可以以多種不同的模式(append, update, complete)輸出到外部存儲中。另外,Structured Streaming 還提供了一些 Streaming 處理特有的 API:Trigger, watermark, stateful operator。
  • Execution: 複用 Spark SQL 的執行引擎。Structured Streaming 預設使用類似 Spark Streaming 的 micro-batch 模式,有很多好處,比如動态負載均衡、再擴充、錯誤恢複以及 straggler (straggler 指的是哪些執行明顯慢于其他 task 的 task)重試。除了 micro-batch 模式,Structured Streaming 還提供了基于傳統的 long-running operator 的 continuous 處理模式。
  • Operational Features: 利用 wal 和狀态存儲,開發者可以做到集中形式的 rollback 和錯誤恢複。還有一些其他 Operational 上的 feature,這裡就不細說了。

3. Structured Streaming 程式設計模型

可能是受到 Google Dataflow 的批流統一的思想的影響,Structured Streaming 将流式資料當成一個不斷增長的 table,然後使用和批處理同一套 API,都是基于 DataSet/DataFrame 的。如下圖所示,通過将流式資料了解成一張不斷增長的表,進而就可以像操作批的靜态資料一樣來操作流資料了。

是時候放棄 Spark Streaming, 轉向 Structured Streaming 了

在這個模型中,主要存在下面幾個組成部分:

  • Input Unbounded Table: 流式資料的抽象表示
  • Query: 對 input table 的增量式查詢
  • Result Table: Query 産生的結果表
  • Output: Result Table 的輸出
是時候放棄 Spark Streaming, 轉向 Structured Streaming 了

下面舉一個具體的例子,NetworkWordCount,代碼如下:

// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .start()           

代碼實際執行流程可以用下圖來表示。把流式資料當成一張不斷增長的 table,也就是圖中的 Unbounded table of all input。然後每秒 trigger 一次,在 trigger 的時候将 query 應用到 input table 中新增的資料上,有時候還需要和之前的靜态資料一起組合成結果。query 産生的結果成為 Result Table,我們可以選擇将 Result Table 輸出到外部存儲。輸出模式有三種:

  • Complete mode: Result Table 全量輸出
  • Append mode (default): 隻有 Result Table 中新增的行才會被輸出,所謂新增是指自上一次 trigger 的時候。因為隻是輸出新增的行,是以如果老資料有改動就不适合使用這種模式。
  • Update mode: 隻要更新的 Row 都會被輸出,相當于 Append mode 的加強版。
是時候放棄 Spark Streaming, 轉向 Structured Streaming 了

和 batch 模式相比,streaming 模式還提供了一些特有的算子操作,比如 window, watermark, statefaul oprator 等。

window,下圖是一個基于 event-time 統計 window 内事件的例子。

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window("eventTime", "10 minutes", "5 minutes"),
  $"word"
).count()           

如下圖所示,視窗大小為 10 分鐘,每 5 分鐘 trigger 一次。在 12:11 時候收到了一條 12:04 的資料,也就是 late data (什麼叫 late data 呢?就是 Processing Time 比 Event Time 要晚),然後去更新其對應的 Result Table 的記錄。

是時候放棄 Spark Streaming, 轉向 Structured Streaming 了

watermark,是也為了處理 ,很多情況下對于這種 late data 的時效資料并沒有必要一直保留太久。比如說,資料晚了 10 分鐘或者還有點有,但是晚了 1 個小時就沒有用了,另外這樣設計還有一個好處就是中間狀态沒有必要維護那麼多。watermark 的形式化定義為 max(eventTime) - threshold,早于 watermark 的資料直接丢棄。

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("eventTime", "10 minutes")
    .groupBy(
        window("eventTime", "10 minutes", "5 minutes"),
        $"word")
    .count()           

用下圖表示更加形象。在 12:15 trigger 時 watermark 為 12:14 - 10m = 12:04,是以 late date (12:08, dog; 12:13, owl) 都被接收了。在 12:20 trigger 時 watermark 為 12:21 - 10m = 12:11,是以 late data (12:04, donkey) 都丢棄了。

是時候放棄 Spark Streaming, 轉向 Structured Streaming 了

除此之後 Structured Streaming 還提供了使用者可以自定義狀态計算邏輯的算子:

  • mapGroupsWithState
  • flatMapGroupsWithState

看名字大概也能看出來 mapGroupsWithState 是 one -> one,flatMapGroupsWithState 是 one -> multi。這兩個算子的底層都是基于 Spark Streaming 的 updateStateByKey。

4. Continuous Processing Mode

好,終于要介紹到“真正”的流處理了,我之是以說“真正”是因為 continuous mode 是傳統的流處理模式,通過運作一個 long-running 的 operator 用來處理資料。之前 Spark 是基于 micro-batch 模式的,就被很多人诟病不是“真正的”流式處理。continuous mode 這種處理模式隻要一有資料可用就會進行處理,如下圖所示。epoch 是 input 中資料被發送給 operator 處理的最小機關,在處理過程中,epoch 的 offset 會被記錄到 wal 中。另外 continuous 模式下的 snapshot 存儲使用的一緻性算法是 Chandy-Lamport 算法。

是時候放棄 Spark Streaming, 轉向 Structured Streaming 了

這種模式相比與 micro-batch 模式缺點和優點都很明顯。

  • 缺點是不容易做擴充
  • 優點是延遲更低

關于為什麼延遲更低,下面兩幅圖可以做到一目了然。

是時候放棄 Spark Streaming, 轉向 Structured Streaming 了
是時候放棄 Spark Streaming, 轉向 Structured Streaming 了

5. 一緻性語義

對于 Structured Streaming 來說,因為有兩種模式,是以我們分開讨論。

micro-batch 模式可以提供 end-to-end 的 exactly-once 語義。原因是因為在 input 端和 output 端都做了很多工作來進行保證,比如 input 端 replayable + wal,output 端寫入幂等。

continuous mode 隻能提供 at-least-once 語義。關于 continuous mode 的官方讨論的實在太少,甚至隻是提了一下。在和 @李呈祥 讨論之後覺得應該還是 continuous mode 由于要盡可能保證低延遲,是以在 sink 端沒有做一緻性保證。

6. Benchmark

Structured Streming 的官方論文裡面給出了 Yahoo! Streaming Benchmark 的結果,Structured Streaming 的 throughput 大概是 Flink 的 2 倍和 Kafka Streaming 的 90 多倍。

7. 總結

總結一下,Structured Streaming 通過提供一套 high-level 的 declarative api 使得流式計算的編寫相比 Spark Streaming 簡單容易不少,同時通過提供 end-to-end 的 exactly-once 語義

8. 閑扯

最後,閑扯一點别的。Spark 在 5 年推出基于 micro-batch 模式的 Spark Streaming 必然是基于當時 Spark Engine 最快的方式,盡管不是真正的流處理,但是在吞吐量更重要的年代,還是嘗盡了甜頭。而 Spark 的真正基于 continuous 處理模式的 Structured Streaming 直到 Spark 2.3 版本才真正推出,進而導緻近兩年讓 Flink 嘗盡了甜頭(當然和 Flink 的優秀的語義模型存在很大的關系)。在實時計算領域,由 Spark 的卓越核心 SQL Engine 助力的 Structured Streaming,還是風頭正勁的 Flink,亦或是其他流處理引擎,究竟誰将占領統治地位,還是值得期待一下的。

9. Reference

  1. Zaharia M, Das T, Li H, et al. Discretized streams: Fault-tolerant streaming computation at scale[C]//Proceedings of the Twenty-Fourth ACM Symposium on Operating Systems Principles. ACM, 2013: 423-438.
  2. Akidau T, Bradshaw R, Chambers C, et al. The dataflow model: a practical approach to balancing correctness, latency, and cost in massive-scale, unbounded, out-of-order data processing[J]. Proceedings of the VLDB Endowment, 2015, 8(12): 1792-1803.
  3. Armbrust M, Das T, Torres J, et al. Structured Streaming: A Declarative API for Real-Time Applications in Apache Spark[C]//Proceedings of the 2018 International Conference on Management of Data. ACM, 2018: 601-613.
  4. The world beyond batch: Streaming 101
  5. The world beyond batch: Streaming 102
  6. Streaming Systems
  7. https://databricks.com/blog/2018/03/20/low-latency-continuous-processing-mode-in-structured-streaming-in-apache-spark-2-3-0.html
  8. https://en.wikipedia.org/wiki/Chandy-Lamport_algorithm
  9. A Deep Dive Into Structured Streaming: https://databricks.com/session/a-deep-dive-into-structured-streaming
  10. Continuous Applications: Evolving Streaming in Apache Spark 2.0: https://databricks.com/blog/2016/07/28/continuous-applications-evolving-streaming-in-apache-spark-2-0.html
  11. Spark Structured Streaming:A new high-level API for streaming: https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html
  12. Event-time Aggregation and Watermarking in Apache Spark’s Structured Streaming: https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html
  13. http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
  14. Benchmarking Structured Streaming on Databricks Runtime Against State-of-the-Art Streaming Systems: https://databricks.com/blog/2017/10/11/benchmarking-structured-streaming-on-databricks-runtime-against-state-of-the-art-streaming-systems.html

繼續閱讀