天天看點

基于 EMR OLAP 的開源實時數倉解決方案之 Flink + ClickHouse 事務實作一、背景二、機制梳理三、技術方案四、測試結果五、未來規劃

EMR-OLAP 團隊主要負責開源大資料 OLAP 引擎的研發,例如 ClickHouse,Starrocks,Trino 等。通過 EMR 産品向阿裡雲使用者提供一站式的大資料 OLAP 解決方案。本文介紹了如何支援 Flink 到 ClickHouse 的 Exactly-Once 寫入來保證整個實時數倉資料準确性的現有機制及實作方案。主要内容包括:
  1. 背景
  2. 機制梳理
  3. 技術方案
  4. 測試結果
  5. 未來規劃

一、背景

Flink 和 ClickHouse 分别是實時流式計算和 OLAP 領域的翹楚,很多網際網路、廣告、遊戲等客戶都将兩者聯合使用于建構使用者畫像、實時 BI 報表、應用監控名額查詢、監控等業務,形成了實時數倉解決方案 (如圖-1)。這些業務對資料的準确性要求都十分嚴格,是以實時數倉整個鍊路需要保證端到端的 Exactly-Once。

通常來說 Flink 的上遊是可以重複讀取或者消費的 pull-based 持久化存儲 (例如 Kafka),要實作 Source 端的 Exactly-Once 隻需要回溯 Source 端的讀取進度即可。Sink 端的 Exactly-Once 則比較複雜,因為 Sink 是 push-based 的,需要依賴目标輸出系統的事務保證,但社群 ClickHouse 對事務并不支援,是以針對此情況阿裡雲 EMR ClickHouse 與 Flink 團隊一起深度研發,支援了 Flink 到 ClickHouse 的 Exactly-Once 寫入來保證整個實時數倉資料的準确性。本文将分别介紹下現有機制以及實作方案。

基于 EMR OLAP 的開源實時數倉解決方案之 Flink + ClickHouse 事務實作一、背景二、機制梳理三、技術方案四、測試結果五、未來規劃

圖-1 實時數倉架構

二、機制梳理

2.1 ClickHouse 寫入機制

ClickHouse 是一個 MPP 架構的列式 OLAP 系統 (如圖-2),各個節點是對等的,通過 Zookeeper 協同資料,可以通過并發對各個節點寫本地表的方式進行大批量的資料導入。

ClickHouse 的 data part 是資料存儲的最小單元,ClickHouse 接收到的資料 Block 在寫入時,會按照 partition 粒度進行拆分,形成一個或多個 data part。data part 在寫入磁盤後,會通過背景merge線程不斷的合并,将小塊的 data part 合并成大塊的 data part,以此降低存儲和讀取的開銷。

在向本地表寫入資料時,ClickHouse 首先會寫入一個臨時的 data part,這個臨時 data part 的資料對用戶端不可見,之後會直接進行 rename 操作,使這個臨時 data part 成為正式 data part,此時資料對用戶端可見。幾乎所有的臨時 data part 都會快速地成功被 rename 成正式 data part,沒有被 rename 成功的臨時 data part 最終将被 ClickHouse 清理政策從磁盤上删除。

通過上述分析,可以看出 ClickHouse 的資料寫入有一個從臨時 data part 轉為正式 data part 的機制,加以修改可以符合兩階段送出協定,這是實作分布式系統中事務送出一緻性的重要協定。

基于 EMR OLAP 的開源實時數倉解決方案之 Flink + ClickHouse 事務實作一、背景二、機制梳理三、技術方案四、測試結果五、未來規劃

圖-2 Flink 作業寫入 ClickHouse

注:多個 Flink Task 可以寫入同一個 shard 或 replica

2.2 Flink 寫機制

​ Flink 作為一個分布式處理引擎,提供了基于事務的 Sink 機制,該機制可以保障寫入的 Exactly-Once,相應的資料接收方需要提供遵守 XA 規範的 JDBC 。由于完整的 XA 規範相當複雜,是以,我們先對 Flink 的處理機制進行梳理,結合 ClickHouse 的實際情況,确定需要實作的接口範圍。

​ 為了實作分布式寫入時的事務送出統一,Flink 借助了 checkpoint 機制。該機制能夠周期性地将各個 Operator 中的狀态生成快照并進行持久化存儲。在 checkpoint 機制中,有一個 Coordinator 角色,用來協調所有 Operator 的行為。從 Operator 的角度來看,一次 checkpoint 有三個階段,初始化-->生成快照-->完成/廢棄 checkpoint。從 Coordinator 的角度來看,需要定時觸發 checkpoint,以及在所有 Operator 完成快照後,觸發 complete 通知。(參考附錄 [1] )

​ 接下來介紹 Flink 中的 Operator 是如何借助事務和 checkpoint 機制來保障 Exactly-Once,Operator 的完整執行需要經過 initial、writeData、snapshot、commit 和 close 階段。

initial 階段:

  • 從快照中取出上次任務執行時持久化的 xid 記錄。快照中主要存儲兩種 xid,一組是未完成 snapshot 階段的 xid,一組是已經完成了 snapshot 的 xid。
  • 接下來對上次未完成 snapshot 的 xid 進行 rollback 操作;對上次已經完成了 snapshot 但 commit 未成功的 xid 進行 commit 重試操作。
  • 若上述操作失敗,則任務初始化失敗,任務中止,進入 close 階段;若上述操作成功,則繼續。
  • 建立一個新的唯一的 xid,作為本次事務 ID,将其記錄到快照中。
  • 使用新生成的 xid,調用 JDBC 提供的 start() 接口。

writeData 階段:

  • 事務開啟後,進入寫資料的階段,Operator 的大部分時間都會處于這個階段。在與 ClickHouse 的互動中,此階段為調用 JDBC 提供的 preparedStatement 的 addBatch() 和 executeBatch() 接口,每次寫資料時都會在封包中攜帶目前 xid。
  • 在寫資料階段,首先将資料寫到 Operator 記憶體中,向 ClickHouse 送出記憶體中的批量資料有三種觸發方式:記憶體中的資料條數達到 batchsize 的門檻值;背景定時線程每隔一段時間觸發自動 flush;在 snapshot 階段調用end() 和 prepare() 接口之前會調用 flush 清空緩存。

snapshot 階段:

  • 目前事務會調用 end() 和 prepare() 接口,等待 commit,并更新快照中的狀态。
  • 接下來,會開啟一個新的事務,作為本 Task 的下一次 xid,将新事務記錄到快照中,并調用 JDBC 提供的start() 接口開啟新事務。
  • 将快照持久化存儲。

complete 階段:

在所有 Operator 的 snapshot 階段全部正常完成後,Coordinator 會通知所有 Operator 對已經成功的checkpoint 進行 complete 操作,在與 ClickHouse 的互動中,此階段為 Operator 調用 JDBC 提供的 commit() 接口對事務進行送出。

close階段:

  • 若目前事務尚未進行到 snapshot 階段,則對目前事務進行 rollback 操作。
  • 關閉所有資源。

從上述流程可以總結出,Flink 通過 checkpoint 和事務機制,将上遊資料按 checkpoint 周期分割成批,保障每一批資料在全部寫入完成後,再由 Coordinator 通知所有 Operator 共同完成 commit 操作。當有 Operator 寫入失敗時,将會退回到上次成功的 checkpoint 的狀态,并根據快照記錄的 xid 對這一批 checkpoint 的所有 xid 進行 rollback 操作。在有 commit 操作失敗時,将會重試 commit 操作,仍然失敗将會交由人工介入處理。

三、技術方案

3.1 整體方案

根據 Flink 和 ClickHouse 的寫入機制,可以描繪出一個Flink 到 ClickHouse 的事務寫入的時序圖 (如圖-3)。由于寫的是 ClickHouse 的本地表,并且事務的統一送出由 Coordinator 保障,是以 ClickHouse 無需實作 XA 規範中标準的分布式事務,隻需實作兩階段送出協定中的少數關鍵接口,其他接口在 JDBC 側進行預設即可。

基于 EMR OLAP 的開源實時數倉解決方案之 Flink + ClickHouse 事務實作一、背景二、機制梳理三、技術方案四、測試結果五、未來規劃

圖-3 Flink 到 ClickHouse 事務寫入的時序圖

3.2 ClickHouse-Server

3.2.1 狀态機

為了實作 ClickHouse 的事務,我們首先定義一下所要實作的事務允許的幾種操作:

  • Begin:開啟一個事務。
  • Write Data:在一個事務内寫資料。
  • Commit:送出一個事務。
  • Rollback:復原一個未送出的事務。

事務狀态:

  • Unknown:事務未開啟,此時執行任何操作都是非法的。
  • Initialized:事務已開啟,此時允許所有操作。
  • Committing:事務正在被送出,不再允許 Begin/Write Data 兩種操作。
  • Committed:事務已經被送出,不再允許任何操作。
  • Aborting:事務正在被復原,不再允許任何操作。
  • Aborted:事務已經被復原,不再允許任何操作。

完整的狀态機如下圖所示:

基于 EMR OLAP 的開源實時數倉解決方案之 Flink + ClickHouse 事務實作一、背景二、機制梳理三、技術方案四、測試結果五、未來規劃

圖-4 ClickHouse Server 支援事務的狀态機

圖中所有操作均是幂等的。其中,Committing 到 Committed 和 Aborting 到 Aborted 是不需要執行任何操作的,在開始執行 Commit 或 Rollback 時,事務的狀态即轉成 Committing 或 Aborting;在執行完 Commit 或 Rollback 之後,事務的狀态會被設定成 Committed 或 Aborted。

3.2.2 事務處理

Client 通過 HTTP Restful API 通路 ClickHouse Server,Client 與 ClickHouse Server 間一次完整事務的互動過程如圖-5 所示:

基于 EMR OLAP 的開源實時數倉解決方案之 Flink + ClickHouse 事務實作一、背景二、機制梳理三、技術方案四、測試結果五、未來規劃

圖-5 Clickhouse 事務處理的時序圖

正常流程:

  • Client 向 ClickHouse 叢集任意一個 ClickHouse Server 發送 Begin Transaction 請求,并攜帶由 Client 生成的全局唯一的 Transaction ID。ClickHouse Server 收到 Begin Transaction 請求時,會向 Zookeeper 注冊該Transaction ID (包括建立 Transaction ID 及子 Znode 節點),并初始化該 Transaction 的狀态為

    Initialized

  • Client 接收到 Begin Transaction 成功響應時,可以開始寫入資料。當 ClickHouse Server 收到來自 Client 發送的資料時,會生成臨時 data part,但不會将其轉為正式 data part,ClickHouse Server 會将寫入的臨時 data part 資訊,以 JSON 的形式,記錄到 Zookeeper 上該 Transaction 的資訊中。
  • Client 完成資料的寫入後,會向 ClickHouse Server 發送 Commit Transaction 請求。 ClickHouse Server 在收到 Commit Transaction 請求後,根據 ZooKeeper 上對應的Transaction的 data part 資訊,将 ClickHouse Server 本地臨時 data part 資料轉為正式的 data part 資料,并更新Transaction 狀态為Committed。Rollback 的過程與 Commit 類似。

異常處理:

  • 如果建立 Transaction ID 過程中發現 Zookeeper 中已經存在相同 Transaction ID,根據 Zookeeper 中記錄的 Transaction 狀态進行處理:如果狀态是

    Unknown

    則繼續進行處理;如果狀态是

    Initialized

    則直接傳回;否則會抛異常。
  • 目前實作的事務還不支援分布式事務,隻支援單機事務,是以 Client 隻能往記錄該 Transaction ID 的 ClickHouse Server 節點寫資料,如果 ClickHouse Server 接收到到非該節點事務的資料,ClickHouse Server 會直接傳回錯誤資訊。
  • 與寫入資料不同,如果 Commit 階段 Client 向未記錄該 Transaction ID 的 ClickHouse Server 發送了 Commit Transaction 請求,ClickHouse Server 不會傳回錯誤資訊,而是傳回記錄該 Transaction ID 的 ClickHouse Server 位址給 Client,讓 Client 端重定向到正确的 ClickHouse Server。Rollback 的過程與 Commit 類似。

3.3 ClickHouse-JDBC

根據 XA 規範,完整的分布式事務機制需要實作大量的标準接口 (參考附錄 [2] )。在本設計中,實際上隻需要實作少量關鍵接口,是以,采用了基于組合的擴充卡模式,向 Flink 提供基于标準 XA 接口的 XAResource 實作,同時對 ClickHouse Server 屏蔽了不需要支援的接口。

對于 XADataSource 的實作,采用了基于繼承的擴充卡模式,并針對 Exactly-Once 的特性,修改了部分預設配置,如發送失敗的重試次數等參數。

另外,在生産環境中,通常不會通過分布式表,而是通過 SLB 進行資料寫入時的負載均衡。在 Exactly-Once 場景中,Flink 側的 Task 需要保持針對某一 ClickHouse Server 節點的連接配接,是以不能使用 SLB 的方式進行負載均衡。針對這一問題,我們借鑒了 BalanceClickHouseDataSource 的思路,通過在 URL 中配置多個IP,并在 properties 配置中将 write_mode 設定為 Random ,可以使 XADataSource 在保障 Exactly-Once 的同時,具有負載均衡的能力。

3.4 Flink-Connector-ClickHouse

Flink 作為一個流式資料處理引擎,支援向多種資料接收端寫入的能力,每種接收端都需要實作特定的Connector。針對 Exactly-Once,ClickHouse Connector 增加了對于 XADataSource 的選項配置,根據用戶端的配置提供 Exactly-Once 功能。

四、測試結果

4.1 ClickHouse 事務性能測試

  • 寫入 ClickHouse 單批次資料量和總批次相同,Client端并發寫線程不同性能比較。

    由圖-6 可以看出,無論 ClickHouse 是否開啟事務, ClickHouse 的吞吐量都與 Client 端并發寫的線程數成正比。開啟事務時,ClickHouse 中臨時 data part 不會立刻被轉為正式 data part,是以在事務完成前大量臨時 data part 不會參與 ClickHouse merge 過程,降低磁盤 IO 對寫性能的影響,是以開啟事務寫性能較未開啟事務寫性能更好;但事務内包含的批次變多,臨時 data part 在磁盤上的增多導緻了合并時 CPU 的壓力增大,進而影響了寫入的性能,開啟事務的寫性能也會降低。

    基于 EMR OLAP 的開源實時數倉解決方案之 Flink + ClickHouse 事務實作一、背景二、機制梳理三、技術方案四、測試結果五、未來規劃
    圖-6 ClickHouse 寫入性能壓測 (一)
  • 寫入 ClickHouse 總批次 和 Client 端并發寫線程相同,單批次寫入 ClickHouse 資料量不同性能比較。

    由圖-7 可以看出,無論 ClickHouse 是否開啟事務, ClickHouse 的吞吐量都與單批次資料量大小成正比。開啟事務時,每批次資料越小,ClickHouse 的吞吐量受事務是否開啟的影響就越大,這是因為每批次寫入的時間在事務處理的占比較小,事務會對此産生一定的影響,是以,一次事務包含的批次數量越多,越能夠減少事務對寫入性能的影響;當事務包含批次的增大,事務處理時間在寫入中的占比逐漸降低,ClickHouse merge 産生的影響越來越大,進而影響了寫入的性能,開啟事務較不開啟事務寫性能更好。

    基于 EMR OLAP 的開源實時數倉解決方案之 Flink + ClickHouse 事務實作一、背景二、機制梳理三、技術方案四、測試結果五、未來規劃
    圖-7 ClickHouse寫入性能壓測 (二)

總體來說,開啟事務對寫入性能幾乎沒有影響,這個結論是符合我們預期的。

4.2 Flink 寫入 ClickHouse 性能比較

對于相同資料量和不同 checkpoint 周期,Flink 寫入 ClickHouse 總耗時如圖-8 所示。可以看出,checkpoint 周期對于不開啟 Exactly-Once 的任務耗時沒有影響。對于開啟 Exactly-Once 的任務,在 5s 到 60s 的範圍内,耗時呈現一個先降低後增長的趨勢。原因是在 checkpoint 周期較短時,開啟 Exactly-Once 的 Operator 與 Clickhouse 之間有關事務的互動過于頻繁;在 checkpoint 周期較長時,開啟 Exactly-Once 的 Operator 需要等待 checkpoint 周期結束才能送出最後一次事務,使資料可見。在本測試中,checkpoint 周期資料僅作為一個參考,生産環境中,需要根據機器規格和資料寫入速度進行調整。

總體來說,Flink 寫入 Clickhouse 時開啟 Exactly-Once 特性,性能會稍有影響,這個結論是符合我們預期的。

基于 EMR OLAP 的開源實時數倉解決方案之 Flink + ClickHouse 事務實作一、背景二、機制梳理三、技術方案四、測試結果五、未來規劃

圖-8 Flink 寫入 ClickHouse 測試

五、未來規劃

該版本 EMR ClickHouse 實作的事務還不是很完善,隻支援單機事務,不支援分布式事務。分布式系統一般都是通過 Meta Server 來做統一進制資料管理來支援分布式事務機制。目前我們也正在規劃設計 ClickHouse MetaServer 來支援分布式事務,同時可以移除 ClickHouse 對 ZooKeeper 的依賴。

附錄

[1]

https://flink.apache.org/2020/10/15/from-aligned-to-unaligned-checkpoints-part-1.html

[2]

https://pubs.opengroup.org/onlinepubs/009680699/toc.pdf

Flink Forward Asia 2021 重磅開啟,全球 40+ 多行業一線廠商,80+ 幹貨議題,帶來專屬于開發者的技術盛宴。

https://flink-forward.org.cn/

另有首屆 Flink Forward Asia Hackathon 正式啟動,20W 獎金等你來!

https://www.aliyun.com/page-source//tianchi/promotion/FlinkForwardAsiaHackathon
基于 EMR OLAP 的開源實時數倉解決方案之 Flink + ClickHouse 事務實作一、背景二、機制梳理三、技術方案四、測試結果五、未來規劃

更多 Flink 相關技術問題,可掃碼加入社群釘釘交流群

第一時間擷取最新技術文章和社群動态,請關注公衆号~

基于 EMR OLAP 的開源實時數倉解決方案之 Flink + ClickHouse 事務實作一、背景二、機制梳理三、技術方案四、測試結果五、未來規劃

活動推薦

阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:

99 元試用

實時計算Flink版

(包年包月、10CU)即有機會獲得 Flink 獨家定制衛衣;另包 3 個月及以上還有 85 折優惠!

了解活動詳情:

https://www.aliyun.com/product/bigdata/sc
基于 EMR OLAP 的開源實時數倉解決方案之 Flink + ClickHouse 事務實作一、背景二、機制梳理三、技術方案四、測試結果五、未來規劃