天天看點

Spark-StructuredStreaming 下的checkpointLocation分析以及對接 Grafana 監控和送出Kafka Lag 監控

一、Spark-StructuredStreaming checkpointLocation 介紹

Structured Streaming 在 Spark 2.0 版本于 2016 年引入, 是基于 Spark SQL 引擎建構的可擴充且容錯的流處理引擎,對比傳統的 Spark Streaming,由于複用了 Spark SQL 引擎,代碼的寫法和批處理 API (基于 Dataframe 和 Dataset API)一樣,而且這些 API 非常的簡單。

Structured Streaming 還支援使用 event time,通過設定 watermark 來處理延時到達的資料;而 Spark Streaming 隻能基于 process time 做計算,顯然是不夠用的。

比如 <code>.withWatermark("timestamp", "10 minutes")</code> 表示用 DataFrame 裡面的 <code>timestamp</code> 字段作為 event time,如果 event time 比 process time 落後超過 10 分鐘,那麼就不會處理這些資料。

Structured Streaming 預設情況下還是使用 micro batch 模式處理資料,不過從 Spark 2.3 開始提供了一種叫做 Continuous Processing 的模式,可以在至少一次語義下資料端到端隻需 1ms 。

不過 Structured Streaming 的 Web UI 并沒有和 Spark Streaming 一樣的監控名額。

<code>Checkpoint</code>目錄的結構:

Spark-StructuredStreaming 下的checkpointLocation分析以及對接 Grafana 監控和送出Kafka Lag 監控

1、checkpointLocation 在源碼調用鍊

分析源碼檢視 StructuredStreaming 啟動流程發現,DataStreamWriter#start 方法啟動一個 StreamingQuery。

同時将 checkpointLocation配置參數傳遞給StreamingQuery管理。

StreamingQuery 接口實作關系如下:

Spark-StructuredStreaming 下的checkpointLocation分析以及對接 Grafana 監控和送出Kafka Lag 監控

StreamingQueryWrapper 僅包裝了一個不可序列化的StreamExecution

StreamExecution 管理Spark SQL查詢的執行器

MicroBatchExecution 微批處理執行器

ContinuousExecution 連續處理(流式)執行器

是以我們僅需要分析 checkpointLocation 在 StreamExecution中調用即可。

備注:StreamExecution 中 protected def checkpointFile(name: String): String 方法為所有與 checkpointLocation 有關邏輯,傳回 $checkpointFile/name 路徑

2、MetadataLog(中繼資料日志接口)

spark 提供了org.apache.spark.sql.execution.streaming.MetadataLog接口用于統一進行中繼資料日志資訊。

checkpointLocation 檔案内容均使用 MetadataLog進行維護。

分析接口實作關系如下:

Spark-StructuredStreaming 下的checkpointLocation分析以及對接 Grafana 監控和送出Kafka Lag 監控

類作用說明:

NullMetadataLog 空日志,即不輸出日志直接丢棄

HDFSMetadataLog 使用 HDFS 作為中繼資料日志輸出

CommitLog 送出日志

OffsetSeqLog 偏移量日志

CompactibleFileStreamLog 封裝了支援按大小合并、删除曆史記錄的 MetadataLog

StreamSourceLog 檔案類型作為資料源時日志記錄

FileStreamSinkLog 檔案類型作為資料接收端時日志記錄

EsSinkMetadataLog Es作為資料接收端時日志記錄

分析 CompactibleFileStreamLog#compact 合并邏輯簡單描述為:假設有 0,1,2,3,4,5,6,7,8,9,10 個批次以此到達,合并大小為3目前合并結果為 `0,1,2.compact,3,4`下一次合并結果為 `0,1,2.compact,3,4,5.compact` , **說明:5.compact 檔案内容 = 2.compact + 3 + 4**last.compact 檔案大小會随着批次運作無限增大...

分析 CompactibleFileStreamLog 删除過期檔案邏輯:CompactibleFileStreamLog#add 方法被調用時,預設會判斷是否支援删除操作 override def add(batchId: Long, logs: Array[T]): Boolean = { val batchAdded = if (isCompactionBatch(batchId, compactInterval)) { // 是否合并 compact(batchId, logs) } else { super.add(batchId, logs) } if (batchAdded &amp;&amp; isDeletingExpiredLog) { // 添加成功且支援删除過期檔案 // 删除時判斷目前批次是否在 spark.sql.streaming.minBatchesToRetain 配置以外且在檔案保留時間内 // 配置項參考 第4節 解決方案配置說明 deleteExpiredLog(batchId) } batchAdded }

3、 分析 checkpointLocation 目錄内容

目前 checkpointLocation 内容主要包含以下幾個目錄

offsets

commits

metadata

sources

sinks

3.1 offsets 目錄

記錄每個批次中的偏移量。為了保證給定的批次始終包含相同的資料,在處理資料前将其寫入此日志記錄。

此日志中的第 N 條記錄表示目前正在已處理,第 N-1 個條目訓示哪些偏移已處理完成。

3.2 commitLog 目錄

記錄已完成的批次,重新開機任務檢查完成的批次與 offsets 批次記錄比對,确定接下來運作的批次

3.3 metadata 目錄

metadata 與整個查詢關聯的中繼資料,目前僅保留目前job id

3.4 sources 目錄

sources 目錄為資料源(Source)時各個批次讀取詳情

3.5 sinks 目錄

sinks 目錄為資料接收端(Sink)時批次的寫出詳情

另外如果在任務中存在state計算時,還會存在state目錄: 記錄狀态。當有狀态操作時,如累加聚合、去重、最大最小等場景,這個目錄會被用來記錄這些狀态資料。目錄結構:checkpoint/state/xxx.delta、checkpoint/state/xxx.snapshot。新的.snapshot是老的.snapshot和.delta合并生成的檔案。Structured Streaming會根據配置周期性地生成.snapshot檔案用于記錄狀态。

二、Spark Structured Streaming 對接 Grafana 監控

Structured Streaming 有個 <code>StreamingQueryListener</code> 用于異步報告名額,這是一個官方示例:

StreamingQuery API含義:

Spark-StructuredStreaming 下的checkpointLocation分析以及對接 Grafana 監控和送出Kafka Lag 監控

轉載請注明 作者:張永清  來源于部落格園:https://www.cnblogs.com/laoqing/p/15588436.html

我們監控的話,主要是利用 <code>onQueryProgress</code> 方法來上報資料給監控系統。

在主程式裡面添加監聽:轉載請注明 作者:張永清  來源于部落格園:https://www.cnblogs.com/laoqing/p/15588436.html

需要啟動 graphite_exporter,随便找一台伺服器即可,有兩個預設端口:

9109 用來上報資料,即 spark -&gt; graphite_exporter

9108 是 Prometheus 從 graphite_exporter 拉去資料用的

還需要在 Prometheus 配置檔案 <code>prometheus.yml</code> 裡面配置讀取資料

最後啟動 spark 程式之後,就可以在 Grafana 裡面配置圖表了。

比如我設定的 <code>prefix</code> 是 <code>click</code>,那麼我們在 Grafana 裡面的 Explore 子產品可以選擇 Prometheus 資料源,輸入名額 <code>spark_click_inputRowsPerSecond</code> ,點選 Query 就可以擷取讀取速率這個名額了,如圖:

Spark-StructuredStreaming 下的checkpointLocation分析以及對接 Grafana 監控和送出Kafka Lag 監控

三、基于StreamingQueryListener向Kafka送出Offset

我們可以在SparkStreamingGraphiteMetrics的基礎上做向kafka送出offset。如下所示

 轉載請注明 作者:張永清  來源于部落格園:https://www.cnblogs.com/laoqing/p/15588436.html

作者的原創文章,轉載須注明出處。原創文章歸作者所有,歡迎轉載,但是保留版權。對于轉載了部落客的原創文章,不标注出處的,作者将依法追究版權,請尊重作者的成果。

繼續閱讀