Flink的狀态管理
- Flink中的狀态
- 狀态的分類
-
- Operator State
- Keyed State
- 狀态後端(State Backends)
Flink中的狀态
- 什麼是狀态?
【Flink】Flink的狀态管理 由一個任務維護,并且用來計算某個結果的所有資料,都屬于這個任務的狀态
可以認為狀态就是一個本地變量,可以被任務的業務邏輯通路
Flink 會進行狀态管理,包括狀态一緻性、故障處理以及高效存儲和通路,以便開發人員可以專注于應用程式的邏輯
- 有些算子有些任務是沒有狀态的,如map操作,隻跟輸入資料有關。像視窗操作不管是增量視窗函數還是全視窗函數都要保持裡面的資訊的,一開始在視窗到達結束時間之前是不輸出資料的,是以最後輸出資料的時候,他的計算是要依賴之前的,全視窗可以認為是把所有資料都作為狀态儲存下來。增量聚合視窗來一個聚合一次要儲存的是中間聚合狀态。像ProcessFunction可以有狀态也可以沒有狀态。
- 無狀态流處理和有狀态流處理的主要差別:無狀态流處理分别接收每條輸入資料,根據最新輸入的資料生成輸出資料;有狀态流處理會維護狀态,根據每條輸入記錄進行更新,并基于最新輸入的記錄和目前的狀态值生成輸出記錄,即綜合考慮多個事件之後的結果。
跳轉頂部
狀态的分類
Operator State
- 每個算子狀态綁定到一個并行算子執行個體,作用範圍限定為算子任務,同一并行任務的狀态是共享的,并行處理的所有資料都可以通路到相同的狀态。Kafka Connector就是使用算子狀态的很好的一個例子,Kafka consumer的每個并行執行個體都維護一個主題分區和偏移,作為算子狀态。當并行性發生變化時,算子狀态接口支援在并行運算符執行個體之間重新配置設定狀态。可以有不同的方案來進行這種再配置設定。
- 因為同一個并行任務處理的所有資料都可以通路到目前的狀态,是以就相當于本地變量
- 算子狀态有3種基本資料結構:①清單狀态(List state):狀态表示為一組資料的清單②聯合清單狀态(Union list state):也将狀态表示為資料的清單。它與正常清單狀态的差別在于,在發生故障時,或者從儲存點(savepoint)啟動應用程式時如何恢複。③廣播狀态(Broadcast state):如果一個算子有多項任務,而它的每項任務狀态又都相同,那麼這種特殊情況最适合應用廣播狀态。那就可以通路到别的并行子任務的狀态。
- 算子狀态運用的時候可能應用場景沒那麼多,一般都是keyby之後根據不同的key做分區讨論。如果所有資料來了全部統一處理的話一般還要劃分成不同的狀态要儲存為連結清單,并行度調整的時候可以根據這個清單拆開,做進一步調整。
-
算子狀态資料結構
1️⃣:清單狀态(List state):将狀态表示為一組資料的清單
2️⃣:聯合清單狀态(Union list state):也将狀态表示為資料的清單。它與正常清單狀态的差別在于,在發生故障時,或者從儲存點(savepoint)啟動應用程式時如何恢複
3️⃣:廣播狀态(Broadcast state):如果一個算子有多項任務,而它的每項任務狀态又都相同,那麼這種特殊情況最适合應用廣播狀态。
4️⃣:聯合清單狀态與清單狀态的差別:主要是并行度調整狀态怎樣重新配置設定,清單狀态本身配置設定的時候直接配置設定;聯合清單狀态的話就是把所有元素都聯合起來,然後由每個任務自己定義最後留下哪些,也就是自己截取要哪一部分。
-
編寫代碼統計出每個分區内的資料
資料展示
自定義類sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547728199,25.8 sensor_6,1547712201,35.4 sensor_7,1547718102,16.7 sensor_10,1547712205,28.1
package beans;
/**
* 傳感器溫度讀數的資料類型
*/
public class SenSorReading {
private String id;
private Long timeStamp;
private Double temperature;
public SenSorReading() {
}
public SenSorReading(String id, Long timeStamp, Double temperature) {
this.id = id;
this.timeStamp = timeStamp;
this.temperature = temperature;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(Long timeStamp) {
this.timeStamp = timeStamp;
}
public Double getTemperature() {
return temperature;
}
public void setTemperature(Double temperature) {
this.temperature = temperature;
}
@Override
public String toString() {
return "SenSorReading{" +
"id='" + id + '\'' +
", timeStamp=" + timeStamp +
", temperature=" + temperature +
'}';
}
}
程式代碼實作
package State;
import beans.SenSorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Collections;
import java.util.List;
public class StateTest01 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputStream = env.readTextFile("src/main/resources/sensor.txt");
SingleOutputStreamOperator<SenSorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SenSorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
//定義一個有狀态Map操作,統計目前分區資料個數
SingleOutputStreamOperator<Integer> resultStream = dataStream.map(new MyCountMapper());
resultStream.print();
env.execute();
}
//自定義MapFunction
public static class MyCountMapper implements MapFunction<SenSorReading, Integer>, ListCheckpointed<Integer> {
//第一一個本地變量,作為算子狀态
private Integer count = 0;
@Override
public Integer map(SenSorReading senSorReading) throws Exception {
count++;
return count;
}
/**
* 對狀态做快照
*
* @param l
* @param l1
* @return
* @throws Exception
*/
@Override
public List<Integer> snapshotState(long l, long l1) throws Exception {
return Collections.singletonList(count);
}
/**
* 發生故障時
*
* @param list
* @throws Exception
*/
@Override
public void restoreState(List<Integer> list) throws Exception {
for (Integer num : list) {
count += num;
}
}
}
}
結果展示
跳轉頂部
Keyed State
- 鍵控狀态是根據輸入資料流中定義的鍵(key)來維護和通路的
- Flink 為每個 key 維護一個狀态執行個體,并将具有相同鍵的所有資料,都分區到同一個算子任務中,這個任務會維護和處理這個 key 對應的狀态
- 當任務處理一條資料時,它會自動将狀态的通路範圍限定為目前資料的 key
- 鍵控狀态資料結構
【Flink】Flink的狀态管理 - 寫出相同key的個數
package State;
import beans.SenSorReading;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* 隻針對key生效
* 比如本體的計算key相同的個數
*/
public class State_KeyedState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> inputStream = env.readTextFile("src/main/resources/sensor.txt");
SingleOutputStreamOperator<SenSorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SenSorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
//定義一個有狀态Map操作,統計目前sensor資料個數
SingleOutputStreamOperator<Integer> resultStream = dataStream.keyBy("id")
.map(new MyKeyCountMapper());
resultStream.print();
env.execute();
}
//自定義RichMapFunction
public static class MyKeyCountMapper extends RichMapFunction<SenSorReading, Integer> {
private ValueState<Integer> keyCountState;
//其他類型狀态的聲明
// private ListState<String> myListState;
// private MapState<String, Double> myMapState;
//
// private ReducingState<SenSorReading> myReducing;
//getRuntimeContext是在open方法之後才生成的
@Override
public void open(Configuration parameters) throws Exception {
//三個參數分别是名稱、資料類型和初始值
keyCountState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("key-count", Integer.class, 0));
// myListState = getRuntimeContext().getListState(new ListStateDescriptor<String>("my-list", String.class));
// myMapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Double>("my-map", String.class, Double.class));
// myReducing = getRuntimeContext().getMapState(new ReducingStateDescriptor<SenSorReading>("my-reducing", SenSorReading.class));
}
@Override
public Integer map(SenSorReading senSorReading) throws Exception {
Integer count = keyCountState.value();
count++;
keyCountState.update(count);
//其他狀态的api調用
// Iterable<String> strings = myListState.get();
// myListState.add("hello");
//
// myMapState.put("a", 1.1);
//
// myReducing.add(senSorReading);
return count;
}
}
}
- 結果展示
【Flink】Flink的狀态管理
跳轉頂部
狀态後端(State Backends)
- 每傳入一條資料,有狀态的算子任務都會讀取和更新狀态
- 由于有效的狀态通路對于處理資料的低延遲至關重要,是以每個并行任務都會在本地維護其狀态,以確定快速的狀态通路
- 狀态的存儲、通路以及維護,由一個可插入的元件決定,這個元件就叫做狀态後端(state backend)
- 狀态後端主要負責兩件事:本地的狀态管理,以及将檢查點(checkpoint)狀态寫入遠端存儲
-
狀态後端的分類
1️⃣:MemoryStateBackend:記憶體級的狀态後端,會将鍵控狀态作為記憶體中的對象進行管理,将它們存儲在TaskManager 的 JVM 堆上,而将 checkpoint 存儲在 JobManager 的記憶體中,特點:快速、低延遲,但不穩定
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//狀态後端的配置
env.setStateBackend(new MemoryStateBackend());//存到記憶體
env.setStateBackend(new FsStateBackend(""));//存儲到檔案
env.setStateBackend(new RocksDBStateBackend(""));//序列化後存儲到本地的RocksDB