基于事件時間的延遲資料處理-★
- 說明
- 時間分類
- 實際需求
- API示範
說明
- 之前在引入StructuredStreaming的時候提到
- StructuredStreaming可以基于事件時間做延遲資料的處理,
- 那麼接下來進行原來說明和代碼示範
時間分類
- 事件時間:event-time:表示資料/資料真正發生的時間–現在用 因為它才能真正反映資料的真實狀态
- 處理時間:process-time:表示資料被處理時的時間–以前用
- 攝入時間:Ingestion-time:表示資料到達系統的資料–不用
- 了解
- 現在假設,你正在去往地下停車場的路上,并且打算用手機點一份外賣。選好了外賣後,你就用線上支付功能付款了,這個時候是11點55分。恰好這時,你走進了地下停車庫,而這裡并沒有手機信号。是以外賣的線上支付并沒有立刻成功,而支付系統一直在Retry重試“支付”這個操作。
- 當你找到自己的車并且開出地下停車場的時候,已經是12點05分了。這個時候手機重新有了信号,手機上的支付資料成功發到了外賣線上支付系統,支付完成。
- 在上面這個場景中你可以看到,
支付資料的事件時間是11點55分,而支付資料的處理時間是12點05分
- 也就是上面的案例中
- 事件時間:event-time,事情
為 11點55分真正發生的時間
- 處理時間:process-time,表示事件/資料
為12點05分被系統處理的時間
- 而如果要計算當天上午的訂單12點之前的訂單,那麼就應該考慮使用訂單的事件時間來統計
- 而為了完成這個目标, 之前的技術是不行的得使用Dataflow模型-
- 事件時間:event-time,事情
實際需求
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHLzsGROp3aE9kMRpHW4Z0MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLzAzN5QDN1EjMzIjMwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
- 到達視窗的最大的事件時間 - 允許延遲的時間 >= 視窗原來的結束時間 就觸發該視窗的計算!
- 否則該視窗就一直等待資料到來(這樣的話不就給了遲到的資料一些機會)
API示範
http://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html#window-operations-on-event-time
package cn.hanjiaxiaozhi.structedstream
import java.sql.Timestamp
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Author hanjiaxiaozhi
* Date 2020/7/26 16:35
* Desc 示範StructuredStreaming中的基于事件時間的延遲資料處理
* 注意這裡僅僅示範API,實際中使用都是用Flink中的,因為Flink的該功能更加的完善和強大
*/
object WordCountWithWindowAndWatermakerAndEventTime {
def main(args: Array[String]): Unit = {
//1.準備StructuredStreaming執行環境
val spark: SparkSession = SparkSession.builder.appName("wc").master("local[*]").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
//2.讀取node01:9999端口的資料
val df: DataFrame = spark.readStream//表示使用DataFrame/DataSet做流處理并加載資料
.option("host", "node01")//指定ip
.option("port", 9999)//指定端口
.option("includeTimeStamp",true)//表示将從socket接收到的資料的時間作為事件時間,示範後面的API,Socket不好模拟延遲
.format("socket")//指定資料源為socket
.load()//開始加載資料
//3.做WordCount
import org.apache.spark.sql.functions._
import spark.implicits._
// Dataset[(單詞, 時間戳-事件時間)]
val wordAndTimeDF: DataFrame = df.as[(String, Timestamp)] //給DF添加上類型并轉為DS
.flatMap(line => {
line._1.split(" ").map((_, line._2))
}).toDF("word","timestamp")
//4.基于事件時間進行視窗聚合并設定最大允許的延遲時間
val result = wordAndTimeDF
//以下為關鍵代碼
//到達視窗的最大的事件時間 - 允許延遲的時間 >= 視窗原來的結束時間 就觸發該視窗的計算!
// ===========================
.withWatermark("timestamp", "5 seconds")//指定事件時間是哪一列,并設定最大允許的延遲時間為5s
// ===========================
.groupBy(//分組時指定時間列,和視窗長度和滑動間隔,以及分組字段
window($"timestamp", "10 seconds", "5 seconds"), $"word")
.count()//聚合/計算
.sort("window")//按照視窗時間進行排序
//5.輸出結果
result.writeStream
.format("console")//指定往控制台輸出
.outputMode("complete")//輸出模式,complete表示每次将所有資料都輸出,必須包含聚合
.option("truncate",false)
.start()//開啟
.awaitTermination()//等待結束
}
}