I. Overview
When a task fails, Flink needs to restart the failed task and the other affected tasks to recover the job to a normal state. Restart strategies and failover strategies control this restarting: the restart strategy decides whether and when the failed/affected tasks may be restarted, while the failover strategy decides which tasks should be restarted to recover the job.
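Failover strategies are configured separately from restart strategies. As a minimal sketch (assuming Flink 1.9+, where the jobmanager.execution.failover-strategy option controls the scope of recovery), flink-conf.yaml can limit a restart to the region containing the failed task instead of restarting every task in the job:

# "full" restarts all tasks in the job on a failure;
# "region" restarts only the pipelined region containing the failed task
jobmanager.execution.failover-strategy: region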
II. Restart Strategies
If no job-specific restart strategy is defined, the cluster's default restart strategy is always used. If a submitted job carries its own restart strategy, that strategy overrides the cluster default.
The default restart strategy is set in Flink's configuration file flink-conf.yaml; it can also be overridden per job in code.
The configuration parameter restart-strategy determines which strategy is used. If checkpointing is not enabled, the "no restart" strategy is used; if checkpointing is enabled but no restart strategy has been configured, the fixed-delay strategy is used with Integer.MAX_VALUE restart attempts.
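To make that implicit default concrete, the sketch below (using the same imports as the later snippets) sets explicitly what Flink would otherwise fall back to when checkpointing is enabled; the 1-second delay here is illustrative, as the exact default delay depends on the Flink version:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000); // checkpointing on, so "no restart" no longer applies
/* Roughly equivalent to the implicit default: effectively unlimited restart attempts */
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        Integer.MAX_VALUE, // restart attempts used by the implicit default
        Time.of(1, TimeUnit.SECONDS) // illustrative delay; the real default is version-dependent
));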
The restart-strategy values fall into the following types:
1. none, off, disable
No restart strategy is set; this "no restart" strategy is what Flink uses when checkpointing is not enabled.
a. Via the configuration file flink-conf.yaml:
restart-strategy: none
b. Via code:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
2. fixeddelay, fixed-delay
Fixed-delay restart: the number of restart attempts and the delay between attempts are configurable. If checkpointing is enabled but no restart strategy has been configured, this fixed-delay strategy is used.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3, // number of restart attempts
        Time.of(10, TimeUnit.SECONDS) // delay between attempts
));
3. failurerate, failure-rate
The failure-rate strategy restarts the job after a failure, but once the failure rate is exceeded the job is ultimately declared failed. Between two consecutive restart attempts the strategy waits a fixed delay. The configuration below declares the job failed if it fails 3 times within 5 minutes, with a 10-second delay between restarts.
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3, // max failures per interval
        Time.of(5, TimeUnit.MINUTES), // time interval for measuring failure rate
        Time.of(10, TimeUnit.SECONDS) // delay
));
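To make the sliding window concrete: with the configuration above, failures at minutes 0, 3, and 6 never place 3 failures inside a single 5-minute window, so the job keeps restarting; failures at minutes 0, 2, and 4 all fall inside one window, so the job is declared failed.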
III. Restart Strategy Code Demos
The demos again receive data from a socket and count words. The main function stays largely the same; whenever a line starting with "error" arrives, an exception is thrown deliberately. Each scenario changes the checkpoint-related settings to demonstrate the behavior.
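To drive the demos, first start a local socket server, for example with netcat (nc -lk 9000, matching the port the code connects to), then type comma-separated words, or a line starting with "error", into that terminal.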
1. First scenario: checkpointing is not enabled, so the default no-restart strategy applies. Code:
package com.hadoop.ljs.flink110.checkpoint;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-04-12 09:26
 * @version: v1.0
 * @description: com.hadoop.ljs.flink110.checkpoint
 */
public class CheckpointTest {
    public static void main(String[] args) throws Exception {
        /* Initialize the Flink streaming environment */
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Receive data from a socket */
        DataStream<String> source = senv.socketTextStream("localhost", 9000);
        /* Drop null or empty lines */
        DataStream<String> filterData = source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                if (null == value || "".equals(value)) {
                    return false;
                }
                return true;
            }
        });
        DataStream<Tuple2<String, Integer>> wordOne = filterData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                /* Throw an exception on data starting with "error" to test restart behavior */
                if (value.startsWith("error")) {
                    throw new Exception("Received error, failing!!!");
                }
                String[] lines = value.split(",");
                for (int i = 0; i < lines.length; i++) {
                    Tuple2<String, Integer> wordTmp = new Tuple2<>(lines[i], 1);
                    out.collect(wordTmp);
                }
            }
        });
        /* Count each word */
        DataStream<Tuple2<String, Integer>> wordCount = wordOne.keyBy(0).sum(1);
        wordCount.print();
        senv.execute();
    }
}
Here, when an "error" message arrives and the exception is thrown, the whole job dies immediately under the no-restart strategy.
2. Second scenario: checkpointing is enabled with a fixed-delay restart strategy. Code:
package com.hadoop.ljs.flink110.checkpoint;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-04-12 09:26
 * @version: v1.0
 * @description: com.hadoop.ljs.flink110.checkpoint
 */
public class CheckpointTest {
    public static void main(String[] args) throws Exception {
        /* Initialize the Flink streaming environment */
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Checkpoint every 10 s */
        senv.enableCheckpointing(10000);
        /* Fixed-delay restart strategy: configure the attempts and the delay between them */
        senv.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // number of restart attempts
                Time.seconds(2) // delay between attempts
        ));
        /* Receive data from a socket */
        DataStream<String> source = senv.socketTextStream("localhost", 9000);
        /* Drop null or empty lines */
        DataStream<String> filterData = source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                if (null == value || "".equals(value)) {
                    return false;
                }
                return true;
            }
        });
        DataStream<Tuple2<String, Integer>> wordOne = filterData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                /* Throw an exception on data starting with "error" to test restart behavior */
                if (value.startsWith("error")) {
                    throw new Exception("Received error, failing!!!");
                }
                String[] lines = value.split(",");
                for (int i = 0; i < lines.length; i++) {
                    Tuple2<String, Integer> wordTmp = new Tuple2<>(lines[i], 1);
                    out.collect(wordTmp);
                }
            }
        });
        /* Count each word */
        DataStream<Tuple2<String, Integer>> wordCount = wordOne.keyBy(0).sum(1);
        wordCount.print();
        senv.execute();
    }
}
With 3 restart attempts and a 2-second delay configured, sending "error" the first 3 times makes the job restart automatically without dying; on the 4th "error" the job hits the restart limit and dies for good.
3. Third scenario: checkpointing is enabled with a failure-rate restart strategy: at most 3 failures within a 2-minute interval, with a 10-second delay between restarts. Code:
package com.hadoop.ljs.flink110.checkpoint;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author: Created By lujisen
 * @company ChinaUnicom Software JiNan
 * @date: 2020-04-12 09:26
 * @version: v1.0
 * @description: com.hadoop.ljs.flink110.checkpoint
 */
public class CheckpointTest {
    public static void main(String[] args) throws Exception {
        /* Initialize the Flink streaming environment */
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        /* Checkpoint every 10 s */
        senv.enableCheckpointing(10000);
        /* Failure-rate restart strategy: at most 3 failures within 2 minutes, 10-second delay between restarts */
        senv.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, // max failures per interval
                Time.minutes(2), // time interval for measuring failure rate
                Time.seconds(10) // delay
        ));
        /* Receive data from a socket */
        DataStream<String> source = senv.socketTextStream("localhost", 9000);
        /* Drop null or empty lines */
        DataStream<String> filterData = source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                if (null == value || "".equals(value)) {
                    return false;
                }
                return true;
            }
        });
        DataStream<Tuple2<String, Integer>> wordOne = filterData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                /* Throw an exception on data starting with "error" to test restart behavior */
                if (value.startsWith("error")) {
                    throw new Exception("Received error, failing!!!");
                }
                String[] lines = value.split(",");
                for (int i = 0; i < lines.length; i++) {
                    Tuple2<String, Integer> wordTmp = new Tuple2<>(lines[i], 1);
                    out.collect(wordTmp);
                }
            }
        });
        /* Count each word */
        DataStream<Tuple2<String, Integer>> wordCount = wordOne.keyBy(0).sum(1);
        wordCount.print();
        senv.execute();
    }
}
With the failure-rate strategy, once the 3rd "error" arrives within the 2-minute window, the job reaches the maximum failure count and dies.
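Note the difference from the fixed-delay demo: failure-rate puts no cap on the total number of restarts. As long as failures stay rare enough that no 2-minute window ever accumulates 3 of them, the job keeps restarting indefinitely.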