天天看点

Flink状态与容错【状态生存时间TTL,清理过期状态】

一.状态生存时间(TTL)

任何类型的键控状态都可以配置状态的生存期(TTL)。如果配置了TTL且状态值已过期,则将尽力清除存储的值。

所有状态收集类型均支持按条目TTL。这意味着列表元素和映射条目独立过期。

为了使用状态TTL,必须首先构建一个StateTtlConfig配置对象。然后可以通过传递配置在任何状态描述符中启用TTL功能:

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build
    
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
           

该配置有几个选项可供考虑:

该newBuilder方法的第一个参数是强制性的,它是生存时间值。

更新类型配置何时刷新状态TTL(默认为OnCreateAndWrite):

  • StateTtlConfig.UpdateType.OnCreateAndWrite -仅在创建和写访问权限时
  • StateTtlConfig.UpdateType.OnReadAndWrite -也具有读取权限

状态可见性用于配置是否清除尚未过期的默认值(默认情况下NeverReturnExpired):

  • StateTtlConfig.StateVisibility.NeverReturnExpired -永不返回过期值
  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp -如果仍然可用,则返回

在NeverReturnExpired的情况下,即使仍必须删除过期状态,其行为也好像不再存在一样。对于严格在TTL之后数据必须变得不可用于读取访问的用例,例如使用隐私敏感数据的应用程序,该选项很有用。

另一个选项ReturnExpiredIfNotCleanedUp允许在清理之前返回过期状态。

笔记:

  • 状态后端与用户值一起存储上次修改的时间戳,这意味着启用此功能会增加状态存储的消耗。堆状态后端将附加的Java对象与对用户状态对象的引用以及原始的long值一起存储在内存中。RocksDB状态后端为每个存储的值,列表条目或映射条目添加8个字节。
  • 当前仅支持有关处理时间的 TTL 。
  • 尝试使用启用了TTL的描述符恢复状态(以前没有配置TTL),反之亦然,这将导致兼容性失败和StateMigrationException。
  • TTL配置不是检查点或保存点的一部分,而是Flink在当前正在运行的作业中如何对待它的一种方式。
  • 仅当用户值序列化程序可以处理空值时,带有TTL的映射状态当前才支持空用户值。如果序列化器不支持空值,则可以用NullableSerializer序列化形式包装一个额外的字节来包装它。

二.清理过期状态

默认情况下,过期的值会在读取时显式删除,例如ValueState#value,并且如果配置的状态后端支持,则会定期在后台收集垃圾。可以在以下位置禁用后台清理StateTtlConfig:

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .disableCleanupInBackground
    .build
           

要对后台的某些特殊清理进行更细粒度的控制,可以按如下所述分别进行配置。当前,堆状态后端依赖于增量清理,而RocksDB后端使用压缩过滤器进行后台清理。

三.完整快照中的清理

此外,可以在拍摄完整状态快照时激活清除操作,这将减小其大小。在当前实现下不会清除本地状态,但是如果从先前的快照还原,则不会包括已删除的过期状态。可以在StateTtlConfig以下位置进行配置:

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot
    .build
           

此选项不适用于RocksDB状态后端中的增量检查点。

笔记:

  • 对于现有作业,可以在StateTtlConfig中的任何时间(例如,从保存点重新启动后)激活或取消激活此清除策略。

四.增量清理

另一个选择是逐步触发某些状态条目的清除。触发器可以是来自每个状态访问或每个记录处理的回调。如果此清理策略在特定状态下处于活动状态,则存储后端将在其所有条目上为此状态保留一个惰性全局迭代器。每次触发增量清理时,迭代器都会前进。检查遍历的状态条目,并清理过期的条目。

可以在StateTtlConfig以下位置配置此功能:

import org.apache.flink.api.common.state.StateTtlConfig;
 StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally(10, true)
    .build();
           

此策略有两个参数。第一个是每个清除触发的已检查状态条目数。它总是在每个状态访问时触发。第二个参数定义是否在每个记录处理中额外触发清理。堆后端的默认后台清理会检查5个条目,而每个记录处理不会进行清理。

笔记:

  • 如果对该状态没有访问权限或未处理任何记录,则过期状态将继续存在。
  • 用于增量清理的时间会增加记录处理的延迟。
  • 目前,仅针对堆状态后端实施了增量清理。为RocksDB设置它不会生效。
  • 如果将堆状态后端与同步快照一起使用,则全局迭代器将在迭代时保留所有键的副本,这是因为其特定的实现不支持并发修改。启用此功能将增加内存消耗。异步快照没有此问题。

    对于现有作业,可以在StateTtlConfig中的任何时间(例如,从保存点重新启动后)激活或取消激活此清除策略。

五.RocksDB压缩期间的清理

如果使用RocksDB状态后端,则将调用Flink特定的压缩过滤器进行后台清理。RocksDB定期运行异步压缩以合并状态更新并减少存储。Flink压缩过滤器使用TTL检查状态条目的到期时间戳记,并排除到期值。

可以在StateTtlConfig以下位置配置此功能:

import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();
           

RocksDB压缩过滤器每次处理一定数量的状态条目后,都会从Flink查询当前时间戳,以检查到期时间。可以更改它,并将自定义值传递给 cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)方法。更频繁地更新时间戳可以提高清除速度,但由于使用本地代码中的JNI调用,因此会降低压缩性能。每次处理1000个条目时,RocksDB后端的默认后台清理都会查询当前时间戳。

可以通过激活以下级别的调试级别来从RocksDB过滤器的本机代码中激活调试日志FlinkCompactionFilter:

log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG
           

笔记:

  • 压缩期间调用TTL过滤器会使速度变慢。TTL过滤器必须解析上次访问的时间戳,并针对每个要压缩的键的每个存储状态条目检查其到期时间。如果是收集状态类型(列表或映射),则还将针对每个存储的元素调用检查。
  • 如果此功能与列表状态一起使用,该列表状态具有不固定字节长度的元素,则本机TTL筛选器必须至少在每个第一个状态元素过期的每个状态项上,通过JNI额外调用该元素的Flink Java类型序列化器。确定下一个未过期元素的偏移量。
  • 对于现有作业,可以在StateTtlConfig中的任何时间(例如,从保存点重新启动后)激活或取消激活此清除策略。

六.Scala DataStream API中的状态

除了上述接口之外,Scala API还具有用于状态map()或flatMap()的一个值状态快捷方式 KeyedStream。用户函数获取ValueState的当前值并且必须返回一个更新后的值用于更新状态。

val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })
           

七.使用托管操作状态

要使用托管操作状态,有状态功能可以实现更通用的CheckpointedFunction 接口,也可以实现ListCheckpointed接口。

该CheckpointedFunction接口提供对具有不同重新分配方案的非键控状态的访问。它需要实现两种方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
           

每当必须执行检查点时,都会snapshotState()被调用。每次初始化用户定义的函数时,对应的initializeState()都会被调用,该函数在第一次初始化用户定义函数以及检查点恢复时都会被调用。因此,initializeState()不仅是初始化不同类型状态的地方,而且还包括状态恢复逻辑的地方。

当前,支持列表样式的托管操作符状态。状态应为彼此独立List的可序列化对象,因此有资格在重新缩放后进行重新分配。换句话说,这些对象是可以重新分配非密钥状态的最佳粒度。根据状态访问方法,定义了以下重新分配方案:

  • 偶数拆分的重新分配:每个运算符都返回一个状态元素列表。从逻辑上讲,整个状态是所有列表的串联。在还原/重新分发时,该列表被平均分为与并行运算符一样多的子列表。每个运算符都有一个子列表,该子列表可以为空,也可以包含一个或多个元素。例如,如果并行度为1,运算符的检查点状态包含元素element1和element2,在将并行度增加为2时,element1可能会以运算符实例0结束,而element2将转到运算符实例1。
  • 联合重新分配:每个运算符都返回一个状态元素列表。从逻辑上讲,整个状态是所有列表的串联。在还原/重新分发时,每个操作都会获得状态元素的完整列表。

下面是一个有状态的示例,该状态SinkFunction用于CheckpointedFunction 在将元素发送到外界之前先对其进行缓冲。它演示了基本的偶数拆分再分配列表状态:

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}
           

该initializeState方法以FunctionInitializationContext作为参数。这用于初始化非键控状态容器。这些是类型的容器,ListState在检查点上将存储非键状态对象。

请注意,与键控状态类似,如何使用StateDescriptor包含状态名称和有关该状态保存的值的类型的信息来初始化状态,类似于键控状态:

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);
           

状态访问方法的命名约定包含其重新分配模式及其状态结构。例如,要在恢复时将列表状态与联合重新分配方案一起使用,请使用来访问状态getUnionListState(descriptor)。如果方法名称不包含重新分配模式 getListState(descriptor),则仅表示将使用基本的偶数拆分重新分配方案。

初始化容器后,我们使用isRestored()上下文的方法检查失败后是否要恢复。如果为true,即我们正在恢复,则将应用还原逻辑。

如修改的代码为BufferingSink,这ListState期间状态初始化被保持,以备将来使用的一类可变回收snapshotState()。在那里,清除由先前的检查点包含的所有对象ListState,然后填充我们要设置新的检查点。

另外,也可以在initializeState()方法中初始化键控状态。可以使用提供的FunctionInitializationContext完成此操作。

该ListCheckpointed接口是CheckpointedFunction的更有限的变体,它仅支持列表样式状态以及还原时使用偶数拆分的重新分配方案。它还需要实现两种方法:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
           

在snapshotState()操作上应将对象列表返回到检查点,并且 restoreState在恢复时必须处理此类列表。如果状态不是重新分区,可以随时返回Collections.singletonList(MY_STATE)的snapshotState()。

八.有状态的源函数

与其他操作相比,有状态信息源需要多加注意。为了使状态和输出集合的更新成为原子性(失败/恢复时仅需一次精确的语义),要求用户从源的上下文中获取锁。

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /**  current offset for exactly once semantics */
    private Long offset = 0L;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        for (Long s : state)
            offset = s;
    }
}
           

继续阅读