1. 算子状态分类
算子状态的作用范围限定为算子并行子任务。这意味着由同一并行子任务所处理的所有数据都可以访问到相同的状态,状态对于同一子任务而言是共享的。算子状态不能由相同或不同算子的另一个并行子任务访问。
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsISPrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdsATOfd3bkFGazxCMx8VesATMfhHLlN3XnxCMwEzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5CZzYWMhRjZiJmZ3gTOhRTY2kzN1IWMmVWZ1cTZ1AzYy8CX0EzLcRDMxIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLzM3Lc9CX6MHc0RHaiojIsJye.png)
Flink为算子状态提供三种基本数据结构,主要介绍当并行度改变(扩缩容)时,从保存点重新启动时,算子状态如何分配:
- 列表状态(List state):将状态表示为一组数据的列表。
带有算子列表状态的算子在扩缩容时会对列表中的条目进行重新分配。理论上,所有并行算子任务的列表条目会被统一收集起来,随后均匀分配到更少或更多的任务之上。如果列表条目的数量小于算子新设置的并行度,部分任务在启动时的状态就可能为空。
- 联合列表状态(Union list state) 也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障从保存点(savepoint)启动应用程序时进行恢复,如果并行度发生改变,带有算子联合列表状态的算子会在扩缩容时把状态列表的全部条目广播到全部任务上,随后由任务自己决定哪些条目应该保留,哪些应该丢弃。
- 广播状态(Broadcast state):不同于普通的算子状态,每个并行子任务的状态相同。但是仍然是每个并行子任务访问自己的状态,但是状态都是一样的。 如果一个算子有多项任务,而它的每个并行子任务状态又都相同,那么这种特殊情况最适合应用广播状态。
2.算子状态的使用
public class StateTest1_OperatorState {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// socket文本流
DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
// 转换成SensorReading类型
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 定义一个有状态的map操作,统计当前分区数据个数
SingleOutputStreamOperator<Integer> resultStream = dataStream.map(new MyCountMapper());
resultStream.print();
env.execute();
}
// 自定义MapFunction
public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer>{
// 定义一个本地变量,作为算子状态
private Integer count = 0;
@Override
public Integer map(SensorReading value) throws Exception {
count++;
return count;
}
@Override
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(count);
}
@Override
public void restoreState(List<Integer> state) throws Exception {
for( Integer num: state )
count += num;
}
}
}
- 算子状态的定义和普通的成员变量定义相同,但是对应的算子处理函数要继承对应的接口,例如ListCheckpointed,自定义状态进行快照和恢复的逻辑。