1 概述
我们都知道,使用Scala或者Java写代码的时候可以配置Flink Checkpoint:
val env = StreamExecutionEnvironment.getExecutionEnvironment .enableCheckpointing(5 * 60 * 1000)val checkpointConfig = env.getCheckpointConfigcheckpointConfig.setMinPauseBetweenCheckpoints(2 * 60 * 1000)checkpointConfig.setCheckpointTimeout(3 * 60 * 1000)checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
但这对于其他并不善于写代码的同事来说是很麻烦的事情,难以维护。我们使用Flink Sql + Zeppelin不就是想尽可能地干掉代码,使用纯SQL+配置吗?
好在Flink已经支持了Checkpoint相关配置,接下来开始介绍。
# Zeppelin配置Flink Checkpoint
%flink.conf# 开启Checkpoint,指定两次checkpoint开始调度之间的间隔,单位毫秒# 当然,还会受到checkpoint并发数和min-pause影响execution.checkpointing.interval 120000# 开始下次Checkpoint时距离上一次Checkpoint完成后的最小时间间隔,单位毫秒execution.checkpointing.min-pause 60000# 如果某次Checkpoint超过此阈值还没完成,则将进行中的Checkpoint干掉作废,单位毫秒execution.checkpointing.timeout 60000# 当Cancel该job时也保留 Checkpoint,用于作业手动重启# 此模式下我们必须在Cancel后需要手动删除Checkpoint文件。execution.checkpointing.externalized-checkpoint-retention RETAIN_ON_CANCELLATION# 从Checkpoint或Savepoint恢复时使用# execution.savepoint.path hdfs:/flink/flink-checkpoints/a84fccc7d3ff03f0c111bb98e176e1da/chk-1
这样就配置好了Checkpoint。
关于Checkpoint更多详细配置请参考Flink官网:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html
# 从Checkpoint恢复
1. 如果要准备修改作业需要重启,则先在Flink UI中记录下最新的Checkpoint地址:
2. 随后在Zeppelin中暂停该任务:
3. 修改完成后,请务必在%flink.conf中加入以下配置
execution.savepoint.path hdfs:/flink/flink-checkpoints/a84fccc7d3ff03f0c111bb98e176e1da/chk-1
当然,路径换成我们刚才记录的。
4. 重启该Notebook的Flink Interpreter,随后重新运行%flink.conf使得新配置生效。
5. 最后,在Zeppelin重新提交该Flink任务,可观察到该任务从Checkpoint恢复:
# 更多Flink On Zeppelin内容
我个人写的FlinkSql On Zeppelin 博客
https://blog.csdn.net/baichoufei90/article/details/105294787
如果有碰到任何问题,请加入下面这个钉钉群讨论。