1 概述
我們都知道,使用Scala或者Java寫代碼的時候可以配置Flink Checkpoint:
val env = StreamExecutionEnvironment.getExecutionEnvironment .enableCheckpointing(5 * 60 * 1000)val checkpointConfig = env.getCheckpointConfigcheckpointConfig.setMinPauseBetweenCheckpoints(2 * 60 * 1000)checkpointConfig.setCheckpointTimeout(3 * 60 * 1000)checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
但這對于其他并不善于寫代碼的同僚來說是很麻煩的事情,難以維護。我們使用Flink Sql + Zeppelin不就是想盡可能地幹掉代碼,使用純SQL+配置嗎?
好在Flink已經支援了Checkpoint相關配置,接下來開始介紹。
# Zeppelin配置Flink Checkpoint
%flink.conf# 開啟Checkpoint,指定兩次checkpoint開始排程之間的間隔,機關毫秒# 當然,還會受到checkpoint并發數和min-pause影響execution.checkpointing.interval 120000# 開始下次Checkpoint時距離上一次Checkpoint完成後的最小時間間隔,機關毫秒execution.checkpointing.min-pause 60000# 如果某次Checkpoint超過此門檻值還沒完成,則将進行中的Checkpoint幹掉廢棄,機關毫秒execution.checkpointing.timeout 60000# 當Cancel該job時也保留 Checkpoint,用于作業手動重新開機# 此模式下我們必須在Cancel後需要手動删除Checkpoint檔案。execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION# 從Checkpoint或Savepoint恢複時使用# execution.savepoint.path hdfs:/flink/flink-checkpoints/a84fccc7d3ff03f0c111bb98e176e1da/chk-1
這樣就配置好了Checkpoint。
關于Checkpoint更多詳細配置請參考Flink官網:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html
# 從Checkpoint恢複
1. 如果要準備修改作業需要重新開機,則先在Flink UI中記錄下最新的Checkpoint位址:
2. 随後在Zeppelin中暫停該任務:
3. 修改完成後,請務必在%flink.conf中加入以下配置
execution.savepoint.path hdfs:/flink/flink-checkpoints/a84fccc7d3ff03f0c111bb98e176e1da/chk-1
當然,路徑換成我們剛才記錄的。
4. 重新開機該Notebook的Flink Interpreter,随後重新運作%flink.conf使得新配置生效。
5. 最後,在Zeppelin重新送出該Flink任務,可觀察到該任務從Checkpoint恢複:
# 更多Flink On Zeppelin内容
我個人寫的FlinkSql On Zeppelin 部落格
https://blog.csdn.net/baichoufei90/article/details/105294787
如果有碰到任何問題,請加入下面這個釘釘群讨論。