天天看点

Apache Flink fault tolerance源码剖析(五)被支持的状态类型用户定义的键值对状态状态快照状态访问器状态终端的实现小结

上一篇文章我们谈论了保存点的相关内容,其中就谈到了保存点状态的存储。这篇文章我们来探讨用户程序状态的存储,也是在之前的文章中多次提及的<code>state backend</code>(中文暂译为状态终端)。

基于数据流API而编写的程序经常以各种各样的形式保存着状态:

窗口收集/聚合元素(这里的元素可以看作是窗口的状态)直到它们被触发

转换函数可能会使用<code>key/value</code>状态接口来存储数据

转换函数可能实现<code>Checkpointed</code>接口来让它们的本地变量受益于<code>fault tolerant</code>机制

当检查点机制工作时,上面谈到的这些状态将能够基于检查点一同持久化来保证数据不丢失并且得到可持续的恢复。那么状态在内部是如何表示及存储的呢?这依赖于状态终端的选择。

我们将从几个方面来分解状态终端的实现:

被支持的状态类型

用户定义的键值对状态

状态快照

状态访问器

状态终端实现

因为状态终端的实现内容较多,所以本文不会太过于拘泥细节,以免管中窥豹

状态相关的接口都维护在package:

org.apache.flink.api.common.state

其继承关系如图:

通过多层的继承,最终的叶子节点是被状态终端直接支持的几种状态类型,它们是:

ValueState : 单值状态

ListState : 集合状态

FoldingState : <code>folding</code>状态,for <code>FoldFunction</code>

ReducingState : <code>reducing</code>状态,for <code>ReduceFunction</code>

注意这里只定义了实现这些状态的协议接口,具体的实现本文后面会谈到

针对每一个被直接支持的状态,都有一个描述它们的状态描述符(<code>StateDescriptor</code>),来负责创建对应的状态。一个状态描述符描述状态的名称,默认值。并提供了一个抽象方法来创建状态:

上面提到的所有被直接支持的状态都有一个描述符:

从上面创建状态的方法<code>bind</code>的签名中可以看到,它依赖于参数<code>StateBackend</code>。而<code>StateBackend</code>暂且可以看作是创建状态的代理。

上面的<code>State</code>定义了特定状态的接口协议。除了上面的那些基本状态外,Flink还提供了基于键值对的用户定义的状态,它以<code>KvState</code>接口来描述,其实它才是最终结合检查点机制进行存储和恢复的状态表示。其携带多个泛型参数:

key的类型

命名空间的类型

最终存储的<code>State</code>的类型

状态描述符<code>StateDescriptor</code>的类型

管理该<code>KvState</code>的<code>AbstractStateBackend</code>的具体类型

可以简单地将其看作<code>State</code>的容器

该接口提供了一个<code>snapshot</code>方法,用于结合检查点机制提供快照支持。并返回<code>KvStateSnapshot</code>的实例来表示一个键值对状态的快照。

当然针对每种被直接支持的状态,都有<code>KvState</code>的特定实现:

<code>KvStateSnapshot</code>表示<code>KvState</code>快照的接口,它结合检查点机制提供了对状态进行恢复:

restoreState : 基于状态终端以及表示检查点的时间戳等来恢复状态

从类图关系可以看出每个针对键值状态的实现(<code>KvState</code>)都有一个内部类提供与之对应的快照实现:

<code>StateHandle</code>给<code>operator</code>提供操作状态的接口,将状态从面向存储介质的原始表示还原为对象表示。重要接口:

可以理解为状态的反序列化接口,根据给定的类加载器加载需要反序列化的类表示来还原状态。

所谓的状态终端是真正跟状态持久化介质交互的代理类。

<code>AbstractStateBackend</code>为实现状态终端提供了一个模板。主要提供了如下功能:

状态创建/获取、创建快照

基于检查点存储状态

定义检查点状态输出流

跟检查点有关的部分:

定义了创建状态检查点输出流<code>CheckpointStateOutputView</code>的接口(抽象方法),以及对检查点状态反序列化的接口。这些接口供继承者根据最终的状态终端选择进行实现。

Flink支持了三种类型的状态终端:

MemoryStateBackend

FsStateBackend

RocksDBStateBackend(第三方开发者实现,本文不进行代码分析)

它们都以<code>AbstractStateBackend</code>为模板:

如果没有进行配置,<code>MemoryStateBackend</code>将是默认的实现。

<code>MemoryStateBackend</code>在内部将数据以对象的形式保存的Java堆中。键值对状态以及窗口<code>operator</code>以<code>hash table</code>的形式存储值、触发器等。

建立在检查点的机制上,该状态终端将对状态进行快照并且将状态的快照作为检查点应答消息的一部分发送给<code>JobManager</code>(master),<code>JobManager</code>将快照存储在它的堆内存中。

<code>MemoryStateBackend</code>的限制:

每个独立状态的大小默认限制在5MB,可以在<code>MemoryStateBackend</code>的构造器中对该值进行增加

不管你将状态大小设置得有多大,它都不能大于akka的<code>frame size</code>

状态的总占用空间必须适配<code>JobManager</code>的内存空间

推荐在如下场景时使用<code>MemoryStateBackend</code>作为状态终端:

本地开发与调试模式

只存储很少状态的<code>Job</code>,例如只包含每次只处理一条记录的函数(<code>Map</code>,<code>FlatMap</code>,<code>Filter</code>…)的<code>job</code>

<code>FsStateBackend</code>采用文件系统<code>URL</code>(包含<code>type</code>,<code>address</code>,<code>path</code>)的模式进行配置。例如<code>hdfs://namenode:40010/flink/checkpoints</code>或者<code>file:///data/flink/checkpoints</code>。

<code>FsStateBackend</code>将正在处理的数据存储在<code>TaskManager</code>的内存里。结合检查点,它将状态快照写到基于配置的文件系统的文件里。而最小化元数据信息被存储在<code>JobManager</code>的内存里(如果处于高可用模式,元数据将存储在元数据检查点里)。

推荐在如下场景使用<code>FsStateBackend</code>:

具有大量状态,很大的窗口,大量键值对状态的<code>Job</code>

全程高可用模式

<code>RocksDBStateBackend</code>存储正在处理的数据到<code>RocksDB</code>数据库。而<code>RocksDB</code>被存储在<code>TaskManager</code>的数据字典里。结合检查点机制,整个<code>RocksDB</code>数据库将进行快照并被存储到配置的文件系统中。最小化的元数据被存储到<code>JobManager</code>的内存里(如果配置为高可用模式,将会保存到元数据检查点中)。

推荐在如下场景使用<code>RocksDBStateBackend</code>:

具有很大的状态,很长的窗口,大量的键值对状态的<code>Job</code>

全程高可用状态

注意,使用<code>RocksDBStateBackend</code>时,你能保存的状态仅受到磁盘可用空间的限制。因此,与<code>MemoryStateBackend</code>将状态保存在内存中进行对比,这种状态终端允许你保存非常多的状态。但这也意味着,它所能达到的最大化的吞吐量也将不及<code>MemoryStateBackend</code>。

首先来看具体的状态终端对各种状态的实现:

与此对应的<code>KvStateSnapshot</code>也拥有特定的实现:

状态的存储通常是绑定着检查点的,也就是状态会作为检查点的一部分被一同持久化。因此,它具备了<code>fault tolerance</code>的能力。这里我们分成两部分来看:<code>snapshot</code>、<code>restore</code>。

snapshot

每个最终的状态,都实现了<code>KvState</code>接口(通过间接继承抽象类<code>AbstractHeapState</code>),而实现该接口就必须实现其<code>snapshot</code>方法。这被认为是所有的最终状态都要实现其生产快照的逻辑。当然,这绝大部分逻辑都被<code>AbstractFsState</code>和<code>AbstractMemState</code>给实现了。

具体而言,<code>AbstractFsState</code>利用<code>FsStateBackend</code>创建<code>FsCheckpointStateOutputStream</code>将状态写入检查点对应的路径下(根据检查点编号)。而<code>AbstractMemState</code>则是将其写入到堆内存中(这里甚至都没有用到检查点编号)。

这里有两个状态终端定义的检查点输出流(用于最终的持久化):

restore

恢复逻辑分别实现在<code>AbstractFsStateSnapshot</code>和<code>AbstractMemStateSnapshot</code>的<code>restoreState</code>方法中。<code>restoreState</code>的逻辑基本是<code>snapshot</code>的反逻辑,将数据从特定的持久化介质中反序列化回来,并生成<code>KvState</code>对象。

本文梳理了状态终端的实现方式,由于内容较多,因此省略了一些细节实现。但从本文的分析应该基本能理清状态终端如何对状态进行持久化以及恢复。

原文发布时间为:2016-06-07

本文作者:vinoYang