轉載自:https://blog.csdn.net/shenshouniu/article/details/84558874
你可能感興趣的文章:
Flink元件和邏輯計劃
Flink執行計劃生成
JobManager中的基本元件(1)
JobManager中的基本元件(2)
JobManager中的基本元件(3)
TaskManager
算子
網絡
水印WaterMark
CheckPoint
任務排程與負載均衡
異常處理
Alibaba Blink新特性
-
大資料成神之路系列:
Java進階特性增強-集合
Java進階特性增強-多線程
Java進階特性增強-Synchronized
Java進階特性增強-volatile
Java進階特性增強-并發集合架構
Java進階特性增強-分布式
Java進階特性增強-Zookeeper
Java進階特性增強-JVM
Java進階特性增強-NIO
Java進階特性增強-Netty
1 何為狀态
- 在批處理過程中,資料是劃分為塊分片去完成的,然後每一個Task去處理一個分片。當分片執行完成後,把輸出聚合起來就是最終的結果。在這個過程當中,對于state的需求還是比較小的。
- 在流計算過程中,對State有非常高的要求,因為在流系統中輸入是一個無限制的流,會持續運作從不間斷。在這個過程當中,就需要将狀态資料很好的管理起來。
- Flink的失敗恢複依賴于“檢查點機制+可部分重發的資料源”。
- 檢查點機制:檢查點定期觸發,産生快照,快照中記錄了(1)目前檢查點開始時資料源(例如Kafka)中消息的offset,(2)記錄了所有有狀态的operator目前的狀态資訊(例如sum中的數值)。
- 可部分重發的資料源:Flink選擇最近完成的檢查點K。然後系統重放整個分布式的資料流,然後給予每個operator他們在檢查點k快照中的狀态。資料源被設定為從位置Sk開始重新讀取流。例如在Apache Kafka中,那意味着告訴消費者從偏移量Sk開始重新消費。
- Flink中有兩種基本類型的State,即Keyed State和Operator State。
- State可以被記錄,在失敗的情況下資料還可以恢複
一句話的事兒:state一般指一個具體的task/operator的狀态【state資料預設儲存在java的堆記憶體中】
2 檢查點Checkpoint 與Barrier
一句話的事兒: checkpoint【可以了解為checkpoint是把state資料持久化存儲了】,則表示了一個Flink Job在一個特定時刻的一份全局狀态快照,即包含了所有task/operator的狀态
為了保證state的容錯性,Flink需要對state進行checkpoint。
Checkpoint是Flink實作容錯機制最核心的功能,它能夠根據配置周期性地基于Stream中各個Operator/task的狀态來生成快照,進而将這些狀态資料定期持久化存儲下來,當Flink程式一旦意外崩潰時,重新運作程式時可以有選擇地從這些快照進行恢複,進而修正因為故障帶來的程式資料異常
Flink的checkpoint機制可以與(stream和state)的持久化存儲互動的前提是:
持久化的source(如kafka),它需要支援在一定時間内重放事件。這種sources的典型例子是持久化的消息隊列(比如Apache Kafka,RabbitMQ等)或檔案系統(比如HDFS,S3,GFS等)
用于state的持久化存儲,例如分布式檔案系統(比如HDFS,S3,GFS等)
Flink的檢查點機制實作了标準的Chandy-Lamport算法,并用來實作分布式快照。在分布式快照當中,有一個核心的元素:Barrier。
-
單流的barrier:
1: 屏障作為資料流的一部分随着記錄被注入到資料流中。屏障永遠不會趕超通常的流記錄,它會嚴格遵循順序。
2: 屏障将資料流中的記錄隔離成一系列的記錄集合,并将一些集合中的資料加入到目前的快照中,而另一些資料加入到下一個快照中。
3: 每一個屏障攜帶着快照的ID,快照記錄着ID并且将其放在快照資料的前面。
4: 屏障不會中斷流處理,是以非常輕量級。
-
并行barrier
1:不止一個輸入流的時的operator,需要在快照屏障上對齊(align)輸入流,才會發射出去。
2:可以看到1,2,3會一直放在Input buffer,直到另一個輸入流的快照到達Operator。
3 有狀态的Operator工作一覽圖
Stateful Flink applications are optimized for local state access. Task state
is always maintained in memory or, if the state size exceeds the available memory,
in access-efficient on-disk data structures. Hence, tasks perform all computations
by accessing local, often in-memory, state yielding very low processing latencies.
Flink guarantees exactly-once state consistency in case of failures by periodically
and asynchronously checkpointing the local state to durable storage.
- 1
- 2
- 3
- 4
- 5
- 6
4 狀态管理
4.1 原始狀态與托管狀态
Keyed State和Operator State,可以以兩種形式存在:
- 原始狀态(raw state)
- 托管狀态(managed state)
- 托管狀态是由Flink架構管理的狀态
- 原始狀态,由使用者自行管理狀态具體的資料結構,架構在做checkpoint的時候,使用byte[]來讀寫狀态内容,對其内部資料結構一無所知。
- 通常在DataStream上的狀态推薦使用托管的狀态。
- 當實作一個使用者自定義的operator時,會使用到原始狀态
4.2 State-Keyed State 是什麼?直接上幹貨。(兄弟 State-Operator State 與key無關)
-
顧名思義,就是基于KeyedStream上的狀态。這個狀态是跟特定的key綁定的,對KeyedStream流上的每一個key,都對應一個state。
stream.keyBy(…)
-
state的資料結構;
(1) ValueState:即類型為T的單值狀态。這個狀态與對應的key綁定,是最簡單的狀态了。它可以通過update方法更新狀态值,通過value()方法擷取狀态值
(2) ListState:即key上的狀态值為一個清單。可以通過add方法往清單中附加值;也可以通過get()方法傳回一個Iterable來周遊狀态值
(3) ReducingState:這種狀态通過使用者傳入的reduceFunction,每次調用add方法添加值的時候,會調用reduceFunction,最後合并到一個單一的狀态值
(4) MapState<UK, UV>:即狀态值為一個map。使用者通過put或putAll方法添加元素
- 需要注意的是,以上所述的State對象,僅僅用于與狀态進行互動(更新、删除、清空等),而真正的狀态值,有可能是存在記憶體、磁盤、或者其他分布式存儲系統中。相當于我們隻是持有了這個狀态的句柄。實際上:這些狀态有三種存儲方式:
MemoryStateBackend: FsStateBackend RockDBStateBackend
4.3 State-Keyed State 存儲方式?直接上幹貨
-
MemoryStateBackend
state資料儲存在java堆記憶體中,執行checkpoint的時候,會把state的快照資料儲存到jobmanager的記憶體中
基于記憶體的state backend在生産環境下不建議使用。
-
FsStateBackend
state資料儲存在taskmanager的記憶體中,執行checkpoint的時候,會把state的快照資料儲存到配置的檔案系統中,可以使用hdfs等分布式檔案系統。
-
RocksDBStateBackend
RocksDB跟上面的都略有不同,它會在本地檔案系統中維護狀态,state會直接寫入本地rocksdb中。同時RocksDB需要配置一個遠端的filesystem。
uri(一般是HDFS),在做checkpoint的時候,會把本地的資料直接複制到filesystem中。fail over的時候從filesystem中恢複到本地。
RocksDB克服了state受記憶體限制的缺點,同時又能夠持久化到遠端檔案系統中,比較适合在生産中使用
4.4 State 生成快照
4.5 State 快照恢複
5 與Key相關的狀态管理案例實戰(以Key分組進行狀态管理)
5.1 RichFlatMapFunction 核心代碼奉上
package xuwei.tech.streaming;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
/**
-
qinkaixin 2018 11 24
*/
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
The ValueState handle. The first field is the count, the second field a running sum.
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
// access the state value Tuple2<Long, Long> currentSum = sum.value(); // update the count currentSum.f0 += 1; // add the second field of the input value currentSum.f1 += input.f1; // update the state sum.update(currentSum); // if the count reaches 2, emit the average and clear the state if (currentSum.f0 >= 2) { out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0)); sum.clear(); }
}
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
“average”,
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
sum = getRuntimeContext().getState(descriptor);
}
-
5.2 RichFlatMapFunction 執行操作
public static void main(String[] args) throws Exception{
//擷取Flink的運作環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(0)
.flatMap(new CountWindowAverage())
.print();
env.execute("StafulOperator");
System.out.println("***********");
}
5.3 最終結果為什麼是這樣的?
- if the count reaches 2, emit the average and clear the state
- 是以Tuple2.of(1L, 3L), Tuple2.of(1L, 5L) 一組
- 是以Tuple2.of(1L, 7L),Tuple2.of(1L, 4L)一組
6 與Operator相關的State案例實戰
- 與Key無關的State,與Operator綁定的state,整個operator隻對應一個state
- 儲存Operator state的資料結構為ListState
- 舉例來說,Flink中的Kafka Connector,就使用了operator state。它會在每個connector執行個體中,儲存該執行個體中消費topic的所有(partition, offset)映射
- 繼承CheckpointedFunction,實作snapshotState和restoreState。
To use managed operator state, a stateful function can implement either the more general CheckpointedFunction interface, or the ListCheckpointed<T extends Serializable> interface.
Whenever a checkpoint has to be performed, snapshotState() is called. The
counterpart,initializeState(), is called every time the user-defined function
is initialized, be that when the function is first initialized or be that when the function is actuallyrecovering from an earlier checkpoint. Given this,
initializeState() is not only the place where different types of state are
initialized, but also where state recovery
logic is included.
6.1 BufferingSink案例
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
6.2 Stateful Source案例
public static class CounterSource extends RichParallelSourceFunction<Long>
implements ListCheckpointed<Long> {
/** current offset for exactly once semantics */
private Long offset;
/** flag for job cancellation */
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> ctx) {
final Object lock = ctx.getCheckpointLock();
while (isRunning) {
// output and state update are atomic
synchronized (lock) {
ctx.collect(offset);
offset += 1;
}
}
}
@Override
public void cancel() {
isRunning = false;
}
@Override
public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
return Collections.singletonList(offset);
}
@Override
public void restoreState(List<Long> state) {
for (Long s : state)
offset = s;
}
7 checkPoint的配置進一步升華
7.1 checkpoint 開關
- 預設checkpoint功能是disabled的,想要使用的時候需要先啟用
- checkpoint開啟之後,預設的checkPointMode是Exactly-once
- checkpoint的checkPointMode有兩種,Exactly-once和At-least-once
- Exactly-once對于大多數應用來說是最合适的。At-least-once可能用在某些延遲超低的應用程式(始終延遲為幾毫秒)
7.2 checkpoint 調優配置(Cancel處理很有意思)
-
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔1000 ms進行啟動一個檢查點【設定checkpoint的周期】
env.enableCheckpointing(1000);
// 進階選項:
// 設定模式為exactly-once (這是預設值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 確定檢查點之間有至少500 ms的間隔【checkpoint最小間隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 檢查點必須在一分鐘内完成,或者被丢棄【checkpoint的逾時時間】
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一時間隻允許進行一個檢查點
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 表示一旦Flink處理程式被cancel後,會保留Checkpoint資料,以便根據實際需要恢複到指定的Checkpoint【詳細解釋見備注】
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
cancel處理選項:
(1)ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:
表示一旦Flink處理程式被cancel後,會保留Checkpoint資料,以便根據實際需要恢複到指定
的Checkpoint
(2)ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:
表示一旦Flink處理程式被cancel後,會删除Checkpoint資料,隻有job執行失敗的時候才會
儲存checkpoint
8 State Backend 狀态的後端存儲(一劍封喉)
8.1 配置說明
修改State Backend的兩種方式
- 第一種:單任務調整
修改目前任務代碼 env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints")); 或者new MemoryStateBackend() 或者new RocksDBStateBackend( hdfs->url, true);【需要添加第三方依賴】
- 第二種:全局調整
修改flink-conf.yaml state.backend: filesystem state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints 注意:state.backend的值可以是下面幾種:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
8.2 精彩案例實戰
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCountJavaCheckPoint {
public static void main(String[] args) throws Exception{
//擷取需要的端口号
int port;
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port");
}catch (Exception e){
System.err.println("No port set. use default port 9000--java");
port = 9010;
}
//擷取flink的運作環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔1000 ms進行啟動一個檢查點【設定checkpoint的周期】
env.enableCheckpointing(1000);
// 進階選項:
// 設定模式為exactly-once (這是預設值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 確定檢查點之間有至少500 ms的間隔【checkpoint最小間隔】
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 檢查點必須在一分鐘内完成,或者被丢棄【checkpoint的逾時時間】
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一時間隻允許進行一個檢查點
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 表示一旦Flink處理程式被cancel後,會保留Checkpoint資料,以便根據實際需要恢複到指定的Checkpoint【詳細解釋見備注】
//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程式被cancel後,會保留Checkpoint資料,以便根據實際需要恢複到指定的Checkpoint
//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程式被cancel後,會删除Checkpoint資料,隻有job執行失敗的時候才會儲存checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//設定statebackend
//env.setStateBackend(new MemoryStateBackend());
//env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));
//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));
String hostname = "SparkMaster";
String delimiter = "\n";
//連接配接socket擷取輸入的資料
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
// a a c
// a 1
// a 1
// c 1
DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
String[] splits = value.split("\\s");
for (String word : splits) {
out.collect(new WordWithCount(word, 1L));
}
}
}).keyBy("word")
.timeWindow(Time.seconds(2), Time.seconds(1))//指定時間視窗大小為2秒,指定時間間隔為1秒
.sum("count");//在這裡使用sum或者reduce都可以
/*.reduce(new ReduceFunction<WordWithCount>() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
return new WordWithCount(a.word,a.count+b.count);
}
})*/
//把資料列印到控制台并且設定并行度
windowCounts.print().setParallelism(1);
//這一行代碼一定要實作,否則程式不執行
env.execute("Socket window count");
}
public static class WordWithCount{
public String word;
public long count;
public WordWithCount(){}
public WordWithCount(String word,long count){
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "作者 : 秦凱新 , 窗大小2秒,滑動1秒 {" +
" word='" + word + '\'' +
", count=" + count +
'}';
}
}