随着近來越來越多的業務遷移到 Flink 上,對 Flink 作業的準确性要求也随之進一步提高,其中最為關鍵的是如何在不同業務場景下保證 exactly-once 的投遞語義。雖然不少實時系統(e.g. 實時計算/消息隊列)都宣稱支援 exactly-once,exactly-once 投遞似乎是一個已被解決的問題,但是其實它們更多是針對内部子產品之間的資訊投遞,比如 Kafka 生産(producer 到 Kafka broker)和消費(broker 到 consumer)的 exactly-once。而 Flink 作為實時計算引擎,在實際場景業務會涉及到很多不同元件,由于元件特性和定位的不同,Flink 并不是對所有元件都支援 exactly-once(見[1]),而且不同元件實作 exactly-once 的方法也有所差異,有些實作或許會帶來副作用或者用法上的局限性,是以深入了解 Flink exactly-once 的實作機制對于設計穩定可靠的架構有十分重要的意義。
下文将基于 Flink 詳細分析 exactly-once 的難點所在以及實作方案,而這些結論也可以推廣到其他實時系統,特别是流式計算系統。
Exactly-Once 難點分析
由于在分布式系統的程序間協調需要通過網絡,而網絡情況在很多情況下是不可預知的,通常發送消息要考慮三種情況:正常傳回、錯誤傳回和逾時,其中錯誤傳回又可以分為可重試錯誤傳回(e.g. 資料庫維護暫時不可用)和不可重試錯誤傳回(e.g. 認證錯誤),而可重試錯誤傳回和逾時都會導緻重發消息,導緻下遊可能接收到重複的消息,也就是 at-least-once 的投遞語義。而 exactly-once 是在 at-least-once 的基礎之上加上了可以識别出重發資料或者将消息包裝為為幂等操作的機制。
其實消息的 exactly-once 投遞并不是一個分布式系統産生的新課題(雖然它一般特指分布式領域的 exactly-once),早在計算網絡發展初期的 TCP 協定已經實作了網絡的可靠傳輸。TCP 協定的 exactly-once 實作方式是将消息傳遞變為有狀态的:首先同步建立連接配接,然後發送的每個資料包加上遞增的序列号(sequence number),發送完畢後再同步釋放連接配接。由于發送端和接受端都儲存了狀态資訊(已發送資料包的序列号/已接收資料包的序列号),它們可以知道哪些資料包是缺失或重複的。
而在分布式環境下 exactly-once 則更為複雜,最大的不同點在于分布式系統需要容忍程序崩潰和節點丢失,這會帶來許多問題,比如下面常見的幾個:
- 程序狀态需要持續化到可靠的分布式存儲,以防止節點丢失帶來狀态的丢失。
- 由于發送消息是一個兩階段的操作(即發送消息和收到對方的确認),重新開機之後的程序沒有辦法判斷崩潰前是否已經使用目前序列号發送過消息,是以可能會導緻重複使用序列号的問題。
- 被認為崩潰的程序有可能并沒有退出,随後再次連上來變為 zombie 程序繼續發送資料。
第2點和第3點其實是同一個問題,即需要區分出原本程序和重新開機後的程序。對此業界已經有比較成熟的解決方案: 引入 epoch 表示程序的不同世代并用分布式協調系統來負責管理。雖然還有一些衍生的細節問題,但總體來說問題都不大。但是第1點問題造成了一個比較深遠的影響,即為了減低 IO 成本,狀态的儲存必然是微批量(micro-batching)的而不是流式的,這會導緻狀态的儲存總是落後于流計算進度,因而為了保證 exactly-once 流計算引擎需要實作事務復原。
狀态 Exactly-Once 和端到端 Exactly-Once
Flink 提供 exactly-once 的狀态(state)投遞語義,這為有狀态的(stateful)計算提供了準确性保證。其中比較容易令人混淆的一點是狀态投遞語義和更加常見的端到端(end to end)投遞語義,而實作前者是實作後者的前置條件。
Flink 從 0.9 版本開始提供 State API,标志着 Flink 進入了 Stateful Streaming 的時代。State API 簡單來說是“不受程序重新開機影響的“資料結構,其命名規範也與常見的資料結構一緻,比如 MapState、ListState。Flink 官方提供的算子(比如 KafkaSource)和使用者開發的算子都可以使用 State API 來儲存狀态資訊。和大多數分布式系統一樣 Flink 采用快照的方式來将整個作業的狀态定期同步到外部存儲,也就是将 State API 儲存的資訊以序列化的形式存儲,作業恢複的時候隻要讀取外部存儲即可将作業恢複到先前某個時間點的狀态。由于從快照恢複同時會復原資料流的處理進度,是以 State 是天然的 exactly-once 投遞。
而端到端的一緻性則需要上下遊的外部系統配合,因為 Flink 無法将它們的狀态也儲存到快照并獨立地復原它們,否則就不叫作外部系統了。通常來說 Flink 的上遊是可以重複讀取或者消費的 pull-based 持續化存儲,是以要實作 source 端的 exactly-once 隻需要復原 source 的讀取進度即可(e.g. Kafka 的 offset)。而 sink 端的 exactly-once 則比較複雜,因為 sink 是 push-based 的。所謂覆水難收,要撤回發出去的消息是并不是容易的事情,因為這要求下遊根據消息作出的一系列反應都是可撤回的。這就需要用 State API 來儲存已發出消息的中繼資料,記錄哪些資料是重新開機後需要復原的。
下面将分析 Flink 是如何實作 exactly-once Sink 的。
Exactly-Once Sink 原理
Flink 的 exactly-once sink 均基于快照機制,按照實作原理可以分為幂等(Idempotent) sink 和事務性(Transactional) sink 兩種。
幂等 Sink
幂等性是分布式領域裡十分有用的特性,它意味着相同的操作執行一次和執行多次可以獲得相同的結果,是以 at-least-once 自然等同于 exactly-once。如此一來,在從快照恢複的時候幂等 sink 便不需要對外部系統撤回已發消息,相當于回避了外部系統的狀态復原問題。比如寫入 KV 資料庫的 sink,由于插入一行的操作是幂等的,是以 sink 可以無狀态的,在錯誤恢複時也不需要關心外部系統的狀态。從某種意義來講,上文提到的 TCP 協定也是利用了發送資料包幂等性來保證 exactly-once。
然而幂等 sink 的适用場景依賴于業務邏輯,如果下遊業務本來就無法保證幂等性,這時就需要應用事務性 sink。
事務性 Sink
事務性 sink 顧名思義類似于傳統 DBMS 的事務,将一系列(一般是一個 checkpoint 内)的所有輸出包裝為一個邏輯單元,理想的情況下提供 ACID 的事務保證。之是以說是“理想的情況下”,主要是因為 sink 依賴于目标輸出系統的事務保證,而分布式系統對于事務的支援并不一定很完整,比如 HBase 就不支援跨行事務,再比如 HDFS 等檔案系統是不提供事務的,這種情況下 sink 隻可以在用戶端的基礎上再包裝一層來盡最大努力地提供事務保證。
然而僅有下遊系統本身提供的事務保證對于 exactly-once sink 來說是不夠的,因為同一個 sink 的子任務(subtask)會有多個,對于下遊系統來說它們是處在不同會話和事務中的,并不能保證操作的原子性,是以 exactly-once sink 還需要實作分布式事務來達到所有 subtask 的一緻 commit 或 rollback。由于 sink 事務生命周期是與 checkpoint 一一對應的,或者說 checkpoint 本來就是實作作業狀态持久化的分布式事務,sink 的分布式事務也理所當然可以通過 checkpoint 機制提供的 hook 來實作。
Checkpoint 提供給算子的 hook 有 CheckpointedFunction 和 CheckpointListener 兩個,前者在算子進行 checkpoint 快照時被調用,後者在 checkpoint 成功後調用。為了簡單起見 Flink 結合上述兩個接口抽象出 exactly-once sink 的通用邏輯抽象 TwoPhaseCommitSinkFunction 接口,從命名即可看出這是對兩階段送出協定的一個實作,其主要方法如下:
- beginTransaction: 初始化一個事務。在有新資料到達并且目前事務為空時調用。
- preCommit: 預送出資料,即不再寫入目前事務并準好送出目前事務。在 sink 算子進行快照的時候調用。
- commit: 正式送出資料,将準備好的事務送出。在作業的 checkpoint 完成時調用。
- abort: 放棄事務。在作業 checkpoint 失敗的時候調用。
下面以 Bucketing File Sink 作為例子來說明如何基于異步 checkpoint 來實作事務性 sink。
Bucketing File Sink 是 Flink 提供的一個 FileSystem Connector,用于将資料流寫到固定大小的檔案裡。Bucketing File Sink 将檔案分為三種狀态,in-progress/pending/committed,分别表示正在寫的檔案、寫完準備送出的檔案和已經送出的檔案。
運作時,Bucketing File Sink 首先會打開一個臨時檔案并不斷地将收到的資料寫入(相當于事務的 beginTransaction 步驟),這時檔案處于 in-progress。直到這個檔案因為大小超過門檻值或者一段時間内沒有新資料寫入,這時檔案關閉并變為 pending 狀态(相當于事務的 pre-commit 步驟)。由于 Flink checkpoint 是異步的,可能有多個并發的 checkpoint,Bucketing File Sink 會記錄 pending 檔案對應的 checkpoint epoch,當某個 epoch 的 checkpoint 完成後,Bucketing File Sink 會收到 callback 并将對應的檔案改為 committed 狀态。這是通過原子操作重命名來完成的,是以可以保證 pre-commit 的事務要麼 commit 成功要麼 commit 失敗,不會出現其他中間狀态。
Commit 出現錯誤會導緻作業自動重新開機,重新開機後 Bucketing File Sink 本身已被恢複為上次 checkpoint 時的狀态,不過仍需要将檔案系統的狀态也恢複以保證一緻性。從 checkpoint 恢複後對應的事務會再次重試 commit,它會将記錄的 pending 檔案改為 committed 狀态,記錄的 in-progress 檔案 truncate 到 checkpoint 記錄下來的 offset,而其餘未被記錄的 pending 檔案和 in-progress 檔案都将被删除。
上面主要圍繞事務保證的 AC 兩點(Atomicity 和 Consistency),而在 I(Isolation)上 Flink exactly-once sink 也有不同的實作方式。實際上由于 Flink 的流計算特性,目前事務的未 commit 資料是一直在積累的,根據緩存未 commit 資料的地方的不同,可以将事務性 sink 分為兩種實作方式。
- 在 sink 端緩存未 commit 資料,等 checkpoint 完成以後将緩存的資料 flush 到下遊。這種方式可以提供 read-committed 的事務隔離級别,但同時由于未 commit 的資料不會發往下遊(與 checkpoint 同步),sink 端緩存會帶來一定的延遲,相當于退化為與 checkpoint 同步的 micro-batching 模式。
- 在下遊系統緩存未 commit 資料,等 checkpoint 完成後通知下遊 commit。這樣的好處是資料是流式發往下遊的,不會在每次 checkpoint 完成後出現網絡 IO 的高峰,并且事務隔離級别可以由下遊設定,下遊可以選擇低延遲弱一緻性的 read-uncommitted 或高延遲強一緻性的 read-committed。
在 Bucketing File Sink 的例子中,處于 in-progress 和 pending 狀态的檔案預設情況下都是隐藏檔案(在實踐中是使用下劃線作為檔案名字首,HDFS 的 FileInputFormat 會将其過濾掉),隻有 commit 成功後檔案才對使用者是可見的,即提供了 read-committed 的事務隔離性。理想的情況下 exactly-once sink 都應該使用在下遊系統緩存未 commit 資料的方式,因為這最為符合流式計算的理念。最為典型的是下遊系統本來就支援事務,那麼未 commit 的資料很自然地就是緩存在下遊系統的,否則 sink 可以選擇像上例的 Bucketing File Sink 一樣在下遊系統的使用者層面實作自己的事務,或者 fallback 到等待資料變為 committed 再發出的 micro-batching 模式。
總結
Exactly-once 是實時系統最為關鍵的準确性要求,也是目前限制大部分分布式實時系統應用到準确性要求更高的業務場景(比如線上事務處理 OLTP)的問題之一。目前來說流式計算的 exactly-once 在理論上已經有了很大的突破,而 Flink 社群也在積極汲取最先進的思想和實踐經驗。随着 Flink 在 exactly-once 上的技術愈發成熟,結合 Flink 本身的流處理特性,相信在不遠的将來,除了構造資料分析、資料管道應用, Flink 也可以在微服務領域占有一席之地。
參考文獻
1.Fault Tolerance Guarantees of Data Sources and Sinks
2.An Overview of End-to-End Exactly-Once Processing in Apache Flink
3.State Management in Apache Flink
4.An Overview of End-to-End Exactly-Once Processing in Apache Flink (with Apache Kafka, too!)