作者:Genmao Yu
原文連結:
https://databricks.com/blog/2020/07/29/a-look-at-the-new-structured-streaming-ui-in-apache-spark-3-0.html編譯:邵嘉陽,計算機科學與技術大三在讀,Apache Spark 中文社群志願者
在Apache Spark 2.0中,我們迎來了Structured Streaming——建構分布式流處理應用的最佳平台。統一的API(SQL,Dataset和DataFrame)以及Spark内置的大量函數為開發者實作複雜的需求提供了便利,比如流的聚合,流-流連接配接和視窗支援。開發者們普遍喜歡通過Spark Streaming中的DStream的方式來管理他們的流,那麼類似的功能什麼時候能在Structured Streaming中得到實作呢?這不,在Apache Spark 3.0中,全新的Structured Streaming可視化UI和開發者們見面了。
新的Structured Streaming UI會提供一些有用的資訊和統計資料,以此來監視所有流作業,便于在開發調試過程中排除故障。同時,開發者還能夠獲得實時的監測資料,這能使生産流程更直覺。在這個新的UI中,我們會看到兩組統計資料:1)流查詢作業的聚合資訊;2)流查詢的具體統計資訊,包括輸入速率(Input Rate)、處理速率(Process Rate)、輸入行數(Input Rows)、批處理持續時間(Batch Duration)和操作持續時間(Operation Duration)等。
流查詢作業的聚合資訊
開發者送出的流SQL查詢會被列在Structured Streaming一欄中,包括正在運作的流查詢(active)和已完成的流查詢(completed)。結果表則會顯示流查詢的一些基本資訊,包括查詢名稱、狀态、ID、運作ID、送出時間、查詢持續時間、最後一批的ID以及一些聚合資訊,如平均輸入速率和平均處理速率。流查詢有三種狀态:運作(RUNNING)、結束(FINISHED)、失敗(FAILED)。所有結束(FINISHED)和失敗(FAILED)的查詢都在已完成的流式查詢表中列出。Error列顯示有關失敗查詢的詳細資訊。
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5iNjhDNjRTO4IGNyETZ3YWY3M2M0UDMyMTZzQGNlBjNy8CX5d2bs92Yl1iclB3bsVmdlR2LcNWaw9CXt92Yu4GZjlGbh5yYjV3Lc9CX6MHc0RHaiojIsJye.png)
我們可以通過單擊Run ID連結檢視流查詢的詳細資訊。
詳細的統計資訊
Statistics頁面顯示了包括輸入速率、處理速率、延遲和詳細的操作持續時間在内的一系列名額。通過圖表,開發者能全面了解已送出的流查詢的狀态,并且輕松地調試查詢進行中的異常情況。
它包含以下名額:
- Input Rate:資料到達的聚合速率(跨所有源)。
- Process Rate: Spark處理資料的聚合速率(跨所有源)。
- Batch Duration: 每一批的處理時間。
-
Operation Duration: 執行各種操作所花費的時間(以毫秒為機關)。
被追蹤的操作羅列如下:
- addBatch:從源讀取微批的輸入資料、對其進行處理并将批的輸出寫入接收器所花費的時間。這應該會占用微批處理的大部分時間。
- getBatch:準備邏輯查詢以從源讀取目前微批的輸入所花費的時間。
- getOffset:查詢源是否有新的輸入資料所花費的時間。
- walCommit:将偏移量寫入中繼資料日志。
- queryPlanning:生成執行計劃。
需要注意的是,由于資料源的類型不同,一個查詢可能不會包含以上列出的所有操作。
使用UI解決流的性能故障
在這一部分中,我們會看到新的UI是怎樣實時、直覺地顯示查詢執行過程中的異常情況的。我們會在每個例子中預先假設一些條件,樣例查詢看起來是這樣的:
import java.util.UUID
val bootstrapServers = ...
val topics = ...
val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation", checkpointLocation)
.start()
由于處理能力不足而增加延遲
在第一種情況下,我們希望盡快處理Apache Kafka資料。在每一批中,流作業将處理Kafka中所有可用的資料。如果處理能力不足以處理批資料,那麼延遲将迅速增加。最直覺的現象是Input Rows和Batch Duration會呈線性上升。Process Rate提示流作業每秒最多隻能處理大約8000條記錄,但是目前的輸入速率是每秒大約20000條記錄。産生問題的原因一目了然,那麼我們可以為流作業提供更多的執行資源,或者添加足夠的分區來處理與生産者比對所需的所有消費者。
穩定但高延遲
第二種情況下,延遲并沒有持續增加,而是保持穩定,如下截圖所示:
我們發現在相同的Input Rate下,Process Rate可以保持穩定。這意味着作業的處理能力足以處理輸入資料。然而,每批的延遲仍然高達20秒。這裡,高延遲的主要原因是每個批中有太多資料,那麼我們可以通過增加這個作業的并行度來減少延遲。在為Spark任務添加了10個Kafka分區和10個核心之後,我們發現延遲大約為5秒——比20秒要好得多。
使用操作持續時間圖進行故障排除
操作持續時間圖(Operation Duration Chart)顯示了執行各種操作所花費的時間(以毫秒為機關)。這對于了解每個批處理的時間分布和故障排除非常有用。讓我們以Apache Spark社群中的性能改進“Spark-30915:在查找最新批處理ID時避免讀取中繼資料日志檔案“為例。
在某次查詢中我們發現,當壓縮後的中繼資料日志很大時,下一批要花費比其他批更多的時間來處理。
在進行代碼審查之後,我們發現這是由對壓縮日志檔案的不必要讀取造成的并進行了修複。新的操作持續時間圖确認了我們想法:
未來的開發方向
如上所示,新的Structured Streaming UI将通過提供更有用的流查詢資訊幫助開發者更好地監視他們的流作業。作為早期釋出版本,新的UI仍在開發中,并将在未來的釋出中得到改進。有幾個未來可以實作的功能,包括但不限于:
- 更多的流查詢執行細節:延遲資料,水印,狀态資料名額等等。
- 在Spark曆史伺服器中支援Structured Streaming UI。
- 對于不尋常的情況有更明顯的提示:發生延遲等。
近期活動:
8月24日開始 Spark 實戰訓練營正式開課
免費報名連結:
https://developer.aliyun.com/learning/trainingcamp/spark/2