
Getting Started with Flink 1.10: Checkpoint Restart Strategies with Code Examples

I. Overview

    When a task fails, Flink needs to restart the failed task and any other affected tasks to restore the job to a normal state. Restart strategies and failover strategies control this restarting: the restart strategy determines whether, and when, failed or affected tasks may be restarted, while the failover strategy determines which tasks should be restarted to recover the job.

II. Restart Strategies

    When no job-specific restart strategy is defined, the cluster's default restart strategy is always used. If a job is submitted with its own restart strategy, that strategy overrides the cluster default.

    The default restart strategy is set in Flink's configuration file flink-conf.yaml, and it can be overridden per job in code.

The configuration parameter restart-strategy determines which strategy is used. If checkpointing is not enabled, the "no restart" strategy is used. If checkpointing is enabled but no restart strategy is configured, the fixed-delay strategy is used, with Integer.MAX_VALUE as the number of restart attempts.
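For reference, the implicit default described above roughly corresponds to the following explicit configuration. This is a sketch: the Integer.MAX_VALUE attempt count is stated above, but the 1 s delay is the documented default in Flink 1.10 and should be verified against your version.

```yaml
# Approximation of the fallback used when checkpointing is enabled
# but no restart strategy is configured (Flink 1.10; the 1 s delay
# is an assumption to verify against your version).
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647   # Integer.MAX_VALUE
restart-strategy.fixed-delay.delay: 1 s
```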

The restart-strategy options fall into the following types:

1. none, off, disable

    No restart strategy is set. If checkpointing is not enabled, this "no restart" strategy is used.

a. Set via the configuration file flink-conf.yaml:

restart-strategy: none      

b. Set via code:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());

2. fixeddelay, fixed-delay

    Restart at a fixed interval, with a configurable number of restart attempts and a delay between attempts. If checkpointing is enabled but no restart strategy is configured, this fixed-delay strategy is used.

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3,                            // number of restart attempts
    Time.of(10, TimeUnit.SECONDS) // delay between restarts
));

3. failurerate, failure-rate

    The failure-rate strategy restarts the job after a failure, but once the failure rate is exceeded the job is finally considered failed. Between two consecutive restart attempts, the strategy waits a fixed delay. The configuration below means the job is considered failed if it fails 3 times within 5 minutes, with a 10-second delay between restarts.

restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
    3,                             // max failures per interval
    Time.of(5, TimeUnit.MINUTES),  // time interval for measuring failure rate
    Time.of(10, TimeUnit.SECONDS)  // delay
));
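To make the failure-rate semantics concrete, here is a small self-contained sketch of the decision logic. This is a simplified model, not Flink's actual implementation: failure timestamps are kept in a sliding window, and a restart is allowed only while the number of failures inside the measuring interval stays below the configured maximum.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/* Simplified model of failure-rate restart semantics (not Flink's code):
 * record each failure's timestamp, drop those older than the measuring
 * interval, and allow a restart only while the count stays below the max. */
public class FailureRateModel {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final Deque<Long> failureTimestamps = new ArrayDeque<>();

    public FailureRateModel(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    /* Record a failure at the given time; return true if a restart is allowed. */
    public boolean onFailure(long nowMillis) {
        failureTimestamps.addLast(nowMillis);
        // Prune failures that have fallen out of the sliding interval.
        while (!failureTimestamps.isEmpty()
                && nowMillis - failureTimestamps.peekFirst() > intervalMillis) {
            failureTimestamps.removeFirst();
        }
        return failureTimestamps.size() < maxFailuresPerInterval;
    }

    public static void main(String[] args) {
        // 3 failures per 5-minute interval, matching the configuration above.
        FailureRateModel model = new FailureRateModel(3, 5 * 60 * 1000L);
        System.out.println(model.onFailure(0L));       // 1st failure -> restart allowed
        System.out.println(model.onFailure(60_000L));  // 2nd failure -> restart allowed
        System.out.println(model.onFailure(120_000L)); // 3rd within 5 min -> job fails
    }
}
```

With this model, the third failure inside the 5-minute window exhausts the budget, which matches the behavior described above.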

III. Restart Strategy Code Demonstrations

    As before, the program receives data from a socket and counts words; the main function stays largely the same. When data starting with "error" is received, an exception is thrown directly. By varying the checkpoint-related settings, each scenario is demonstrated:

1. First scenario: checkpointing is not configured, so the default is no restart strategy. Code:

package com.hadoop.ljs.flink110.checkpoint;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-04-12 09:26
 * @version: v1.0
 * @description: com.hadoop.ljs.flink110.checkpoint
 */
public class CheckpointTest {
    public static void main(String[] args) throws Exception {
        /* Initialize the Flink streaming environment */
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Receive data from a socket */
        DataStream<String> source = senv.socketTextStream("localhost", 9000);
        DataStream<String> filterData = source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                if (null == value || "".equals(value)) {
                    return false;
                }
                return true;
            }
        });
        DataStream<Tuple2<String, Integer>> wordOne = filterData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                /* Throw an exception on data starting with "error", to test failure handling */
                if (value.startsWith("error")) {
                    throw new Exception("Received error, failing!!!");
                }
                String[] lines = value.split(",");
                for (int i = 0; i < lines.length; i++) {
                    Tuple2<String, Integer> wordTmp = new Tuple2<>(lines[i], 1);
                    out.collect(wordTmp);
                }
            }
        });
        DataStream<Tuple2<String, Integer>> wordCount = wordOne.keyBy(0).sum(1);
        wordCount.print();
        senv.execute();
    }
}
 

    Here, when an "error" message is received, an exception is thrown; with no restart strategy, the whole job fails immediately.

2. Second scenario: checkpointing is enabled with a fixed-delay restart strategy. Code:

package com.hadoop.ljs.flink110.checkpoint;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-04-12 09:26
 * @version: v1.0
 * @description: com.hadoop.ljs.flink110.checkpoint
 */
public class CheckpointTest {
    public static void main(String[] args) throws Exception {
        /* Initialize the Flink streaming environment */
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Take a checkpoint every 10 seconds */
        senv.enableCheckpointing(10000);
        /* Fixed-delay restart strategy: set the number of attempts and the delay */
        senv.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3,               // number of restart attempts
                Time.seconds(2)  // delay between restarts
        ));
        /* Receive data from a socket */
        DataStream<String> source = senv.socketTextStream("localhost", 9000);
        DataStream<String> filterData = source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                if (null == value || "".equals(value)) {
                    return false;
                }
                return true;
            }
        });
        DataStream<Tuple2<String, Integer>> wordOne = filterData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                /* Throw an exception on data starting with "error", to test failure handling */
                if (value.startsWith("error")) {
                    throw new Exception("Received error, failing!!!");
                }
                String[] lines = value.split(",");
                for (int i = 0; i < lines.length; i++) {
                    Tuple2<String, Integer> wordTmp = new Tuple2<>(lines[i], 1);
                    out.collect(wordTmp);
                }
            }
        });
        DataStream<Tuple2<String, Integer>> wordCount = wordOne.keyBy(0).sum(1);
        wordCount.print();
        senv.execute();
    }
}

    With 3 restart attempts and a 2-second delay, the program restarts automatically for the first 3 "error" messages I send and does not die; on the 4th "error" the restart limit is reached and the program fails, as shown in the figure:
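The behavior observed above can be modeled with a simple attempt counter. This is a sketch of the semantics, not Flink's implementation: each failure consumes one of the configured attempts, and once they are exhausted the job dies.

```java
/* Simplified model of fixed-delay restart semantics (not Flink's code):
 * a fixed number of restart attempts; once exhausted, the job fails. */
public class FixedDelayModel {
    private final int maxAttempts;
    private int failures = 0;

    public FixedDelayModel(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /* Record a failure; return true if a restart is still allowed. */
    public boolean onFailure() {
        failures++;
        return failures <= maxAttempts;
    }

    public static void main(String[] args) {
        FixedDelayModel model = new FixedDelayModel(3); // 3 attempts, as configured above
        System.out.println(model.onFailure()); // 1st "error" -> restart
        System.out.println(model.onFailure()); // 2nd "error" -> restart
        System.out.println(model.onFailure()); // 3rd "error" -> restart
        System.out.println(model.onFailure()); // 4th "error" -> job fails
    }
}
```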

3. Third scenario: checkpointing is enabled with a failure-rate restart strategy, allowing at most 3 failures within a 2-minute interval, with a 10-second delay between restarts. Code:

package com.hadoop.ljs.flink110.checkpoint;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-04-12 09:26
 * @version: v1.0
 * @description: com.hadoop.ljs.flink110.checkpoint
 */
public class CheckpointTest {
    public static void main(String[] args) throws Exception {
        /* Initialize the Flink streaming environment */
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Take a checkpoint every 10 seconds */
        senv.enableCheckpointing(10000);
        /* Failure-rate restart strategy: at most 3 failures within 2 minutes, 10-second delay */
        senv.setRestartStrategy(RestartStrategies.failureRateRestart(
                3,                // max failures per interval
                Time.minutes(2),  // time interval for measuring failure rate
                Time.seconds(10)  // delay between restarts
        ));
        /* Receive data from a socket */
        DataStream<String> source = senv.socketTextStream("localhost", 9000);
        DataStream<String> filterData = source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                if (null == value || "".equals(value)) {
                    return false;
                }
                return true;
            }
        });
        DataStream<Tuple2<String, Integer>> wordOne = filterData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                /* Throw an exception on data starting with "error", to test failure handling */
                if (value.startsWith("error")) {
                    throw new Exception("Received error, failing!!!");
                }
                String[] lines = value.split(",");
                for (int i = 0; i < lines.length; i++) {
                    Tuple2<String, Integer> wordTmp = new Tuple2<>(lines[i], 1);
                    out.collect(wordTmp);
                }
            }
        });
        DataStream<Tuple2<String, Integer>> wordCount = wordOne.keyBy(0).sum(1);
        wordCount.print();
        senv.execute();
    }
}

    With the failure-rate strategy, when the 3rd "error" arrives within the 2-minute interval, the program reaches its failure limit and dies.
