天天看點

spark--基于事件時間的延遲資料處理-★說明時間分類實際需求API示範

基于事件時間的延遲資料處理-★

  • 說明
  • 時間分類
  • 實際需求
  • API示範

說明

  • 之前在引入StructuredStreaming的時候提到
  • StructuredStreaming可以基于事件時間做延遲資料的處理,
  • 那麼接下來進行原來說明和代碼示範

時間分類

  • 事件時間:event-time:表示資料/資料真正發生的時間–現在用 因為它才能真正反映資料的真實狀态
  • 處理時間:process-time:表示資料被處理時的時間–以前用
  • 攝入時間:Ingestion-time:表示資料到達系統的資料–不用
  • 了解
    • 現在假設,你正在去往地下停車場的路上,并且打算用手機點一份外賣。選好了外賣後,你就用線上支付功能付款了,這個時候是11點55分。恰好這時,你走進了地下停車庫,而這裡并沒有手機信号。是以外賣的線上支付并沒有立刻成功,而支付系統一直在Retry重試“支付”這個操作。
    • 當你找到自己的車并且開出地下停車場的時候,已經是12點05分了。這個時候手機重新有了信号,手機上的支付資料成功發到了外賣線上支付系統,支付完成。
    • 在上面這個場景中你可以看到,

      支付資料的事件時間是11點55分,而支付資料的處理時間是12點05分

    • 也就是上面的案例中
      • 事件時間:event-time,事情

        真正發生的時間

        為 11點55分
      • 處理時間:process-time,表示事件/資料

        被系統處理的時間

        為12點05分
      • 而如果要計算當天上午的訂單12點之前的訂單,那麼就應該考慮使用訂單的事件時間來統計
      • 而為了完成這個目标, 之前的技術是不行的得使用Dataflow模型-

實際需求

spark--基于事件時間的延遲資料處理-★說明時間分類實際需求API示範
  • 到達視窗的最大的事件時間 - 允許延遲的時間 >= 視窗原來的結束時間 就觸發該視窗的計算!
  • 否則該視窗就一直等待資料到來(這樣的話不就給了遲到的資料一些機會)

API示範

http://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html#window-operations-on-event-time

spark--基于事件時間的延遲資料處理-★說明時間分類實際需求API示範
package cn.hanjiaxiaozhi.structedstream

import java.sql.Timestamp

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Author hanjiaxiaozhi
 * Date 2020/7/26 16:35
 * Desc 示範StructuredStreaming中的基于事件時間的延遲資料處理
 * 注意這裡僅僅示範API,實際中使用都是用Flink中的,因為Flink的該功能更加的完善和強大
*/
object WordCountWithWindowAndWatermakerAndEventTime {
  def main(args: Array[String]): Unit = {
    //1.準備StructuredStreaming執行環境
    val spark: SparkSession = SparkSession.builder.appName("wc").master("local[*]").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")

    //2.讀取node01:9999端口的資料
    val df: DataFrame = spark.readStream//表示使用DataFrame/DataSet做流處理并加載資料
      .option("host", "node01")//指定ip
      .option("port", 9999)//指定端口
      .option("includeTimeStamp",true)//表示将從socket接收到的資料的時間作為事件時間,示範後面的API,Socket不好模拟延遲
      .format("socket")//指定資料源為socket
      .load()//開始加載資料

    //3.做WordCount
    import org.apache.spark.sql.functions._
    import spark.implicits._
    // Dataset[(單詞, 時間戳-事件時間)]
    val wordAndTimeDF: DataFrame = df.as[(String, Timestamp)] //給DF添加上類型并轉為DS
      .flatMap(line => {
        line._1.split(" ").map((_, line._2))
      }).toDF("word","timestamp")


    //4.基于事件時間進行視窗聚合并設定最大允許的延遲時間
   val result = wordAndTimeDF
     //以下為關鍵代碼
     //到達視窗的最大的事件時間  -  允許延遲的時間 >= 視窗原來的結束時間 就觸發該視窗的計算!
     // ===========================
      .withWatermark("timestamp", "5 seconds")//指定事件時間是哪一列,并設定最大允許的延遲時間為5s
     // ===========================
      .groupBy(//分組時指定時間列,和視窗長度和滑動間隔,以及分組字段
        window($"timestamp", "10 seconds", "5 seconds"), $"word")
      .count()//聚合/計算
      .sort("window")//按照視窗時間進行排序



    //5.輸出結果
    result.writeStream
      .format("console")//指定往控制台輸出
      .outputMode("complete")//輸出模式,complete表示每次将所有資料都輸出,必須包含聚合
      .option("truncate",false)
      .start()//開啟
      .awaitTermination()//等待結束
  }
}
           
spark--基于事件時間的延遲資料處理-★說明時間分類實際需求API示範
spark--基于事件時間的延遲資料處理-★說明時間分類實際需求API示範

繼續閱讀