天天看點

圖解 Flink Checkpoint 原理及在 1.11 版本的優化

一、什麼是 checkpoint

上次發文,提到了 Flink 可以非常高效的進行有狀态流的計算,通過使用 Flink 内置的 Keyed State 和 Operator State,儲存每個算子的狀态。

預設情況下,狀态是存儲在 JVM 的堆記憶體中,如果系統中某個環節發生了錯誤,當機,這個時候所有的狀态都會丢失,并且無法恢複,會導緻整個系統的資料計算發生錯誤。

此時就需要 Checkpoint 來保障系統的容錯。Checkpoint 過程,就是把算子的狀态周期性持久化的過程。

在系統出錯後恢複時,就可以從 checkpoint 中恢複每個算子的狀态,從上次消費的地方重新開始消費和計算。進而可以做到在高效進行計算的同時還可以保證資料不丢失,隻計算一次。

圖解 Flink Checkpoint 原理及在 1.11 版本的優化

二、Checkpoint 必要的兩個條件

答案是否,需要滿足以下兩個條件才能做 Checkpoint:

  1. 需要支援重放一定時間範圍内資料的資料源,比如:kafka 。因為容錯機制就是在任務失敗後自動從最近一次成功的 checkpoint 處恢複任務,此時需要把任務失敗前消費的資料再消費一遍。假設資料源不支援重放,那麼資料還未寫到存儲中就丢了,任務恢複後,就再也無法重新消費這部分丢了的資料了。
  2. 需要一個存儲來儲存持久化的狀态,如:Hdfs,本地檔案。可以在任務失敗後,從存儲中恢複 checkpoint 資料。

三、Checkpoint 參數詳解

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 每 60s 做一次 checkpoint
env.enableCheckpointing(60000);

// 進階配置:

// checkpoint 語義設定為 EXACTLY_ONCE,這是預設語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 兩次 checkpoint 的間隔時間至少為 1 s,預設是 0,立即進行下一次 checkpoint
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);

// checkpoint 必須在 60s 内結束,否則被丢棄,預設是 10 分鐘
env.getCheckpointConfig().setCheckpointTimeout(60000);

// 同一時間隻能允許有一個 checkpoint
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 最多允許 checkpoint 失敗 3 次
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

// 當 Flink 任務取消時,保留外部儲存的 checkpoint 資訊
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 當有較新的 Savepoint 時,作業也會從 Checkpoint 處恢複
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

// 允許實驗性的功能:非對齊的 checkpoint,以提升性能
env.getCheckpointConfig().enableUnalignedCheckpoints();      

相關參數的文字描述:

  1. env.enableCheckpointing(60000),1 分鐘觸發一次 checkpoint;
  2. setCheckpointTimeout,checkpoint 逾時時間,預設是 10 分鐘逾時,超過了逾時時間就會被丢棄;
  3. setCheckpointingMode,設定 checkpoint 語義,可以設定為 EXACTLY_ONCE,表示既不重複消費也不丢資料;AT_LEAST_ONCE,表示至少消費一次,可能會重複消費;
  4. setMinPauseBetweenCheckpoints,兩次 checkpoint 之間的間隔時間。假如設定每分鐘進行一次 checkpoint,兩次 checkpoint 間隔時間為 30s。假設某一次 checkpoint 耗時 40s,那麼理論上20s 後就要進行一次 checkpoint,但是設定了兩次 checkpoint 之間的間隔時間為 30s,是以是 30s 之後才會進行 checkpoint。另外,如果配置了該參數,那麼同時進行的 checkpoint 數量隻能為 1;
  5. enableExternalizedCheckpoints,Flink 任務取消後,外部 checkpoint 資訊是否被清理。
  • DELETE_ON_CANCELLATION,任務取消後,所有的 checkpoint 都将會被清理。隻有在任務失敗後,才會被保留;
  • RETAIN_ON_CANCELLATION,任務取消後,所有的 checkpoint 都将會被保留,需要手工清理。
  1. setPreferCheckpointForRecovery,恢複任務時,是否從最近一個比較新的 savepoint 處恢複,預設是 false;
  2. enableUnalignedCheckpoints,是否開啟試驗性的非對齊的 checkpoint,可以在反壓情況下極大減少 checkpoint 的次數;

四、Checkpoint 如何實作的

Flink 的 checkpoint 是基于 Chandy-Lamport 算法,實作了一個分布式一緻性的存儲快照算法。

這裡我們假設一個簡單的場景來描述 checkpoint 具體過程是怎樣的。

場景是:假如現在 kafka 隻有一個分區,資料是每個 app 發過來的日志,我們統計每個 app 的 PV。

Flink 的 checkpoint coordinator (JobManager 的一部分)會周期性的在流事件中插入一個 barrier 事件(栅欄),用來隔離不同批次的事件,如下圖紅色的部分。

下圖中有兩個 barrier ,checkpoint barrier n-1 處的 barrier 是指 Job 從開始處理到 barrier n -1 所有的狀态資料,checkpoint barrier n 處的 barrier 是指 Job 從開始處理到 barrier n 所有的狀态資料。

圖解 Flink Checkpoint 原理及在 1.11 版本的優化

回到剛剛計算 PV 的場景,當 Source Task 接受到 JobManager 的編号為 chk-100 的 Checkpoint 觸發請求後,發現自己恰好接收到了 offset 為(0,1000)【表示分區0,offset 為1000】處的資料,是以會往 offset 為(0,1000)資料之後,(0,1001)資料之前安插一個 barrier,然後自己開始做快照,把 offset (0,1000)儲存到狀态後端中,向 CheckpointCoordinator報告自己快照制作情況,同時向自身所有下遊算子廣播該barrier。如下圖:

圖解 Flink Checkpoint 原理及在 1.11 版本的優化

當下遊計算的算子收到 barrier 後,會看是否收到了所有輸入流的 barrier,我們現在隻有一個分區,Source 算子隻有一個執行個體,barrier 到了就是收到了所有的輸入流的 barrier。

開始把本次的計算結果(app1,1000),(app2,5000)寫到狀态存儲之中,向 CheckpointCoordinator 報告自己快照制作情況,同時向自身所有下遊算子廣播該barrier。

圖解 Flink Checkpoint 原理及在 1.11 版本的優化

當 Operator 2 收到栅欄後,會觸發自身進行快照,把自己當時的狀态存儲下來,向 CheckpointCoordinator 報告 自己快照制作情況。因為這是一個 sink ,狀态存儲成功後,意味着本次 checkpoint 也成功了。

圖解 Flink Checkpoint 原理及在 1.11 版本的優化

Barrier 對齊

上面我們舉的例子是 Source Task 執行個體隻有一個的情況,在輸入流的算子有多個執行個體的情況下,會有一個概念叫 Barrier 對齊。

圖解 Flink Checkpoint 原理及在 1.11 版本的優化

可以看上面的第一張圖,有兩個輸入流,一個是上面的數字流,一個是下面的字母流。

數字流的 barrier 在 1 後面,字母流的 barrier 在 e 後面。當上面的 barrier 到達 operator 之後,必須要等待下面的數字流的 barrier 也到達,此時數字流後面過來的資料會被緩存到緩沖區。這就是 barrier 對齊的過程。

看上面的第二張圖,當數字流的 barrier 到達後,意味着輸入流的所有執行個體的 barrier 都到達了,此時開始處理 到第三張圖的時候,處理完畢,自身做快照,然後把緩沖區的 pending 資料都發出去,把 checkpoint barrier n 繼續往下發送。

五、Flink 1.11 對 Checkpoint 的優化

從上圖的對齊過程,我們可以發現,在進行對齊的過程中,算子是不會再接着處理資料了,一定要等到對齊動作完成之後,才能繼續對齊。也就是上圖中的數字流的 barrier 到達之後,需要去等待字母流的 barrier 事件。

這其中會有一個阻塞的過程。在大多數情況下運作良好,然而當作業出現反壓時,阻塞式的 Barrier 對齊反而會加劇作業的反壓,甚至導緻作業不穩定。

首先, Chandy-Lamport 分布式快照的結束依賴于 Marker 的流動,而反壓則會限制 Marker 的流動,導緻快照的完成時間變長甚至逾時。無論是哪種情況,都會導緻 Checkpoint 的時間點落後于實際資料流較多。

這時作業的計算進度是沒有被持久化的,處于一個比較脆弱的狀态,如果作業出于異常被動重新開機或者被使用者主動重新開機,作業會復原丢失一定的進度。如果 Checkpoint 連續逾時且沒有很好的監控,復原丢失的進度可能高達一天以上,對于實時業務這通常是不可接受的。更糟糕的是,復原後的作業落後的 Lag 更大,通常帶來更大的反壓,形成一個惡性循環。

是以在 Flink 1.11 版本中,引入了一個 Unaligned Checkpointing 的子產品,主要功能是,在 barrier 到達之後,不必等待所有的輸入流的 barrier,而是繼續處理資料。

然後把第一次到達的 barrier 之後的所有資料也放到 checkpoint 裡面,在下一次計算的時候,會合并上次儲存的資料以及流入的資料後再計算。這樣會大大加快 Barrier 流動的速度,降低 checkpoint 整體的時長。

六、總結 Checkpoint 的原理

  1. JobManager 端的 CheckPointCoordinator 會定期向所有 SourceTask 發送 CheckPointTrigger,Source Task 會在資料流中安插 Checkpoint barrier;
  2. 當 task 收到上遊所有執行個體的 barrier 後,向自己的下遊繼續傳遞 barrier,然後自身同步進行快照,并将自己的狀态異步寫入到持久化存儲中
  • 如果是增量 Checkpoint,則隻是把最新的一部分更新寫入到外部持久化存儲中
  • 為了下遊盡快進行 Checkpoint,是以 task 會先發送 barrier 到下遊,自身再同步進行快照;

繼續閱讀