一、Overview
When a task fails, Flink needs to restart the failed task and any other affected tasks to bring the job back to a healthy state. Restart strategies and failover strategies control this process: the restart strategy decides whether and when the failed/affected tasks can be restarted, while the failover strategy decides which tasks should be restarted to recover the job.
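This post focuses on restart strategies; the failover strategy is configured separately. In recent Flink versions it is set in flink-conf.yaml via the jobmanager.execution.failover-strategy key, where full restarts all tasks in the job and region restarts only the pipelined region containing the failed task, for example:
jobmanager.execution.failover-strategy: region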
二、Restart Strategies
If no job-specific restart strategy is defined, the cluster's default restart strategy is always used. If a job is submitted with its own restart strategy, that strategy overrides the cluster default.
The default restart strategy is set in Flink's configuration file flink-conf.yaml; it can also be overridden in code.
The configuration parameter restart-strategy defines which strategy is used. If checkpointing is not enabled, the "no restart" strategy is used; if checkpointing is enabled but no restart strategy has been configured, the fixed-delay strategy is used with Integer.MAX_VALUE restart attempts.
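In other words, enabling checkpointing without configuring anything else behaves roughly like the explicit setting below. This is only a sketch: the Integer.MAX_VALUE attempt count comes from the documentation, while the 1-second delay is an assumption for illustration (the actual default delay depends on the Flink version):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000); // checkpointing on, no restart strategy configured
// roughly equivalent to the default:
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        Integer.MAX_VALUE,           // effectively unlimited restart attempts
        Time.of(1, TimeUnit.SECONDS) // assumed delay, version-dependent
));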
The restart-strategy parameter supports the following types:
1. none, off, disable
No restart strategy is applied. This "no restart" strategy is the default when checkpointing is not enabled.
a. Via the configuration file flink-conf.yaml:
restart-strategy: none
b. Via code:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
2. fixeddelay, fixed-delay
Fixed delay: both the number of restart attempts and the delay between attempts can be configured. This fixed-delay strategy is the default when checkpointing is enabled but no restart strategy is configured.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3, // number of restart attempts
        Time.of(10, TimeUnit.SECONDS) // delay between attempts
));
3. failurerate, failure-rate
The failure-rate strategy restarts the job after a failure, but once the failure rate is exceeded the job is ultimately considered failed. Between two consecutive restart attempts, the strategy waits a fixed delay. The configuration below considers the job failed if it fails 3 times within 5 minutes, with a restart delay of 10 s.
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3, // max failures per interval
        Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
        Time.of(10, TimeUnit.SECONDS) // delay
));
三、Restart Strategy Code Demos
The demos below again receive data from a socket and run a word count. The main function stays largely the same: whenever data starting with "error" is received, an exception is thrown on purpose. Each case adjusts the checkpoint-related settings to demonstrate the behavior (for testing, the socket can be fed with a tool such as netcat: nc -lk 9000).
1. Case 1: checkpointing is not enabled, so the default is the no-restart strategy. Code:
package com.hadoop.ljs.flink110.checkpoint;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-04-12 09:26
 * @version: v1.0
 * @description: com.hadoop.ljs.flink110.checkpoint
 */
public class CheckpointTest {
    public static void main(String[] args) throws Exception {
        /* Initialize the Flink streaming environment */
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Receive data from the socket */
        DataStream<String> source = senv.socketTextStream("localhost", 9000);
        DataStream<String> filterData = source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                if (null == value || "".equals(value)) {
                    return false;
                }
                return true;
            }
        });
        DataStream<Tuple2<String, Integer>> wordOne = filterData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                /* Throw an exception on data starting with "error", for testing */
                if (value.startsWith("error")) {
                    throw new Exception("Received error, failing!!!");
                }
                String[] lines = value.split(",");
                for (int i = 0; i < lines.length; i++) {
                    Tuple2<String, Integer> wordTmp = new Tuple2<>(lines[i], 1);
                    out.collect(wordTmp);
                }
            }
        });
        DataStream<Tuple2<String, Integer>> wordCount = wordOne.keyBy(0).sum(1);
        wordCount.print();
        senv.execute();
    }
}
Here, once an "error" message arrives, the exception is thrown and, with no restart strategy, the whole job dies immediately.
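Note that this fail-fast behavior holds only because checkpointing is off. With checkpointing enabled, the default near-unlimited fixed-delay strategy described in section 二 would kick in; to keep the job failing on the first error even with checkpointing on, the no-restart strategy must be set explicitly. A minimal sketch:
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
senv.enableCheckpointing(10000); // checkpointing on
senv.setRestartStrategy(RestartStrategies.noRestart()); // still fail the job on the first exception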
2. Case 2: enable checkpointing and use the fixed-delay restart strategy. Code:
package com.hadoop.ljs.flink110.checkpoint;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-04-12 09:26
 * @version: v1.0
 * @description: com.hadoop.ljs.flink110.checkpoint
 */
public class CheckpointTest {
    public static void main(String[] args) throws Exception {
        /* Initialize the Flink streaming environment */
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Take a checkpoint every 10 s */
        senv.enableCheckpointing(10000);
        /* Fixed-delay restart strategy: set the number of attempts and the delay between them */
        senv.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // number of restart attempts
                Time.seconds(2) // delay between attempts
        ));
        /* Receive data from the socket */
        DataStream<String> source = senv.socketTextStream("localhost", 9000);
        DataStream<String> filterData = source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                if (null == value || "".equals(value)) {
                    return false;
                }
                return true;
            }
        });
        DataStream<Tuple2<String, Integer>> wordOne = filterData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                /* Throw an exception on data starting with "error", for testing */
                if (value.startsWith("error")) {
                    throw new Exception("Received error, failing!!!");
                }
                String[] lines = value.split(",");
                for (int i = 0; i < lines.length; i++) {
                    Tuple2<String, Integer> wordTmp = new Tuple2<>(lines[i], 1);
                    out.collect(wordTmp);
                }
            }
        });
        DataStream<Tuple2<String, Integer>> wordCount = wordOne.keyBy(0).sum(1);
        wordCount.print();
        senv.execute();
    }
}
With the restart attempts set to 3 and the delay to 2 seconds, sending "error" the first 3 times makes the program restart automatically instead of dying; on the 4th "error" the program hits the restart limit and dies.
3. Case 3: enable checkpointing and use the failure-rate restart strategy, with at most 3 failures within a 2-minute interval and a 10-second delay between restarts. Code:
package com.hadoop.ljs.flink110.checkpoint;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-04-12 09:26
 * @version: v1.0
 * @description: com.hadoop.ljs.flink110.checkpoint
 */
public class CheckpointTest {
    public static void main(String[] args) throws Exception {
        /* Initialize the Flink streaming environment */
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Take a checkpoint every 10 s */
        senv.enableCheckpointing(10000);
        /* Failure-rate restart strategy: at most 3 failures within 2 minutes, 10 s delay between restarts */
        senv.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, // max failures per interval
                Time.minutes(2), // time interval for measuring failure rate
                Time.seconds(10) // delay
        ));
        /* Receive data from the socket */
        DataStream<String> source = senv.socketTextStream("localhost", 9000);
        DataStream<String> filterData = source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                if (null == value || "".equals(value)) {
                    return false;
                }
                return true;
            }
        });
        DataStream<Tuple2<String, Integer>> wordOne = filterData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                /* Throw an exception on data starting with "error", for testing */
                if (value.startsWith("error")) {
                    throw new Exception("Received error, failing!!!");
                }
                String[] lines = value.split(",");
                for (int i = 0; i < lines.length; i++) {
                    Tuple2<String, Integer> wordTmp = new Tuple2<>(lines[i], 1);
                    out.collect(wordTmp);
                }
            }
        });
        DataStream<Tuple2<String, Integer>> wordCount = wordOne.keyBy(0).sum(1);
        wordCount.print();
        senv.execute();
    }
}
With the failure-rate strategy, the third "error" within the 2-minute window reaches the failure limit and the program dies.