天天看点

快速入门Flink (10) —— DataStream API 开发之【EventTime 与 Window】

        在上一篇博客中,博主已经为大家介绍了DataStream API 开发之【Time 与 Window】,并着重介绍了常用的 Window API 。本篇博客,我们就趁热打铁,继续接下去讲, DataStream API 开发之【EventTime 与 Window】。

        码字不易,先赞后看!!!

快速入门Flink (10) —— DataStream API 开发之【EventTime 与 Window】

文章目录

    • 2、EventTime 与 Window
      • 2.1 EventTime 的引入
      • 2.2 Watermark
        • 2.2.1 基本概念
        • 2.2.2 Watermark 的引入
      • 2.3 EventTimeWindow API
        • 2.3.1 滚动窗口
        • 2.3.2 滑动窗口
        • 2.3.3 会话窗口
    • 彩蛋
    • 小结

        在 Flink 的流式处理中,绝大部分的业务都会使用 eventTime,一般只在 eventTime 无法使用时,才会被迫使用 ProcessingTime 或者 IngestionTime。

        如果要使用 EventTime,那么需要引入 EventTime 的时间属性,引入方式如下所示:

val env = StreamExecutionEnvironment.getExecutionEnvironment 
// 从调用时刻开始给 env 创建的每一个 stream 追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
           

        我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和 时间的,虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但 是也不排除由于网络、背压等原因,导致乱序的产生,所谓乱序,就是指 Flink 接收到的事件的先后顺序不是严格按照事件的 EventTime 顺序排列的。

快速入门Flink (10) —— DataStream API 开发之【EventTime 与 Window】

        那么此时出现一个问题,一旦出现乱序,如果只根据 eventTime 决定 window 的运行, 我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一 个特定的时间后,必须触发 window 去进行计算了,这个特别的机制,就是 Watermark。

         Watermark 是一种衡量 Event Time 进展的机制,它是数据本身的一个隐藏属性,数据本身携带着对应的 Watermark 。

        Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制 结合 window 来实现。

        数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了, 因此, window 的执行也是由 Watermark 触发的。

        Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时时长 t, 每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定 eventTime 小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于 maxEventTime – t, 那么这个窗口被触发执行。

        当 Flink 接收到每一条数据时,都会产生一条 Watermark,这条 Watermark 就等于当 前所有到达数据中的 maxEventTime - 延迟时长,也就是说,Watermark 是由数据携带的,

一旦数据携带的 Watermark 比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口 的执行。由于 Watermark 是由数据携带的,因此,如果运行过程中无法获取新的数据,那 么没有被触发的窗口将永远都不被触发。

        上图中,我们设置的允许最大延迟到达时间为 2s,所以时间戳为 7s 的事件对应的 Watermark 是 5s,时间戳为 12s 的事件的 Watermark 是 10s,如果我们的窗口 1 是 1s~5s,窗口 2 是 6s~10s,那么时间戳为 7s 的事件到达时的 Watermarker 恰好触发窗口 1,时间戳为 12s 的事件到达时的 Watermark 恰好触发窗口 2。

val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 从调用时刻开始给 senv 创建的每一个 stream 追加时间特征
    val stream: DataStream[String] = senv.readTextFile("eventTest.txt").assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(200)) {

      override def extractTimestamp(t: String): Long = {
        // EventTime 是日志生成时间,我们从日志中解析 EventTime
        t.split(" ")(0).toLong
      }
    })
           

        当使用 EventTimeWindow 时,所有的 Window 在 EventTime 的时间轴上进行划分,也就是说,在 Window 启动后,会根据初始的 EventTime 时间每隔一段时间划分一个窗口, 如果 Window 大小是 3 秒,那么 1 分钟内会把 Window 划分为如下的形式:

[00:00:00,00:00:03) 
[00:00:03,00:00:06) 
... 
[00:00:57,00:01:00)
           

        如果 Window 大小是 10 秒,则 Window 会被分为如下的形式:

[00:00:00,00:00:10) 
[00:00:10,00:00:20) 
... 
[00:00:50,00:01:00)
           

        注意,窗口是左闭右开的,形式为:[window_start_time,window_end_time)

        Window 的设定无关数据本身,而是系统定义好了的,也就是说,Window 会一直按照指定的时间间隔进行划分,不论这个 Window 中有没有数据,EventTime 在这个 Window 期间 的数据会进入这个 Window。

        Window 会不断产生,属于这个 Window 范围的数据会被不断加入到 Window 中,所有 未被触发的 Window 都会等待触发,只要 Window 还没触发,属于这个 Window 范围的数据。就会一直被加入到 Window 中,直到 Window 被触发才会停止数据的追加,而当 Window 触发之后才接受到的属于被触发 Window 的数据会被丢弃。

        Window 会在以下的条件满足时被触发执行:

  • watermark 时间 >= window_end_time;
  • 在[window_start_time,window_end_time)中有数据存在

        可以通过下图来说明 Watermark、EventTime 和 Window 的关系。

快速入门Flink (10) —— DataStream API 开发之【EventTime 与 Window】

/*
 * @Author: Alice菌
 * @Date: 2020/8/13 11:13
 * @Description: 


    当使用 EventTimeWindow 时,所有的 Window 在 EventTime 的时间轴上进行划分,

 */
/*** 步骤:
  * 1.创建流处理环境
  * 2.设置EventTime
  * 3.构建数据源
  * 4.设置水印
  * 5.逻辑处理
  * 6.引入滚动窗口TumblingEventTimeWindows
  * 7.聚合操作
  * 8.输出打印
  * 9.执行程序     */
object TumblingEventTimeWindowsDemo {
  def main(args: Array[String]): Unit = {

    // 1、创建流处理环境
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2、设置 EventTime
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 3、构建数据源
    // 数据格式:   1000 hello
    val socketSource: DataStream[String] = senv.socketTextStream("node01",9999)
    // 4、设置水印
    val groupKeyedStream: KeyedStream[(String, Int), Tuple] = socketSource.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(2)) {
        override def extractTimestamp(element: String): Long = {

          // EventTime 是日志生成时间,我们从日志中解析 EventTime
          val eventTime: Long = element.split(" ")(0).toLong
          eventTime
        }
      })

      // 5、逻辑处理
      .map(x => x.split(" ")(1))
      .map((_, 1))
      .keyBy(0)

    // 6、引入滚动窗口TumblingEventTimeWindows
    val windowStream: WindowedStream[(String, Int), Tuple, TimeWindow] = groupKeyedStream.window(TumblingEventTimeWindows.of(Time.seconds(2)))

    // 7、聚合操作
    val resultDataStream: DataStream[(String, Int)] = windowStream.reduce((v1,v2) => (v1._1,v1._2+v2._2))

    // 8、输出打印
    resultDataStream.print()

    // 9、执行程序
    senv.execute("TumblingEventTimeWindowsDemo")


    /**
      * 结果是按照 Event Time 的时间窗口计算得出的,而无关系统的时间(包括输入的快慢)
      */

  }
}

           

        结果是按照 Event Time 的时间窗口计算得出的,而无关系统的时间(包括输入的快慢)

/*
 * @Author: Alice菌
 * @Date: 2020/10/24 18:28
 * @Description: 
      滑动窗口(SlidingEventTimeWindows)
*/
/**
  步骤:

  * 1.创建流处理环境
  * 2.设置EventTime
  * 3.构建数据源
  * 4.设置水印
  * 5.逻辑处理
  * 6.引入滑动窗口 SlidingEventTimeWindows
  * 7.聚合操作
  * 8.输出打印
  * 9.执行程序
  */
object SlidingEventTimeWindowsDemo {
  def main(args: Array[String]): Unit = {

    // 1、 创建流处理环境
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2、 追加时间特征
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 3、 创建数据源
    val socketSource: DataStream[String] = senv.socketTextStream("node01", 9999)
    // 4、 添加水印
    // 模拟的数据格式: 10000 hello
    val waterMarkDataStream: DataStream[String] = socketSource.assignTimestampsAndWatermarks(
       
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {

        override def extractTimestamp(element: String): Long = {

          val eventTime: Long = element.split(" ")(0).toLong
          eventTime
        }
      })

    // 5、数据处理
    val resultKeyedStream: KeyedStream[(String, Int), Tuple] = waterMarkDataStream
      .map(x => x.split(" ")(1))
      .map((_, 1))
      .keyBy(0)


    // 6、引入滑动窗口 SlidingEventTimeWindows
    val slidingWindowedStream: WindowedStream[(String, Int), Tuple, TimeWindow] = resultKeyedStream.window(SlidingEventTimeWindows.of(Time.seconds(5),
      Time.seconds(2)))

    // 7、聚合计算
    val result: DataStream[(String, Int)] = slidingWindowedStream.reduce((v1,v2) => (v1._1,v1._2+v2._2))

    // 8、打印测试输出
    result.print()

    // 9、执行程序
    senv.execute("SlidingEventTimeWindowsDemo")

  }
  }

           

        相邻两次数据的 EventTime 的时间差超过指定的时间间隔就会触发执行。如果加入 Watermark,那么当触发执行时,所有满足时间间隔而还没有触发的 Window 会同时触发执行。

/*
 * @Author: Alice菌
 * @Date: 2020/10/24 20:58
 * @Description: 
    会话窗口

    相邻两次数据的 EventTime 的时间差超过指定的时间间隔就会触发执行

 */
object EventTimeSessionWindowsDemo {
  def main(args: Array[String]): Unit = {
    // 1、 创建流处理环境
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2、 追加时间特征
    senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 3、 创建数据源
    val socketSource: DataStream[String] = senv.socketTextStream("node01",9999)
    // 4、 添加水印
    // 模拟的数据格式:    1000 hello
    val waterMarkDataStream: DataStream[String] = socketSource.assignTimestampsAndWatermarks(

      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {
        // 提取时间
        override def extractTimestamp(element: String): Long = {

          val eventTime: Long = element.split(" ")(0).toLong
          eventTime
        }
      })

    // 5、数据处理
    val groupKeyStream: KeyedStream[(String, Int), Tuple] = waterMarkDataStream
      .map(x => x.split(" ")(1))
      .map((_, 1))
      .keyBy(0)

    // 6、 引入会话窗口 EventTimeSessionWindows
    val sessionWindowStream: WindowedStream[(String, Int), Tuple, TimeWindow] = groupKeyStream
      .window(EventTimeSessionWindows.withGap(Time.seconds(5)))

    // 7、 聚合计算
    val resultDataStream: DataStream[(String, Int)] = sessionWindowStream.reduce((v1,v2) => (v1._1,v1._2 + v2._2))

    // 8、 输出打印
    resultDataStream.print()

    // 9、 执行程序
    senv.execute(this.getClass.getSimpleName)
  }
}
           

        为了能鼓励大家多学会总结,菌在这里贴上自己平时做的思维导图,需要的朋友,可以关注博主个人微信公众号【猿人菌】,后台回复“思维导图”即可获取。

继续阅读