天天看點

Flink1.10入門:Checkpoint重新開機政策及代碼示範

一、概述 

    當任務失敗時,Flink需要重新啟動失敗的任務和其他受影響的任務,将作業恢複到正常狀态;重新啟動政策和故障轉移政策用于控制任務的重新啟動。重新啟動政策決定是否以及何時可以重新啟動失敗/受影響的任務。故障轉移政策決定應該重新啟動哪些任務以恢複作業。

二、Restart Strategies 重新開機政策

    在沒有定義特定作業的重新開機政策時,總是使用預設的重新開機政策。如果送出的作業帶有重新開機政策,該政策将覆寫叢集的預設設定。

    預設的重新開機政策是通過Flink的配置檔案flink-conf.yml設定的,也可以通過代碼設定覆寫預設設定,重新開機政策主要分了以下幾種類型

配置參數restart-strategy定義了采取哪種政策。如果沒有啟用Checkpoint,則使用“no restart”政策;如果啟用了 checkpointing,但沒有配置重新開機政策,則使用固定間隔 (fixed-delay) 政策,其中 Integer.MAX_VALUE 參數是嘗試重新開機次數,

restart-strategy重新開機政策主要分了以下幾種類型:

1.none, off, disable

    不設定重新開機政策,如果沒有啟用Checkpoint,則使用“no restart”政策;

a.通過配置檔案flink-conf.yaml設定:

restart-strategy: none      

b.通過代碼設定:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.noRestart());      

 2.fixeddelay, fixed-delay 

      固定時間間隔,可設定重新開機次數和重試時間間隔,如果啟用了 checkpointing,但沒有配置重新開機政策,則使用固定間隔 (fixed-delay) 政策;

restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 3restart-strategy.fixed-delay.delay: 10 s      
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.fixedDelayRestart(  3, // 失敗重新開機次數  Time.of(10, TimeUnit.SECONDS) //每次重新開機時間間隔));      

3.failurerate, failure-rate:

    失敗率重新開機政策在Job失敗後會重新開機,但是超過失敗率後,Job會最終被認定失敗。在兩個連續的重新開機嘗試之間,重新開機政策會等待一個固定的時間.如下配置是5分鐘内若失敗了3次則認為該job失敗,重試間隔為10s。

restart-strategy: failure-raterestart-strategy.failure-rate.max-failures-per-interval: 3restart-strategy.failure-rate.failure-rate-interval: 5 minrestart-strategy.failure-rate.delay: 10 s      
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.failureRateRestart(  3, // max failures per interval  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate  Time.of(10, TimeUnit.SECONDS) // delay));      

三、重新開機政策代碼示範

    這裡還是從Socket端接收資料,然後進行單詞統計,主函數代碼基本不變,收到"error"開頭的資料,直接抛出異常,各種情況修改Checkpoint的相關設定進行功能示範:

1.第一種情況,不設定Checkpoint,預設是無重新開機政策,代碼如下:

package com.hadoop.ljs.flink110.checkpoint;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-04-12 09:26 * @version: v1.0 * @description: com.hadoop.ljs.flink110.checkpoint */public class CheckpointTest {    public static void main(String[] args) throws Exception {        /*初始化Flink流處理環境*/        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();        /*從socket接收資料*/        DataStream<String> source = senv.socketTextStream("localhost", 9000);        DataStream<String> filterData=source.filter(new FilterFunction<String>() {            @Override            public boolean filter(String value) throws Exception {                if(null==value||"".equals(value)){                    return false;                }                return true;            }        });        DataStream<Tuple2<String, Integer>> wordOne = filterData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {            @Override            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {                /*收到error開頭的資料 手動抛出異常  進行測試*/                if(value.startsWith("error")){                    throw new Exception("收到error,報錯!!!");                }                String[] lines = value.split(",");                for(int i=0;i<lines.length;i++){                    Tuple2<String,Integer> wordTmp=new Tuple2<>(lines[i],1);                    out.collect(wordTmp);                }            }        });        DataStream<Tuple2<String, Integer>> wordCount = wordOne.keyBy(0).sum(1);        wordCount.print();        senv.execute();    }}      

    這裡收到error資訊,抛出異常,無重新開機政策情況下,整個job直接挂掉。

2.第二種情況,啟用Checkpoint,采用固定時間間隔重新開機政策,代碼如下:

package com.hadoop.ljs.flink110.checkpoint;import akka.remote.WireFormats;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.restartstrategy.RestartStrategies;import org.apache.flink.api.common.time.Time;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-04-12 09:26 * @version: v1.0 * @description: com.hadoop.ljs.flink110.checkpoint */public class CheckpointTest {    public static void main(String[] args) throws Exception {        /*初始化Flink流處理環境*/        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();        /*每隔10S做一次checkpoint*/        senv.enableCheckpointing(10000);        /*采用固定時間間隔重新開機政策,設定重新開機次數和重新開機時間間隔*/        senv.setRestartStrategy(RestartStrategies.fixedDelayRestart(                3,// 嘗試重新開機的次數                Time.seconds(2)  //重新開機時間間隔        ));        /*從socket接收資料*/        DataStream<String> source = senv.socketTextStream("localhost", 9000);        DataStream<String> filterData=source.filter(new FilterFunction<String>() {            @Override            public boolean filter(String value) throws Exception {                if(null==value||"".equals(value)){                    return false;                }                return true;            }        });        DataStream<Tuple2<String, Integer>> wordOne = filterData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {            @Override            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {                /*收到error開頭的資料 手動抛出異常  進行測試*/                if(value.startsWith("error")){                    throw new Exception("收到error,報錯!!!");                }                String[] lines = value.split(",");                for(int i=0;i<lines.length;i++){                    Tuple2<String,Integer> wordTmp=new Tuple2<>(lines[i],1);                    out.collect(wordTmp);                }            }        });        DataStream<Tuple2<String, Integer>> wordCount = wordOne.keyBy(0).sum(1);        wordCount.print();        senv.execute();    }}      

    設定了重新開機次數3次,重新開機時間間隔2秒,這裡我發送前3次error,主程式自動重新開機不會挂掉,當我重新發送第4次error的時候主程式達到重新開機上限,直接挂掉了,如圖所示:

3.第三種情況,啟用Checkpoint,設定失敗率重新開機政策,并指定2分鐘時間内,最大失敗次數為3次,重新開機時間間隔10秒,代碼如下:

package com.hadoop.ljs.flink110.checkpoint;import akka.remote.WireFormats;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.restartstrategy.RestartStrategies;import org.apache.flink.api.common.time.Time;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-04-12 09:26 * @version: v1.0 * @description: com.hadoop.ljs.flink110.checkpoint */public class CheckpointTest {    public static void main(String[] args) throws Exception {        /*初始化Flink流處理環境*/        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();        /*每隔10S做一次checkpoint*/        senv.enableCheckpointing(10000);        /* 設定失敗率重新開機政策,下面設定2分鐘内 最大失敗次數3次 重試時間間隔10秒*/        senv.setRestartStrategy(RestartStrategies.failureRateRestart(                3, //                Time.minutes(2), //time interval for measuring failure rate                Time.seconds(10) // delay        ));        /*從socket接收資料*/        DataStream<String> source = senv.socketTextStream("localhost", 9000);        DataStream<String> filterData=source.filter(new FilterFunction<String>() {            @Override            public boolean filter(String value) throws Exception {                if(null==value||"".equals(value)){                    return false;                }                return true;            }        });        DataStream<Tuple2<String, Integer>> wordOne = filterData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {            @Override            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {                /*收到error開頭的資料 手動抛出異常  進行測試*/                if(value.startsWith("error")){                    throw new Exception("收到error,報錯!!!");                }                String[] lines = value.split(",");                for(int i=0;i<lines.length;i++){                    Tuple2<String,Integer> wordTmp=new Tuple2<>(lines[i],1);                    out.collect(wordTmp);                }            }        });        DataStream<Tuple2<String, Integer>> wordCount = wordOne.keyBy(0).sum(1);        wordCount.print();        senv.execute();    }}      

    這裡使用失敗率重新開機政策,2分鐘内,收到第三次error時,主程式達到失敗次數,直接挂掉。

Flink1.10入門:Checkpoint重新開機政策及代碼示範