天天看點

flink checkpoint 重新開機_Flink Sql on Zeppelin之Checkpoint應用

1 概述

我們都知道,使用Scala或者Java寫代碼的時候可以配置Flink Checkpoint:

val env = StreamExecutionEnvironment.getExecutionEnvironment  .enableCheckpointing(5 * 60 * 1000)val checkpointConfig = env.getCheckpointConfigcheckpointConfig.setMinPauseBetweenCheckpoints(2 * 60 * 1000)checkpointConfig.setCheckpointTimeout(3 * 60 * 1000)checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
           

但這對于其他并不善于寫代碼的同僚來說是很麻煩的事情,難以維護。我們使用Flink Sql + Zeppelin不就是想盡可能地幹掉代碼,使用純SQL+配置嗎?

好在Flink已經支援了Checkpoint相關配置,接下來開始介紹。

# Zeppelin配置Flink Checkpoint

%flink.conf# 開啟Checkpoint,指定兩次checkpoint開始排程之間的間隔,機關毫秒# 當然,還會受到checkpoint并發數和min-pause影響execution.checkpointing.interval 120000# 開始下次Checkpoint時距離上一次Checkpoint完成後的最小時間間隔,機關毫秒execution.checkpointing.min-pause 60000# 如果某次Checkpoint超過此門檻值還沒完成,則将進行中的Checkpoint幹掉廢棄,機關毫秒execution.checkpointing.timeout 60000# 當Cancel該job時也保留 Checkpoint,用于作業手動重新開機# 此模式下我們必須在Cancel後需要手動删除Checkpoint檔案。execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION# 從Checkpoint或Savepoint恢複時使用# execution.savepoint.path hdfs:/flink/flink-checkpoints/a84fccc7d3ff03f0c111bb98e176e1da/chk-1
           

這樣就配置好了Checkpoint。

關于Checkpoint更多詳細配置請參考Flink官網:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html

# 從Checkpoint恢複

1. 如果要準備修改作業需要重新開機,則先在Flink UI中記錄下最新的Checkpoint位址:

flink checkpoint 重新開機_Flink Sql on Zeppelin之Checkpoint應用

2. 随後在Zeppelin中暫停該任務:

flink checkpoint 重新開機_Flink Sql on Zeppelin之Checkpoint應用

3. 修改完成後,請務必在%flink.conf中加入以下配置

execution.savepoint.path hdfs:/flink/flink-checkpoints/a84fccc7d3ff03f0c111bb98e176e1da/chk-1
           

當然,路徑換成我們剛才記錄的。

4. 重新開機該Notebook的Flink Interpreter,随後重新運作%flink.conf使得新配置生效。

5. 最後,在Zeppelin重新送出該Flink任務,可觀察到該任務從Checkpoint恢複:

flink checkpoint 重新開機_Flink Sql on Zeppelin之Checkpoint應用

# 更多Flink On Zeppelin内容

我個人寫的FlinkSql On Zeppelin  部落格

https://blog.csdn.net/baichoufei90/article/details/105294787

如果有碰到任何問題,請加入下面這個釘釘群讨論。

flink checkpoint 重新開機_Flink Sql on Zeppelin之Checkpoint應用