窗口
在Flink中数据是从开始一直流动的,只有开始没有结束,窗口就是一些数据的集合,根据窗口的划分方式可以按照时间片段来划分某一段时间内的数据划分为一个窗口,也可以按照数据条数的个数来划分,一定量的数据为一个窗口。对窗口的数据的研究有利于我们分析总结数据流。这里的窗口如果是按照时间来划分就比较像Spark Streaming中的一个微批的数据。
窗口的类别
- 滑动窗口
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL9UEWjZmVYF2c5cVWwh2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLwATNyIDNzEjM3AzNwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
上图中window size就是窗口大到小,window slide就是滑动步长,红色、蓝色、绿色、紫色的框分别代表四个窗口
按照时间划分窗口,划分出来的窗口就是一个前闭后开的窗口区间,假如窗口大小为15分钟滑动步长为5分钟,窗口为[10:00,10:15)[10:05,10:20)[10:10,10:25)
窗口按照处理时间来看可以划分为处理时间和事件时间此处以处理时间为例的代码
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将用户行为转化为对象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); Time windowSize = Time.seconds(10); Time slide = Time.seconds(5); // 对events进行开窗 events.filter(event -> StringUtils.equals(event.getEventType(), "click")) .keyBy(UserBehaviorEvent::getUserId) .window(SlidingProcessingTimeWindows.of(windowSize, slide)) .sum("times") .map(event -> String.format("用户%s在10秒内的点击次数为%d", event.getUserId(), event.getTimes())) .print("用户点击次数统计"); env.execute(); |
滑动窗口以上是按照时间处理时间来划分的窗口,也可以按照个数来划分。
假如按照元素个数来划分,窗口大小为10个,滑动步长为5个那么
就是这样一个状态,按照元素个数来划分窗口大小
- 滚动窗口
滚动窗口和滑动窗口类似也是按照时间(包括事件时间和处理时间)来划分,只不过就是滚动窗口没有步长就是一个挨着一个的窗口,窗口之间没有重叠。
假如窗口大小为15分钟那么划分出来的窗口就是[10:00,10:15)[10:15,10:30)
按照处理时间举例API操作为
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将用户行为转化为对象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); Time windowSize = Time.seconds(10); // 对events进行开窗 events.filter(event -> StringUtils.equals(event.getEventType(), "click")) .keyBy(UserBehaviorEvent::getUserId) .window(TumblingProcessingTimeWindows.of(windowSize)) .sum("times") .map(event -> String.format("用户%s在10秒内的点击次数为%d", event.getUserId(), event.getTimes())) .print("用户点击次数统计"); env.execute(); |
滚动窗口也是可以按照窗口个数来划分窗口的,加入窗口大小为5,那么就是下图
- 会话窗口
会话窗口,也是按照时间来划分的但是没有固定的开始也没有固定的结束,窗口的开启和结束完全和数据有关系。
会话窗口中有一个东西叫会话窗口的间隔gap就是决定窗口开启和结束的关键。
假如用户有这样一系列的行为且会话窗口的时间间隔为10s
{"event_type":"sf","times":1,"ts":1622887747000,"user_id":"10033"} {"event_type":"sf","times":1,"ts":1622887751000,"user_id":"10033"} {"event_type":"sf","times":1,"ts":1622887758000,"user_id":"10033"} {"event_type":"sf","times":1,"ts":1622887769000,"user_id":"10033"} {"event_type":"sf","times":1,"ts":1622887780000,"user_id":"10033"} |
同样的一个用户,浏览事件前三个事件会落在第一个窗口,第四个事件第二个窗口,第五个事件第三个窗口。划分依据就是时间间隔,前三个事件的时间间隔都在10s以内,所以窗口不会关闭,前三个一个窗口,第四个事件的时间对于第三个来说已经间隔超过10s所以此时第一个窗口已经关上了,第四个事件只能再开一个窗口,第五个同理。
操作API
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将用户行为转化为对象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); Time gapSize = Time.seconds(10); // 固定时间间隔 events.filter(event -> StringUtils.equals(event.getEventType(), "click")) .keyBy(UserBehaviorEvent::getUserId) .window(ProcessingTimeSessionWindows.withGap(gapSize)) .sum("times") .map(event -> String.format("用户%s在会话期间的点击次数为%d", event.getUserId(), event.getTimes())) .print("固定时间间隔用户会话期间点击次数统计"); env.execute(); // 不固定时间间隔 events.filter(event -> StringUtils.equals(event.getEventType(), "click")) .keyBy(UserBehaviorEvent::getUserId) .window(ProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor<UserBehaviorEvent>() { @Override public long extract(UserBehaviorEvent element) { String userId = element.getUserId(); // 假设用户id结尾是偶数说明是A类用户这样的用户在本站的会话时间就给他长一点20s,其他用户为B类用户普通用户会话时长是10s if (Integer.parseInt(userId) % 2 == 0) { return Time.seconds(20).toMilliseconds(); } else { return Time.seconds(10).toMilliseconds(); } } })) .sum("times") .map(event -> String.format("用户%s在会话期间的点击次数为%d", event.getUserId(), event.getTimes())) .print("不固定时间间隔用户会话期间点击次数统计"); env.execute(); |
这是是固定好了gap时间间隔,当然也可以不固定时间间隔。
- 全局窗口
全局窗口只有开始没有结束,想要中间打出点结果来得定义触发器。
假如统计一个用户累计的点击次数从任务开始以来,每间隔10次播报一次
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将用户行为转化为对象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); // Global全局窗口 events.filter(event -> StringUtils.equals(event.getEventType(), "click")) .keyBy(UserBehaviorEvent::getUserId) .window(GlobalWindows.create()) // 定义触发器每满5次就播报一次用户的累计点击 .trigger(CountTrigger.of(5)) .sum("times") .map(event -> String.format("用户%s累计点击次数为%d", event.getUserId(), event.getTimes())) .print("累计点击次数统计"); env.execute(); |
-
-
- 窗口函数
-
窗口函数,开窗之后是对窗口内的数据做统计分析和处理的,这种统计分析处理是由多种窗口函数来完成的。
- ReduceFunction(增量聚合)
增量聚合,来一条就累加到结果里,函数输入和输出是一样的类型。
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将用户行为转化为对象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); Time windowSize = Time.seconds(10); // 对events进行开窗 WindowedStream<UserBehaviorEvent, String, TimeWindow> windowedStream = events.filter(event -> StringUtils.equals(event.getEventType(), "click")) .keyBy(UserBehaviorEvent::getUserId) .window(TumblingProcessingTimeWindows.of(windowSize)); // reduce累加 windowedStream.reduce(new ReduceFunction<UserBehaviorEvent>() { @Override public UserBehaviorEvent reduce(UserBehaviorEvent value1, UserBehaviorEvent value2) throws Exception { value1.setTimes(value1.getTimes() + value2.getTimes()); return value1; } }) .map(event -> String.format("用户%s在10s内的点击次数为%d", event.getUserId(), event.getTimes())) .print("用户点击次数"); env.execute(); |
- AggregateFunction(增量聚合)
灵活度会比reduce高一些,可以自己定义输出类型并且累加过程也可以自己定义。
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将用户行为转化为对象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); Time windowSize = Time.seconds(10); // 对events进行开窗 WindowedStream<UserBehaviorEvent, String, TimeWindow> windowedStream = events.filter(event -> StringUtils.equals(event.getEventType(), "click")) .keyBy(UserBehaviorEvent::getUserId) .window(TumblingProcessingTimeWindows.of(windowSize)); // reduce累加 windowedStream.aggregate(new AggregateFunction<UserBehaviorEvent, Tuple2<String, Long>, String>() { // 初始化累加 @Override public Tuple2<String, Long> createAccumulator() { Tuple2<String, Long> tuple2 = new Tuple2<>(); tuple2.f1 = 0L; return tuple2; } // 累计的方式 @Override public Tuple2<String, Long> add(UserBehaviorEvent value, Tuple2<String, Long> accumulator) { if (Objects.isNull(accumulator.f0)) { accumulator.f0 = value.getUserId(); } accumulator.f1 += value.getTimes(); return accumulator; } @Override public String getResult(Tuple2<String, Long> accumulator) { return String.format("用户%s在10s内的点击次数为%d", accumulator.f0, accumulator.f1); } @Override public Tuple2<String, Long> merge(Tuple2<String, Long> a, Tuple2<String, Long> b) { return new Tuple2<>(a.f0, a.f1 + b.f1); } }).print("用户点击次数"); env.execute(); |
- ProcessWindowFunction(全窗口数据处理)
自由度就更高了,就是简单粗暴的把这一个窗口里面收集到的元素都给你,自己去处理想累加或怎么办都可以。
加入我们使用Process来实现一个点击次数统计。
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将用户行为转化为对象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); Time windowSize = Time.seconds(10); // 对events进行开窗 WindowedStream<UserBehaviorEvent, String, TimeWindow> windowedStream = events.filter(event -> StringUtils.equals(event.getEventType(), "click")) .keyBy(UserBehaviorEvent::getUserId) .window(TumblingProcessingTimeWindows.of(windowSize)); // reduce累加 泛型分别为 输入类型,输出类型,key的类型,window类型 windowedStream.process(new ProcessWindowFunction<UserBehaviorEvent, String, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<UserBehaviorEvent> elements, Collector<String> out) throws Exception { // 所有窗口里面的元素都被收集到了elements迭代器里面 LongSummaryStatistics collect = ((List<UserBehaviorEvent>) IteratorUtils.toList(elements.iterator())) .stream().collect(Collectors.summarizingLong(UserBehaviorEvent::getTimes)); Long count = collect.getSum(); out.collect(String.format("用户%s在10s内的点击次数为%d", key, count)); } }).print("用户点击次数"); env.execute(); |
-
-
- Keyed Window和Non-Keyed Window
-
在做窗口处理之前需要确认是keyedStream还是没有keyedStream如果进行了key分组后续的task是可以多个并行处理的key相同的都会分到一个task中进行处理。如果没有设置key那么所有的数据都会进入一个task中处理,那么这个task的并行度也只能是1,无论设置它有多少个并行度。
-
- 时间
在Flink里面时间有多种时间,处理时间ProcessTime事件时间EventTime还有接收时间Ingestion Time。
时间描述了事件发生的时刻,如果是被记录下来的描述像EventTime那就有可能乱序,如果是ProcessTime就不会乱序因为处理的时刻即为时间描述这个物理世界的时间顺序是不会乱的。
- 处理时间
当流程序按处理时间运行时,所有基于时间的操作(如时间窗口)都将使用运行相应算子的机器的系统时钟。每小时处理时间窗口将包括在系统时钟指示整小时之间到达特定操作员的所有记录。例如,如果应用程序在上午 9:15 开始运行,则第一个每小时处理时间窗口将包括上午 9:15 至上午 10:00 之间处理的事件,下一个窗口将包括上午 10:00 至上午 11:00 之间处理的事件,依此类推上。
处理时间是最简单的时间概念,不需要流和机器之间的协调。它提供最佳性能和最低延迟。然而,在分布式和异步环境中,处理时间不提供确定性,因为它容易受到记录到达系统(例如来自消息队列)的速度,以及记录在系统内部操作员之间流动的速度的影响,以及中断(计划的或其他方式)。
- 事件时间
事件时间是每个单独事件在其产生设备上发生的时间。这个时间通常在记录进入 Flink 之前嵌入在记录中,并且可以从每条记录中提取该事件时间戳。在事件时间中,时间的进度取决于数据,而不是任何挂钟。事件时间程序必须指定如何生成事件时间水印,这是在事件时间中表示进度的机制。这个水印机制在后面的章节所描述的, 下面。
在一个完美的世界中,事件时间处理将产生完全一致和确定性的结果,无论事件何时到达或它们的顺序如何。但是,除非已知事件按顺序到达(按时间戳),否则事件时间处理在等待乱序事件时会产生一些延迟。由于只能等待一段有限的时间,这限制了事件时间应用程序的确定性。
假设所有数据都已到达,事件时间操作将按预期运行,即使在处理乱序或延迟事件,或重新处理历史数据时,也会产生正确且一致的结果。例如,每小时事件时间窗口将包含所有带有属于该小时的事件时间戳的记录,无论它们到达的顺序或处理时间如何。(有关更多信息,请参阅有关迟到事件的部分。)
6.3 事件时间和水印
事件时间是和消息同在的也就是说成了一种标记,不会像处理时间一样,处理时间是随着时间的流逝机器上的时钟是一直按照顺序往后走的,不会说一会10点一会又9点,所以不存在乱序的问题。而事件时间因为时间是被记录下来的考虑到有顺序不同或延迟,要对此做出处理,另一方面时间被记录下来处理存量数据时不用真的等到某个时间再处理按照记录下来的时间如果速度快的话也能很快就处理完存量几天的数据按照窗口的方式来处理。
如果是简单的ETL清洗数据只关注当前处理的这一条数据可以不用记录数据的状态,但是如果处理当前数据需要前一条或者前几条数据配合处理或者处于同一窗口的数据就需要记录数据的状态。
假如都是有序的那么水印就如下
假如都存在无序那么水印就如下
从上面两张图可以看出第二张图是对数据有一定的包容性,迟到数据也可以包容一下进来处理。当然上面这个图没能看出它设置的允许最大延迟是多少。
假如一条数据进来就判断以下当前的水印。
假如允许迟到时间是5,上面是时间戳,下面是对应生成的水印,这样对吗?
肯定是不对的,水印是描述事件处理时间进展的,这个时间进度能倒退吗显然是不能的,时间是不能倒退的,所以应该是下面这样。
水印随着时间流动只能越来越大,被动拔高不能降低。
水印对于基于事件时间处理的流程序来说是至关重要的,如果说我知道我处理的数据迟到时间最大不会超过某个值,如果超过了这个处理机制就是存在问题的,比如本身就是实时性非常高的数据延迟却很高就说明数据流转的过程出现了性能问题。
其实针对迟到数据Flink也是提供了迟到处理的机制,即使真的迟到了也是可以处理的,不会漏掉。
6.3.1 时间语义和水印的设置
时间语义给定和水印生成是通过时间戳分配器完成的
- 内置的时间戳分配器
// 1. 不设置水印,此时可以配合处理时间来处理数据 WatermarkStrategy<UserBehaviorEvent> watermarks = WatermarkStrategy.<UserBehaviorEvent>noWatermarks(); // 2. 不允许迟到,也就是设置了水印,当前来的数据的时间的最高点就是水印 watermarks = WatermarkStrategy.<UserBehaviorEvent>forMonotonousTimestamps(); // 3. 允许固定时间水印,包容固定时间的迟到,下面是设置水印固定在最高点-1分钟 watermarks = WatermarkStrategy.<UserBehaviorEvent>forBoundedOutOfOrderness(Duration.ofMinutes(1L)); // 在设置好水印的处理方式后根据数据给定水印的依据也就是时间戳提取 watermarks = watermarks.withTimestampAssigner(new SerializableTimestampAssigner<UserBehaviorEvent>() { @Override public long extractTimestamp(UserBehaviorEvent element, long recordTimestamp) { return element.getTimestamp(); } }); |
第四种就是自定义水印生成
- 周期性水印生成
// 自定义水印生成器,这里是周期性发射水印 WatermarkStrategy<UserBehaviorEvent> watermarks = WatermarkStrategy.<UserBehaviorEvent>forGenerator(new WatermarkStrategy<UserBehaviorEvent>() { @Override public WatermarkGenerator<UserBehaviorEvent> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { // 定义一个水印生成器 return new WatermarkGenerator<UserBehaviorEvent>() { // 允许最大的迟到时间 private final long lateTime = maxLateTime; // 当前最大的时间戳,这里要在周期性发射里面使用,里面 - lateTime - 1, // 如果直接使用long最小值减去一个值就使结果成为一个最大值, // 这样就违背了初始化的时候使maxTs为最小值的初衷所以在初始化的时候先提前加上 private long maxTs = Long.MIN_VALUE + lateTime + 1; // 当元素来的时候如何生成水印 @Override public void onEvent(UserBehaviorEvent event, long eventTimestamp, WatermarkOutput output) { maxTs = Math.max(maxTs, eventTimestamp); } // 周期性的发射水印,默认触发器是200ms执行一次 @Override public void onPeriodicEmit(WatermarkOutput output) { Watermark watermark = new Watermark(maxTs - lateTime - 1); System.out.println("当前水印:" + watermark); output.emitWatermark(watermark); } }; } }); |
- 间歇式水印生成
// 自定义水印生成器,这里是间歇性发射水印 WatermarkStrategy<UserBehaviorEvent> watermarks = WatermarkStrategy.<UserBehaviorEvent>forGenerator(new WatermarkStrategy<UserBehaviorEvent>() { @Override public WatermarkGenerator<UserBehaviorEvent> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { // 定义一个水印生成器 return new WatermarkGenerator<UserBehaviorEvent>() { // 允许最大的迟到时间 private final long lateTime = maxLateTime; // 当前最大的时间戳,这里要在周期性发射里面使用,里面 - lateTime - 1, // 如果直接使用long最小值减去一个值就使结果成为一个最大值, // 这样就违背了初始化的时候使maxTs为最小值的初衷所以在初始化的时候先提前加上 private long maxTs = Long.MIN_VALUE + lateTime + 1; // 当元素来的时候如何生成水印,间歇性在这里来发射水印 @Override public void onEvent(UserBehaviorEvent event, long eventTimestamp, WatermarkOutput output) { maxTs = Math.max(maxTs, eventTimestamp); // 事件来临时发射水印,不来就不发射 Watermark watermark = new Watermark(maxTs - lateTime - 1); System.out.println("当前水印:" + watermark); output.emitWatermark(watermark); } // 周期性的发射水印,默认触发器是200ms执行一次,如果是间歇性就不需要写该方法 @Override public void onPeriodicEmit(WatermarkOutput output) { } }; } }); |
6.3.2 并行情况下的水印传递情况
在同一task不同的分区下面这几个分区的水印是怎么管理的,是统一的是按照水印最低的那个发射给下一个task中的。总是以最低的那个水印为准。
6.3.3 处理迟到数据
当使用事件时间且后来的数据超过了水印容忍的限度,那就存在了迟到数据,在正常情况下应该是不会有迟到的,但是出现了迟到数据也是有处理的方式的。当然这种做法只能使用在事件时间上。
处理迟到数据API
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 对source进行分流 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); // 设置好水印 WatermarkStrategy<UserBehaviorEvent> watermarkStrategy = WatermarkStrategy.<UserBehaviorEvent>forBoundedOutOfOrderness( Duration.ofSeconds(1)) .withTimestampAssigner((SerializableTimestampAssigner<UserBehaviorEvent>) (element, recordTimestamp) -> element.getTimestamp()); OutputTag<UserBehaviorEvent> lateDataTag = new OutputTag<UserBehaviorEvent>("lateDataTag") { }; // 先做开窗 WindowedStream<UserBehaviorEvent, String, TimeWindow> window = events.assignTimestampsAndWatermarks(watermarkStrategy) .keyBy(UserBehaviorEvent::getUserId) .window(TumblingEventTimeWindows.of(Time.seconds(10))) // 对迟到给出声明 .allowedLateness(Time.seconds(1)) .sideOutputLateData(lateDataTag); // 对正常开窗数据进行处理,注意下面得使用process对结果进行处理才能通过测输出流得到迟到数据,使用reduce得不到迟到数据 SingleOutputStreamOperator<String> result = window // .reduce(new ReduceFunction<UserBehaviorEvent>() { // @Override // public UserBehaviorEvent reduce(UserBehaviorEvent value1, UserBehaviorEvent value2) throws Exception { // value1.setTimes(value1.getTimes() + value2.getTimes()); // return value1; // } // }) // .map(event -> String.format("%s用户10s窗口pv统计%d", event.getUserId(), event.getTimes())); .process(new ProcessWindowFunction<UserBehaviorEvent, String, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable<UserBehaviorEvent> elements, Collector<String> out) throws Exception { int size = IteratorUtils.toList(elements.iterator()).size(); out.collect(String.format("%s用户10s窗口pv统计%d", s, size)); } }); // 打印处理结果 result.print("pv统计结果"); // 对迟到数据进行处理 result.getSideOutput(lateDataTag) .print("迟到数据"); env.execute(); |
侧输出流除了可以处理迟到数据还可以用来分离流数据,怎么分离,比如埋点事件,要分成一个全部事件的主流再分出一个只有点击事件的点击流。
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 对source进行分流 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); // 设置好水印 WatermarkStrategy<UserBehaviorEvent> watermarkStrategy = WatermarkStrategy.<UserBehaviorEvent>forBoundedOutOfOrderness( Duration.ofSeconds(1)) .withTimestampAssigner((SerializableTimestampAssigner<UserBehaviorEvent>) (element, recordTimestamp) -> element.getTimestamp()); OutputTag<UserBehaviorEvent> clickDataTag = new OutputTag<UserBehaviorEvent>("clickDataTag") { }; KeyedStream<UserBehaviorEvent, String> keyedStream = events.assignTimestampsAndWatermarks(watermarkStrategy) .keyBy(UserBehaviorEvent::getUserId); SingleOutputStreamOperator<UserBehaviorEvent> splitOperator = keyedStream.process(new KeyedProcessFunction<String, UserBehaviorEvent, UserBehaviorEvent>() { @Override public void processElement(UserBehaviorEvent value, Context ctx, Collector<UserBehaviorEvent> out) throws Exception { out.collect(value); // 将点击事件单独处理 if ("click".equals(value.getEventType())) { ctx.output(clickDataTag, value); } } }); splitOperator.print("全部事件"); splitOperator.getSideOutput(clickDataTag).print("点击事件"); env.execute(); |
-
- 底层API操作
底层API就是process,其内部根据process的不同也是分为多种的process,这一层的API相对较为自由,所有的上层API都是可以基于process封装出来。
- ProcessFunction
ProcessFunction是作用在普通的DataStream上面的process方法,加入使用process完成map操作
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将用户行为转化为对象,使用process来完成 SingleOutputStreamOperator<UserBehaviorEvent> events = source.process(new ProcessFunction<String, UserBehaviorEvent>() { @Override public void processElement(String value, Context ctx, Collector<UserBehaviorEvent> out) throws Exception { // 自由处理给出了element元素,给出了ctx上下文环境,给出了结果收集器out UserBehaviorEvent event = JSON.parseObject(value, UserBehaviorEvent.class); out.collect(event); } }); events.print("对象流"); env.execute(); |
- KeyedProcessFuntion
KeyedProcessFunction是作用在keyedStream后的Process上面,主要针对分组后的流数据做处理。
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 使用KeyedProcessFunction需要先对流进行keyBy的处理 KeyedStream<String, String> keyedStream = source.keyBy(eventMessage -> JSON.parseObject(eventMessage).getString("user_id")); // 将分组之后的数据转化为对象 SingleOutputStreamOperator<UserBehaviorEvent> events = keyedStream.process(new KeyedProcessFunction<String, String, UserBehaviorEvent>() { @Override public void processElement(String value, Context ctx, Collector<UserBehaviorEvent> out) throws Exception { // 自由处理给出了element元素,给出了ctx上下文环境,给出了结果收集器out UserBehaviorEvent event = JSON.parseObject(value, UserBehaviorEvent.class); out.collect(event); } }); events.print("对象流"); env.execute(); |
- CoProcessFunction
Co开头就是作用在ConnectedStream后面的操作,所以CoProcessFunction是作用在两个流连接后的ConnectedStream的process上面
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source1 = env.socketTextStream("hadoop01", 9999); DataStreamSource<String> source2 = env.socketTextStream("hadoop02", 9999); // Co开头的都是作用在ConnectedStream上面多个流连接起来的流 ConnectedStreams<String, String> connectedStreams = source1.connect(source2); // 分别对两个流进行处理 SingleOutputStreamOperator<UserBehaviorEvent> events = connectedStreams.process(new CoProcessFunction<String, String, UserBehaviorEvent>() { @Override public void processElement1(String value, Context ctx, Collector<UserBehaviorEvent> out) throws Exception { UserBehaviorEvent event = JSON.parseObject(value, UserBehaviorEvent.class); out.collect(event); } @Override public void processElement2(String value, Context ctx, Collector<UserBehaviorEvent> out) throws Exception { UserBehaviorEvent event = JSON.parseObject(value, UserBehaviorEvent.class); out.collect(event); } }); events.print("合并后的对象流"); env.execute(); |
- ProcessJoinFunction
两个流之间的关联后的处理函数
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); env.disableOperatorChaining(); DataStreamSource<String> source1 = env.socketTextStream("hadoop01", 9999); DataStreamSource<String> source2 = env.socketTextStream("hadoop02", 9999); SingleOutputStreamOperator<UserBehaviorEvent> left = source1.map(event -> JSON.parseObject(event, UserBehaviorEvent.class)); SingleOutputStreamOperator<UserBehaviorEvent> right = source2.map(event -> JSON.parseObject(event, UserBehaviorEvent.class)); WatermarkStrategy<UserBehaviorEvent> watermark1 = WatermarkStrategy.<UserBehaviorEvent>forBoundedOutOfOrderness(Duration.ofMillis(5)).withTimestampAssigner(new SerializableTimestampAssigner<UserBehaviorEvent>() { @Override public long extractTimestamp(UserBehaviorEvent element, long recordTimestamp) { return element.getTimestamp(); } }); WatermarkStrategy<UserBehaviorEvent> watermark2 = WatermarkStrategy.<UserBehaviorEvent>forBoundedOutOfOrderness(Duration.ofMillis(3)).withTimestampAssigner(new SerializableTimestampAssigner<UserBehaviorEvent>() { @Override public long extractTimestamp(UserBehaviorEvent element, long recordTimestamp) { return element.getTimestamp(); } }); left = left.assignTimestampsAndWatermarks(watermark1); right = right.assignTimestampsAndWatermarks(watermark2); DataStream<String> result = left.join(right).where(UserBehaviorEvent::getUserId).equalTo(UserBehaviorEvent::getUserId) .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) .apply(new JoinFunction<UserBehaviorEvent, UserBehaviorEvent, String>() { @Override public String join(UserBehaviorEvent first, UserBehaviorEvent second) throws Exception { return String.format("left message:%s\tright message:%s", first, second); } }); result.print("join结果"); env.execute(); |
这里面两个流之间的关联如果有水位线的话,下游水位线依然是按照最低的那个流的水位线为标准。
- ProcessWindowFunction
在window后使用,是在keyedWindow后面使用process
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); env.disableOperatorChaining(); DataStreamSource<String> source1 = env.socketTextStream("hadoop01", 9999); SingleOutputStreamOperator<UserBehaviorEvent> left = source1.map(event -> JSON.parseObject(event, UserBehaviorEvent.class)); SingleOutputStreamOperator<String> result = left.assignTimestampsAndWatermarks(WatermarkStrategy.<UserBehaviorEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<UserBehaviorEvent>() { @Override public long extractTimestamp(UserBehaviorEvent element, long recordTimestamp) { return element.getTimestamp(); } })).keyBy(UserBehaviorEvent::getUserId) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .process(new ProcessWindowFunction<UserBehaviorEvent, String, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<UserBehaviorEvent> elements, Collector<String> out) throws Exception { int size = IteratorUtils.toList(elements.iterator()).size(); String message = String.format("%s用户10s内的浏览次数为%d", key, size); out.collect(message); } }); result.print("join结果"); env.execute(); } |
- ProcessAlWindowFunction
在windowAll后处理函数process里使用
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); env.disableOperatorChaining(); DataStreamSource<String> source1 = env.socketTextStream("hadoop01", 9999); SingleOutputStreamOperator<UserBehaviorEvent> left = source1.map(event -> JSON.parseObject(event, UserBehaviorEvent.class)); SingleOutputStreamOperator<String> result = left.assignTimestampsAndWatermarks(WatermarkStrategy.<UserBehaviorEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner<UserBehaviorEvent>() { @Override public long extractTimestamp(UserBehaviorEvent element, long recordTimestamp) { return element.getTimestamp(); } })).windowAll(TumblingEventTimeWindows.of(Time.minutes(5))) .process(new ProcessAllWindowFunction<UserBehaviorEvent, String, TimeWindow>() { @Override public void process(Context context, Iterable<UserBehaviorEvent> elements, Collector<String> out) throws Exception { int size = IteratorUtils.toList(elements.iterator()).size(); String message = String.format("所有用户5m内的浏览次数为%d", size); out.collect(message); } }); result.print("join结果"); env.execute(); |
-
- 定时器Timer
定时器在接收到一个元素后可以注册一个定时器,在指定的时间执行,可以依据处理时间来创建也可以依据事件时间创建。当然也可以删除定时器。
定时器是在TimeService上注册或删除的,这个timeservice是在context或OnTimerContext中持有的,比如ProcessFunction中就有所有继承自RichFunction的函数都持有这个上下文对象。
基于处理时间的定时器:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 对source进行分流 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); events.keyBy(UserBehaviorEvent::getUserId) .process(new KeyedProcessFunction<String, UserBehaviorEvent, String>() { @Override public void processElement(UserBehaviorEvent value, Context ctx, Collector<String> out) throws Exception { TimerService timerService = ctx.timerService(); long currentProcessingTime = timerService.currentProcessingTime(); System.out.println(currentProcessingTime); timerService.registerProcessingTimeTimer(currentProcessingTime + Time.seconds(5).toMilliseconds()); out.collect(value.toString()); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { System.out.println("定时执行:" + timestamp); } }).print("流数据"); env.execute(); |
基于事件时间的定时器:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); env.disableOperatorChaining(); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 对source进行分流 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); events = events.assignTimestampsAndWatermarks(WatermarkStrategy .<UserBehaviorEvent>forBoundedOutOfOrderness(Duration.ofMillis(1)) .withTimestampAssigner(new SerializableTimestampAssigner<UserBehaviorEvent>() { @Override public long extractTimestamp(UserBehaviorEvent element, long recordTimestamp) { return element.getTimestamp(); } })); events.keyBy(UserBehaviorEvent::getUserId) .process(new KeyedProcessFunction<String, UserBehaviorEvent, String>() { @Override public void processElement(UserBehaviorEvent value, Context ctx, Collector<String> out) throws Exception { TimerService timerService = ctx.timerService(); long watermark = timerService.currentWatermark(); System.out.println(watermark); timerService.registerEventTimeTimer(watermark + Time.milliseconds(5).toMilliseconds()); out.collect(value.toString()); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { System.out.println("定时执行:" + timestamp); } }).print("流数据"); env.execute(); |
-
- 状态编程
虽然数据流中的许多操作一次只查看一个单独的事件(例如事件解析器),但有些操作会记住多个事件的信息(例如窗口操作符)。这些操作称为有状态的。
- 当应用程序搜索某些事件模式时,状态将存储到目前为止遇到的事件序列。
- 当聚合每分钟/小时/天的事件时,状态持有待处理的聚合。
- 在数据点流上训练机器学习模型时,状态保存模型参数的当前版本。
- 当需要管理历史数据时,状态允许有效访问过去发生的事件。
状态还可以用来作为检查点和保存点的容错依据
未来还可以做到外部可查询的状态
状态的管理可以使用不同的状态后端来进行管理
-
-
- 状态的分类
-
Flink包括两种基本类型的状态Managed State和Raw State
Managed State | Raw State | |
状态管理方式 | Flink Runtime托管, 自动存储, 自动恢复, 自动伸缩 | 用户自己管理 |
状态数据结构 | Flink提供多种常用数据结构, 例如:ListState, MapState等 | 字节数组: byte[] |
使用场景 | 绝大数Flink算子 | 所有算子 |
-
-
-
- ManagedState
-
-
分为两类KeyedState监控状态
用在监控流上可以根据Key来记录流数据的状态
OperatorState算子状态
普通算子状态,一个算子任务对应一种状态
Operator State | Keyed State | |
适用用算子类型 | 可用于所有算子: 常用于source, 例如 FlinkKafkaConsumer | 只适用于KeyedStream上的算子 |
状态分配 | 一个算子的子任务对应一个状态 | 一个Key对应一个State: 一个算子会处理多个Key, 则访问相应的多个State |
创建和访问方式 | 实现CheckpointedFunction或ListCheckpointed(已经过时)接口 | 重写RichFunction, 通过里面的RuntimeContext访问 |
横向扩展 | 并发改变时有多重重写分配方式可选: 均匀分配和合并后每个得到全量 | 并发改变, State随着Key在实例间迁移 |
支持的数据结构 | ListState和BroadCastState | ValueState, ListState,MapState ReduceState, AggregatingState |
6.5.1.1 键控流状态操作
- K-V形式的流,指定Key来对数据进行分区可以保证相同Key的被分到同样的分区里,方便访问统一分区的数据状态。
-
- 状态的保存
-
Flink使用流重放和检查点组合来实现容错,任务失败允许从保存的记录中恢复过来继续执行,对于处理过程的容错可以保证Exactly-Once严格一次性。
键控流操作都可以通过RichFunction中RuntimeContext runtimeContext = getRuntimeContext();获取到运行时上下文环境,来获得state状态。
- VaueState
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将用户行为转化为对象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); // 使用map来处理永远记录相同key情况下times最大的那个对象 events.keyBy(UserBehaviorEvent::getUserId).map(new RichMapFunction<UserBehaviorEvent, UserBehaviorEvent>() { // 记录用户times最大的那个值 ValueState<UserBehaviorEvent> maxTimesState; @Override public UserBehaviorEvent map(UserBehaviorEvent value) throws Exception { RuntimeContext runtimeContext = getRuntimeContext(); maxTimesState = runtimeContext.getState(new ValueStateDescriptor<UserBehaviorEvent>("maxTimes", UserBehaviorEvent.class)); if (maxTimesState.value() == null) { maxTimesState.update(value); } else { if (maxTimesState.value().getTimes() < value.getTimes()) { maxTimesState.update(value); } } return maxTimesState.value(); } }).print("最大次数"); env.execute(); |
- ListState
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将用户行为转化为对象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); // 使用map来处理永远记录top3的times的对象 events.keyBy(UserBehaviorEvent::getUserId).map(new RichMapFunction<UserBehaviorEvent, List<UserBehaviorEvent>>() { // 记录用户times最大的那个值 ListState<UserBehaviorEvent> top3TimesState; @Override public List<UserBehaviorEvent> map(UserBehaviorEvent value) throws Exception { RuntimeContext runtimeContext = getRuntimeContext(); top3TimesState = runtimeContext.<UserBehaviorEvent>getListState(new ListStateDescriptor<UserBehaviorEvent>("top3Times", UserBehaviorEvent.class)); top3TimesState.add(value); List<UserBehaviorEvent> top3Events = ((List<UserBehaviorEvent>) IteratorUtils.toList(top3TimesState.get().iterator())); top3Events.sort(new Comparator<UserBehaviorEvent>() { @Override public int compare(UserBehaviorEvent e1, UserBehaviorEvent e2) { return (int) (e2.getTimes() - e1.getTimes()); } }); top3TimesState.clear(); top3Events = top3Events.stream().limit(3).collect(Collectors.toList()); top3TimesState.addAll(top3Events); return top3Events; } }).print("最大3次"); env.execute(); |
- ReducingState
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将用户行为转化为对象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); // 使用map来处理永远记录top3的times的对象 events.keyBy(UserBehaviorEvent::getUserId).map(new RichMapFunction<UserBehaviorEvent, String>() { // 记录用户times最大的那个值 ReducingState<UserBehaviorEvent> reducingTimesState; @Override public String map(UserBehaviorEvent value) throws Exception { RuntimeContext runtimeContext = getRuntimeContext(); reducingTimesState = runtimeContext.<UserBehaviorEvent>getReducingState(new ReducingStateDescriptor<UserBehaviorEvent>("top3Times", new ReduceFunction<UserBehaviorEvent>() { @Override public UserBehaviorEvent reduce(UserBehaviorEvent value1, UserBehaviorEvent value2) throws Exception { value1.setTimes(value1.getTimes() + value2.getTimes()); return value1; } }, UserBehaviorEvent.class)); reducingTimesState.add(value); UserBehaviorEvent event = reducingTimesState.get(); return String.format("%s用户累计访问次数%d", event.getUserId(), event.getTimes()); } }).print("累计访问次数"); env.execute(); |
- AggregateState
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将用户行为转化为对象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); // 使用map来处理永远记录top3的times的对象 events.keyBy(UserBehaviorEvent::getUserId).map(new RichMapFunction<UserBehaviorEvent, String>() { // 记录用户times最大的那个值 AggregatingState<UserBehaviorEvent, String> aggregatingState; @Override public String map(UserBehaviorEvent value) throws Exception { RuntimeContext runtimeContext = getRuntimeContext(); aggregatingState = runtimeContext.<UserBehaviorEvent, Long, String>getAggregatingState(new AggregatingStateDescriptor<UserBehaviorEvent, Long, String>("agg", new AggregateFunction<UserBehaviorEvent, Long, String>() { @Override public Long createAccumulator() { return 0L; } @Override public Long add(UserBehaviorEvent value, Long accumulator) { return value.getTimes() + accumulator; } @Override public String getResult(Long accumulator) { return accumulator.toString(); } @Override public Long merge(Long a, Long b) { return a + b; } }, Long.class)); aggregatingState.add(value); return String.format("%s用户累计访问次数%s", value.getUserId(), aggregatingState.get()); } }).print("累计访问次数"); env.execute(); |
- MapState
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); // 先将用户行为转化为对象 SingleOutputStreamOperator<UserBehaviorEvent> events = source.map(event -> JSONObject.parseObject(event, UserBehaviorEvent.class)); // 按照用户id进行去重永远取得最早的那个 events.keyBy(UserBehaviorEvent::getUserId).map(new RichMapFunction<UserBehaviorEvent, String>() { // 记录用户times最大的那个值 MapState<String, UserBehaviorEvent> mapState; @Override public String map(UserBehaviorEvent value) throws Exception { RuntimeContext runtimeContext = getRuntimeContext(); mapState = runtimeContext.<String, UserBehaviorEvent>getMapState(new MapStateDescriptor<String, UserBehaviorEvent>("mapState", String.class, UserBehaviorEvent.class)); if (mapState.get(value.getUserId()) == null) { mapState.put(value.getUserId(), value); } return String.format("%s用户访问最早是在%d", value.getUserId(), mapState.get(value.getUserId()).getTimestamp()); } }).print("累计访问次数"); env.execute(); |
6.5.1.2 算子状态操作
这两种算子状态可以不做keyBy就可以使用,只要Function继承自CheckpointedFunction就可以使用,需要实现一个初始化方法一个snapshot快照方法。
ListState
BroadState
-
-
- 状态后端
-
状态后端就是配合checkpoint完成状态存储的,将数据处理保存在TaskManager之外再保存在外部存储一份。
三种状态后端存储方式,Memory、Fs、RocksDB
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 首先开启使用checkpoint env.isUnalignedCheckpointsEnabled(); // 再设置开启checkpoint后状态数据保存在什么样的后端 // 内存后端 env.setStateBackend(new MemoryStateBackend()); // 外部文件后端 env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/user/flink/ck")); // rocksDB后端 env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop01:9000/user/flink/ck")); |
-
- 容错机制
一致性,三个级别至少一次、最多一次、精准一次。从Flink数据流的各个阶段来看不同阶段的一致性Source、flink流处理、Sink,首先flink流处理可以基于checkpoint完成精准一次性,除此之外Source得符合重新提交偏移量能从执行偏移量开始接收数据才能完成source端的精准一次,其次Sink端要么Sink到支持幂等性的存储要么实现两段提交(也就是内部实现一个事务)。
Checkpoint检查点如何容错
数据处理过程中出错如何恢复达到精准一次性,假设场景是这样的
一个输入数据流经过Source,Source记录一下当前的偏移量,然后发送给后面的处理算子,两个并行的处理算子上面用于sum偶数下面sum奇数,当5这条数据被后面的sum处理完后这三个位置一起记录一个checkpoint可用于恢复的检查点。
然后假设当下一条数据出现了错误
当Source当前记录是7,正在处理7的过程中发生了错误,上一次数据处理完记录的快照是5,6,9然后把source偏移量进行调整至6开始接收数据,sum_even位置是6,sum_odd是9,这样就恢复了从6开始的消费然后正常进行下面的处理。
注意:检查点不是每一条数据都会做检查点,检查点是周期性的,不是说7失败了一定会从6个点记录的检查点开始恢复(因为周期性记录的原因6完成后不一定记录了快照)
这里存在一个问题?
如果5这个偏移量的时候要记录快照了,sum_even和sum_odd是如何知道5这个偏移量的数据已经处理完了呢,5是奇数sum_odd处理了这条数据很容易能够感知,但是sum_even是如何知道5已经从整个的数据流已经处理完了呢,不知道。这样就得使用某种机制让sum_even也知道5已经处理完了。
这里就是插入一个Barrier(分界线),假如偏移量到了5,source接收到是5,将这个分界线5以广播的方式发送到下游的各个subtask,当然sum_even不处理这条数据但是已经知道偏移量到了5可以给出一个快照就是sum_even:5:6,sum_odd处理完5也可以给出一个快照sum_odd:5:9,这样就在checkpoint里面制作好了一个完整的基于5的一个快照,等处理后面的数据出现了差错而且还没来的及制作下一快照时就可以使用5这个快照来做状态恢复。
这里还有个问题就是对齐和非对齐的Barrier的问题,就是如果数据源是多份,同时存在两个并行的Source来接收数据。
上图中,数字流和字母流,如果中间图执行比如CoProcessFunction双流操作的时候,一个流比较快一个流比较慢需,在一个Operator算子处理的时候如果Barrier不齐,它会等到对齐之后再处理,如果先来的数据会怎么办呢,先保存起来,当Barrier对齐了之后再处理缓存起来的数据。这样是有可能引起反压机制的,处理不过来数据缓存过大为了Barrier对齐,降低了处理速度。
当然也是可以不保证Barrier对齐再处理的,这样就可能不会完成严格一次性的要求了。
SavePoint&Checkpoint操作
代码中配置检查点配置
System.setProperty("HADOOP_USER_NAME","hadoop"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 1. 设置状态后端,也就是声明数据保存的位置 env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink/ck")); env.setParallelism(1); // 2. 配置检查点 // 设置允许checkpoint env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); // 触发下一个检查点之前的最小暂停 checkpointConfig.setMinPauseBetweenCheckpoints(500); // 设置checkpoint检查设置的时长,如果超过这个时长就会抛弃做该检查点 checkpointConfig.setCheckpointTimeout(60000); // 设置同一时间检查点的并发量 checkpointConfig.setMaxConcurrentCheckpoints(1); // 设置检查点会不会被cancel后备删除,如果删除将无法从检查点恢复 checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 启用未对齐的检查点这样可以增大吞吐量但是精准一次性会被忽略 checkpointConfig.enableUnalignedCheckpoints(); DataStreamSource<String> source = env.socketTextStream("hadoop01", 9999); SingleOutputStreamOperator<String> words = source.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { String[] s = value.split(" "); for (String word : s) { out.collect(word); } } }); SingleOutputStreamOperator<Tuple2<String, Long>> maps = words.map(word -> new Tuple2<String, Long>(word, 1L)).returns(TupleTypeInfo.getBasicTupleTypeInfo(String.class, Long.class)); SingleOutputStreamOperator<Tuple2<String, Long>> result = maps.keyBy(tuple -> tuple.f0).sum(1); result.print("result"); env.execute(); |
- 提交任务时设置savepoint
正常提交任务
bin/flink run -m yarn-cluster -ynm FlinkExample -ys 1 -p 1 -yjm 1g -ytm 2g -c com.example.flink.datastream.checkpoint.Example01Checkpoint ./jars/com.example.flink.datastream-0.0.1-SNAPSHOT.jar hadoop01 9999
设置对应任务的检查点
bin/flink savepoint -m yarn-cluster -yid application_1623244631654_0003 69819b8ef328cd01cc04bd6d7e46791e hdfs://hadoop01:9000/flink/savepoint
- 从savepoint恢复
bin/flink run -s hdfs://hadoop01:9000/flink/savepoint/savepoint-69819b-7b0d45c8b11f -m yarn-cluster -ynm FlinkExample -ys 1 -p 1 -yj1g -ytm 2g -c com.example.flink.datastream.checkpoint.Example01Checkpoint ./jars/com.example.flink.datastream-0.0.1-SNAPSHOT.jar hadoop01 9999
如果是使用checkpoint
- 正常提交任务
bin/flink run -m yarn-cluster -ynm FlinkExample -ys 1 -p 1 -yjm 1g -ytm 2g -c com.example.flink.datastream.checkpoint.Example01Checkpoint ./jars/com.example.flink.datastream-0.0.1-SNAPSHOT.jar hadoop01 9999
- 从检查点恢复
bin/flink run -s hdfs://hadoop01:9000/flink/ck/21fdd3943b2b5454480915cfc047395f/chk-5 -m yarn-cluster -ynm FlinkExample -ys 1 -p 1 -yjm 1g -ytm 2g -c com.example.flink.datastream.checkpoint.Example01Checkpoint ./jars/com.example.flink.datastream-0.0.1-SNAPSHOT.jar hadoop01 9999
两阶段提交
前面提到整个的过程如果都实现了Exactly-once才能真正的实现。
- Source端读取数据需要支持指定位点获取数据(Kafka支持)
- 处理的过程中支持状态保存(Checkpoint可以实现)
- Sink端支持事务(保存数据要么数据库可以幂等写入,要么得支持事务先flush等checkpoint完成后再次确认commit动作)