一、什麼是 checkpoint
上次發文,提到了 Flink 可以非常高效的進行有狀态流的計算,通過使用 Flink 内置的 Keyed State 和 Operator State,儲存每個算子的狀态。
預設情況下,狀态是存儲在 JVM 的堆記憶體中,如果系統中某個環節發生了錯誤,當機,這個時候所有的狀态都會丢失,并且無法恢複,會導緻整個系統的資料計算發生錯誤。
此時就需要 Checkpoint 來保障系統的容錯。Checkpoint 過程,就是把算子的狀态周期性持久化的過程。
在系統出錯後恢複時,就可以從 checkpoint 中恢複每個算子的狀态,從上次消費的地方重新開始消費和計算。進而可以做到在高效進行計算的同時還可以保證資料不丢失,隻計算一次。
二、Checkpoint 必要的兩個條件
答案是否,需要滿足以下兩個條件才能做 Checkpoint:
- 需要支援重放一定時間範圍内資料的資料源,比如:kafka 。因為容錯機制就是在任務失敗後自動從最近一次成功的 checkpoint 處恢複任務,此時需要把任務失敗前消費的資料再消費一遍。假設資料源不支援重放,那麼資料還未寫到存儲中就丢了,任務恢複後,就再也無法重新消費這部分丢了的資料了。
- 需要一個存儲來儲存持久化的狀态,如:Hdfs,本地檔案。可以在任務失敗後,從存儲中恢複 checkpoint 資料。
三、Checkpoint 參數詳解
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每 60s 做一次 checkpoint
env.enableCheckpointing(60000);
// 進階配置:
// checkpoint 語義設定為 EXACTLY_ONCE,這是預設語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 兩次 checkpoint 的間隔時間至少為 1 s,預設是 0,立即進行下一次 checkpoint
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
// checkpoint 必須在 60s 内結束,否則被丢棄,預設是 10 分鐘
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 同一時間隻能允許有一個 checkpoint
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 最多允許 checkpoint 失敗 3 次
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
// 當 Flink 任務取消時,保留外部儲存的 checkpoint 資訊
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 當有較新的 Savepoint 時,作業也會從 Checkpoint 處恢複
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
// 允許實驗性的功能:非對齊的 checkpoint,以提升性能
env.getCheckpointConfig().enableUnalignedCheckpoints();
相關參數的文字描述:
- env.enableCheckpointing(60000),1 分鐘觸發一次 checkpoint;
- setCheckpointTimeout,checkpoint 逾時時間,預設是 10 分鐘逾時,超過了逾時時間就會被丢棄;
- setCheckpointingMode,設定 checkpoint 語義,可以設定為 EXACTLY_ONCE,表示既不重複消費也不丢資料;AT_LEAST_ONCE,表示至少消費一次,可能會重複消費;
- setMinPauseBetweenCheckpoints,兩次 checkpoint 之間的間隔時間。假如設定每分鐘進行一次 checkpoint,兩次 checkpoint 間隔時間為 30s。假設某一次 checkpoint 耗時 40s,那麼理論上20s 後就要進行一次 checkpoint,但是設定了兩次 checkpoint 之間的間隔時間為 30s,是以是 30s 之後才會進行 checkpoint。另外,如果配置了該參數,那麼同時進行的 checkpoint 數量隻能為 1;
- enableExternalizedCheckpoints,Flink 任務取消後,外部 checkpoint 資訊是否被清理。
- DELETE_ON_CANCELLATION,任務取消後,所有的 checkpoint 都将會被清理。隻有在任務失敗後,才會被保留;
- RETAIN_ON_CANCELLATION,任務取消後,所有的 checkpoint 都将會被保留,需要手工清理。
- setPreferCheckpointForRecovery,恢複任務時,是否從最近一個比較新的 savepoint 處恢複,預設是 false;
- enableUnalignedCheckpoints,是否開啟試驗性的非對齊的 checkpoint,可以在反壓情況下極大減少 checkpoint 的次數;
四、Checkpoint 如何實作的
Flink 的 checkpoint 是基于 Chandy-Lamport 算法,實作了一個分布式一緻性的存儲快照算法。
這裡我們假設一個簡單的場景來描述 checkpoint 具體過程是怎樣的。
場景是:假如現在 kafka 隻有一個分區,資料是每個 app 發過來的日志,我們統計每個 app 的 PV。
Flink 的 checkpoint coordinator (JobManager 的一部分)會周期性的在流事件中插入一個 barrier 事件(栅欄),用來隔離不同批次的事件,如下圖紅色的部分。
下圖中有兩個 barrier ,checkpoint barrier n-1 處的 barrier 是指 Job 從開始處理到 barrier n -1 所有的狀态資料,checkpoint barrier n 處的 barrier 是指 Job 從開始處理到 barrier n 所有的狀态資料。
回到剛剛計算 PV 的場景,當 Source Task 接受到 JobManager 的編号為 chk-100 的 Checkpoint 觸發請求後,發現自己恰好接收到了 offset 為(0,1000)【表示分區0,offset 為1000】處的資料,是以會往 offset 為(0,1000)資料之後,(0,1001)資料之前安插一個 barrier,然後自己開始做快照,把 offset (0,1000)儲存到狀态後端中,向 CheckpointCoordinator報告自己快照制作情況,同時向自身所有下遊算子廣播該barrier。如下圖:
當下遊計算的算子收到 barrier 後,會看是否收到了所有輸入流的 barrier,我們現在隻有一個分區,Source 算子隻有一個執行個體,barrier 到了就是收到了所有的輸入流的 barrier。
開始把本次的計算結果(app1,1000),(app2,5000)寫到狀态存儲之中,向 CheckpointCoordinator 報告自己快照制作情況,同時向自身所有下遊算子廣播該barrier。
當 Operator 2 收到栅欄後,會觸發自身進行快照,把自己當時的狀态存儲下來,向 CheckpointCoordinator 報告 自己快照制作情況。因為這是一個 sink ,狀态存儲成功後,意味着本次 checkpoint 也成功了。
Barrier 對齊
上面我們舉的例子是 Source Task 執行個體隻有一個的情況,在輸入流的算子有多個執行個體的情況下,會有一個概念叫 Barrier 對齊。
可以看上面的第一張圖,有兩個輸入流,一個是上面的數字流,一個是下面的字母流。
數字流的 barrier 在 1 後面,字母流的 barrier 在 e 後面。當上面的 barrier 到達 operator 之後,必須要等待下面的數字流的 barrier 也到達,此時數字流後面過來的資料會被緩存到緩沖區。這就是 barrier 對齊的過程。
看上面的第二張圖,當數字流的 barrier 到達後,意味着輸入流的所有執行個體的 barrier 都到達了,此時開始處理 到第三張圖的時候,處理完畢,自身做快照,然後把緩沖區的 pending 資料都發出去,把 checkpoint barrier n 繼續往下發送。
五、Flink 1.11 對 Checkpoint 的優化
從上圖的對齊過程,我們可以發現,在進行對齊的過程中,算子是不會再接着處理資料了,一定要等到對齊動作完成之後,才能繼續對齊。也就是上圖中的數字流的 barrier 到達之後,需要去等待字母流的 barrier 事件。
這其中會有一個阻塞的過程。在大多數情況下運作良好,然而當作業出現反壓時,阻塞式的 Barrier 對齊反而會加劇作業的反壓,甚至導緻作業不穩定。
首先, Chandy-Lamport 分布式快照的結束依賴于 Marker 的流動,而反壓則會限制 Marker 的流動,導緻快照的完成時間變長甚至逾時。無論是哪種情況,都會導緻 Checkpoint 的時間點落後于實際資料流較多。
這時作業的計算進度是沒有被持久化的,處于一個比較脆弱的狀态,如果作業出于異常被動重新開機或者被使用者主動重新開機,作業會復原丢失一定的進度。如果 Checkpoint 連續逾時且沒有很好的監控,復原丢失的進度可能高達一天以上,對于實時業務這通常是不可接受的。更糟糕的是,復原後的作業落後的 Lag 更大,通常帶來更大的反壓,形成一個惡性循環。
是以在 Flink 1.11 版本中,引入了一個 Unaligned Checkpointing 的子產品,主要功能是,在 barrier 到達之後,不必等待所有的輸入流的 barrier,而是繼續處理資料。
然後把第一次到達的 barrier 之後的所有資料也放到 checkpoint 裡面,在下一次計算的時候,會合并上次儲存的資料以及流入的資料後再計算。這樣會大大加快 Barrier 流動的速度,降低 checkpoint 整體的時長。
六、總結 Checkpoint 的原理
- JobManager 端的 CheckPointCoordinator 會定期向所有 SourceTask 發送 CheckPointTrigger,Source Task 會在資料流中安插 Checkpoint barrier;
- 當 task 收到上遊所有執行個體的 barrier 後,向自己的下遊繼續傳遞 barrier,然後自身同步進行快照,并将自己的狀态異步寫入到持久化存儲中
- 如果是增量 Checkpoint,則隻是把最新的一部分更新寫入到外部持久化存儲中
- 為了下遊盡快進行 Checkpoint,是以 task 會先發送 barrier 到下遊,自身再同步進行快照;