作為支援 Flink 流批一體與雲原生的重要組成部分,Flink Remote Shuffle 今天正式開源了: https://github.com/flink-extended/flink-remote-shuffle Flink Remote Shuffle 是一種批場景下利用外部服務完成任務間資料交換的 Shuffle 實作,本文後續将詳細介紹 Flink Remote Shuffle 研發的背景,以及 Flink Remote Shuffle 的設計與使用。
一、為什麼需要 Flink Remote Shuffle ?
1.1 背景
Flink Remote Shuffle 的提出與實作,源自我們觀察到的使用者對流批一體與雲原生日益增加的需求。
由于實時處理可以大幅提升使用者體驗以及增加産品在市場的競争力,越來越多的使用者業務場景中同時包含了實時和離線處理需求。如果流處理和批處理采用不同的架構來完成,将帶來使用者在架構學習、代碼開發與線上運維的諸多不便。同時,對于許多應用場景,由于實時處理受限于延遲資料(例如使用者可能隔很久才會填寫評論)或業務邏輯更新等原因,必須采用離線任務進行資料訂正,采用兩種不同的架構編寫兩份代碼邏輯很容易産生計算結果不一緻的問題。
針對這些問題,Flink 提出了流批一體的資料模型,即用一套 API 來完成實時資料與離線資料的處理。為了支援這一目标,Flink 設計與實作了流批統一的 DataStream API [1] + Table / SQL API [2] + Connector [3][4] ,并在執行層支援流批一體的排程 [5] 與面向批處理進行優化的 Batch 執行模式 [6]。而為了支援 Batch 模式,還需要 Flink 能夠實作高效與穩定的 Blocking Shuffle。Flink 内置的 Blocking Shuffle 在上遊結束後繼續依賴上遊所在的 TaskManager 來為下遊提供資料讀取服務,這将導緻 TaskManager 不能立即釋放進而降低資源使用率,并導緻 Shuffle 服務的穩定性受 Task 執行穩定性的影響。
另一方面,由于雲原生可以更好的支援離線線上混部來提高叢集資源使用率,提供統一的運維操作接口減少運維成本,并支援通過資源動态編排來實作作業的自動伸縮,越來越多的使用者開始使用 K8s 來管理它們的叢集資源。Flink 積極擁抱雲原生,除提供了對 K8s 的原生支援外 [7][8] ,Flink 提供了根據資源量進行動态伸縮的 Adaptive Scheduler [9], 并逐漸推動 State 的存儲計算分離 [10]。為了使 Batch 模式也能更好的支援雲原生,Shuffle 過程做為本地磁盤的最大使用者,如何實作 Blocking Shuffle 的存儲計算分離,減少對本地磁盤的占用,使得計算資源與存儲資源不再互相耦合,也是必須要解決的問題。
是以,為了更好的支援流批一體與雲原生,通過使用獨立的 Shuffle 服務來實作任務間的資料傳輸是必由之路。
1.2 Flink Remote Shuffle 的優勢
Flink Remote Shuffle 正是基于上述思路來設計與實作的。它支援了許多重要特性,包括:
- 存儲計算分離:存儲計算分離使計算資源與存儲資源可以獨立伸縮,計算資源可以在計算完成後立即釋放,Shuffle 穩定性不再受計算穩定性影響。
- 支援多種部署模式:支援 Kubernetes、Yarn 以及 Standalone 環境下部署。
- 采用了類似 Flink Credit-Based 流量控制機制,實作了零拷貝資料傳輸,最大限度的使用受管理的記憶體 (managed memory) 以避免 OOM,提高了系統穩定性與性能。
- 實作了包括負載均衡、磁盤 IO 優化、資料壓縮、連接配接複用、小包合并等諸多優化,實作了優秀的性能與穩定性表現。
- 支援 Shuffle 資料正确性校驗,能夠容忍 Shuffle 程序乃至實體節點重新開機。
- 結合 FLIP-187: Flink Adaptive Batch Job Scheduler [11] 可支援動态執行優化,如動态決定算子并發度。
1.3 生産實踐
從 2020 雙十一開始,阿裡内部許多核心任務開始選擇基于Flink的流批一體處理鍊路,這也是業界首次完成流批一體大規模的生産實踐的落地。通過流批一體處理技術,解決了如天貓營銷引擎等場景下流批處理口徑一緻性的問題,将資料報表開發效率提升了 4 到 10 倍,并且通過流作業與批作業混部實作資源白天與夜晚的削峰填谷,資源成本節省了 1 倍。
而做為流批一體技術的重要一環,Flink Remote Shuffle 自上線已來,最大的叢集規模已達千台以上,在曆次大促中平穩的支援了天貓營銷引擎、天貓國際等多個業務方,作業規模超過 PB 級别,充分證明了系統的穩定性與性能。
二、Flink Remote Shuffle 的設計與實作
2.1 Flink Remote Shuffle 整體架構
Flink Remote Shuffle 是基于 Flink 統一插件化 Shuffle 接口來實作的。Flink 作為流批一體的資料處理平台,在不同場景可以适配多種不同的 Shuffle 政策,如基于網絡的線上 Pipeline Shuffle,基于 TaskManager 的 Blocking Shuffle 和基于遠端服務的 Remote Shuffle。
這些 Shuffle 政策在傳輸方式、存儲媒體等方面存在較大差異,但是它們在資料集的生命周期、中繼資料管理與通知下遊任務、資料分發政策等方面存在了許多共性的需求。為了為不同類型的 Shuffle 提供統一支援,簡化包括 Flink Remote Shuffle 在内的新的 Shuffle 政策的實作,Flink 中引入了插件化的 Shuffle 架構 [12] 。
如下圖所示,一個 Shuffle 插件主要由兩部分組成,即在 JobMaster 端負責資源申請與釋放的 ShuffleMaster 與在 TaskManager 端負責資料實際讀寫的 InputGate 與 ResultPartition。排程器通過 ShuffleMaster 申請資源後交由 PartitionTracker 進行管理,并在上遊和下遊任務啟動時攜帶 Shuffle 資源的描述符來描述資料輸出和讀取的位置。
基于 Flink 統一插件化 Shuffle 接口,Flink Remote Shuffle 通過一個單獨的叢集提供資料 shuffle 服務。該叢集采用經典的 master-slave 結構,其中 ShuffleManager 作為整個叢集的 master 結點,負責對 worker 結點進行管理,以及對 Shuffle 資料集進行配置設定與管理。ShuffleWorker 作為叢集的 slave 結點,負責資料集實際的讀寫與清理。
當上遊 Task 啟動時,Flink 的排程器将通過 RemoteShuffleMaster 插件向 ShuffleManager 申請資源,ShuffleManager 将根據資料集的類型與各個 Worker 的負載選擇合适的 Worker 提供服務。當排程器拿到相應的 Shuffle 資源描述符時,會在啟動上遊 Task 時攜帶該描述符。上遊 Task 根據描述符中記錄的 ShuffleWorker 位址将資料發送給相應的 ShuffleWorker 進行持久化存儲。相對的,當下遊 Task 啟動後,将根據看描述符中記錄的位址從相應的 ShuffleWorker 進行讀取,進而完成整個資料傳輸的過程。
作為一個長時間運作的服務,系統的錯誤容忍與自愈能力是非常關鍵的。Flink Remote Shuffle 通過心跳等機制對 ShuffleWorker 與 ShuffleMaster 進行監聽,并在心跳逾時、IO 失敗等異常出現時候對資料集進行删除與狀态同步,進而維護整個叢集狀态的最終一緻性。更多關于異常情況的處理,可以參考 Flink Remote Shuffle 相關文檔 [13] 。
2.2 資料 Shuffle 協定與優化
資料遠端 Shuffle 可劃分為讀寫兩個階段。在資料寫階段,上遊計算任務的輸出資料被寫到遠端的 ShuffleWorker;在資料讀階段,下遊計算任務從遠端的 ShuffleWorker 讀取上遊計算任務的輸出并進行處理。資料 Shuffle 協定定義了這一過程中的資料類型、粒度、限制以及流程等。總體而言,資料的寫出 - 讀取流程如下:
資料寫出
資料讀取
在整個資料讀寫的過程中,實作了多種優化手段,包括資料壓縮,流量控制,減少資料拷貝,使用受管理記憶體等:
- Credit-based 流量控制:流量控制是生産者 - 消費者模型需要考慮的一個重要問題,目的是避免消費速度慢導緻資料無限堆積。Flink Remote Shuffle 采用了和 Flink 類似的 Credit-based 流量控制機制,即隻有在資料接收端有足夠緩沖區來接收資料時,資料發送端才會發送這些資料。而資料接收端在不斷處理資料的過程中,也會将釋放的緩沖區回報給發送端繼續發送新的資料,這樣不斷往複實作流式的資料發送,類似于 TCP 的滑動視窗機制。Credit-based 流量控制機制可以很好的避免下遊在接收緩沖區不足時無效的落盤,也可以在 TCP 連接配接複用場景下避免因為一條邏輯鍊路擁塞影響其他邏輯鍊路。對這一機制感興趣可以參考 Flink 的部落格 [14]。
- 資料壓縮:資料壓縮是一個簡單而有效的優化手段,其效果已經被廣泛應用和證明,是一個必選項。Flink Remote Shuffle 也實作了資料壓縮。具體而言,資料會在生産者寫出到遠端 ShuffleWorker 前進行壓縮,并在消費者從遠端 ShuffleWorker 讀取後進行解壓。這樣可以達到同時減少網絡與檔案 IO 的目的,同時減少網絡帶寬與磁盤存儲空間的占用,提升了 IO 效率。
- 減少資料拷貝:在進行網絡與檔案 IO 時,Flink 最大限度的使用直接記憶體 (Direct Memory),這樣便減少了 Java 堆記憶體的拷貝,提升了效率,同時也有利于減少直接記憶體的動态申請,有利于提升穩定性。
- 使用受管理記憶體:對于 Shuffle 資料傳輸以及檔案 IO 所使用的大塊記憶體,Flink Remote Shuffle 均使用預申請的受管理記憶體,即預先申請記憶體建立記憶體池,後續記憶體申請釋放均在記憶體池中進行,這樣減少了記憶體動态申請釋放的開銷 (系統調用以及 GC),更重要的是有利于避免 OOM 問題的産生,極大的增強了系統的穩定性。
- TCP 連接配接複用:對于同一個 Flink 計算節點到同一個遠端 ShuffleWorker 的資料讀或寫連接配接會複用相同的實體 TCP 連接配接,這有利于減少網絡連接配接數量,提升資料讀寫穩定性。
2.3 存儲與檔案 IO 優化
對于落盤 Shuffle 而言,尤其是在機械硬碟上,檔案 IO 會成為重要的瓶頸,優化檔案 IO 會取得很好的加速效果。
除了上面提到的資料壓縮,一個被廣泛采用的技術方案是進行小檔案或者說是小資料塊合并,進而增加檔案的順序讀寫,避免過多的随機讀寫,最終優化檔案 IO 性能。對于非遠端的計算節點間的直接 Shuffle,包括 Spark 等系統都實作了将小塊資料合并成大塊的優化。
對于遠端 Shuffle 系統的資料合并方案,根據我們的調研,最早是由 Microsoft & LinkedIn & Quantcast 在一篇論文 Sailfish [15] 中提出的,後來包括 Princeton & Facebook 的 Riffle [16],Facebook 的 Cosco [17],LinkedIn 的 Magnet [18],Alibaba EMR 的 Spark 遠端 Shuffle [19] 都實作了類似的優化思路,即将由不同的上遊計算任務發送給相同下遊計算任務的 Shuffle 資料推送給相同的遠端 Shuffle 服務節點進行合并,下遊計算任務可以直接從這些遠端的 Shuffle 服務節點拉取合并後的資料。
除了這一優化思路外,我們在 Flink 的計算節點間的直接 Shuffle 實作中提出了另一種優化思路,即 Sort-Spill 加 IO 排程,簡單而言就是在計算任務的輸出資料填滿記憶體緩沖區後對資料進行排序 (Sort),排序後的資料寫出 (Spill) 到檔案中,并且在寫出過程中避免了寫出多個檔案,而是始終向同一個檔案追加資料,在資料讀取的過程中,增加對資料讀取請求的排程,始終按照檔案的偏移順序讀取資料,滿足讀取請求,在最優的情況下可以實作資料的完全順序讀取。下圖展示了基本的存儲結構與 IO 排程流程,更具體的細節可參考 Flink 部落格 [20] 或者中文版 [21]。
這兩種方案各有一些優勢和不足:
- 容錯方面,資料合并的方案對于資料丢失的容忍度更低,由于同一檔案中包含由所有并發計算任務合并産生資料,是以一旦一個檔案丢失,則需要重跑所有生産者并發,開銷巨大,是以為了避免這一開銷,可能需要采用備份等方式避免發生重算,然而備份也意味着更多的檔案 IO (降低性能) 以及更多存儲空間占用。而 IO 排程的方案,對于資料損壞或丢失,隻需要重新生成丢失的資料即可。此外,對于生産者任務的失敗處理,資料合并的方式也更為複雜,因為需要清理或者标記失敗的資料段,然後讀取時跳過這些資料,或者在讀取時進行去重,跳過這些資料。而對于 IO 排程的方案,隻需要丢棄失敗的生産者産生的資料檔案即可。
- 性能上,一般情況下,兩者都可以實作很好的檔案 IO 吞吐,然而特殊情況下,IO 排程方案也有一些不足,比如 IO 排程依賴消費者計算任務的資料請求,如果下遊消費者無法同時被拉起,則會影響資料的順序讀取,降低檔案 IO 性能。此外,如果需要對資料本身進行排序,資料合并的方式将更有利,因為需要排序的資料在同一個檔案中。類似的,如果需要寫資料到分布式檔案系統等外部系統,資料合并的方式也更為有利,因為這些外部系統不太容易實作 IO 排程優化。
- 在檔案數量上,資料合并的方式檔案數量和消費者任務的數量相等,IO 排程的方案檔案數量和生産者任務的數量相等。
Flink Remote Shuffle 的抽象不排斥任何一種優化政策。事實上,Flink Remote Shuffle 可被看作是一個可以感覺 Map-Reduce 語義的中間資料存儲服務,其基本資料存儲單元是資料分區 (DataPartition),資料分區有兩種基本的類型,分别是 MapPartition 和 ReducePartition。其中 MapPartition 包含的資料由一個上遊計算任務産生并可能會被多個下遊計算任務消費,下面的示意圖展示了 MapPartition 的産生與消費:
而 ReducePartition 由多個上遊計算任務的輸出合并産生并被單個下遊計算任務消費。下面的示意圖展示了 ReducePartition 的産生與消費:
三、部署使用與評估
3.1 多環境部署與運維
支援在多種環境部署,滿足差異化的部署需求是一個重要能力。具體而言,Flink Remote Shuffle 支援 Kubernetes、YARN 以及 Standalone 三種部署模式,可以滿足絕大部分使用者的部署環境需求。在每種部署模式下,都有一些便捷化腳本和模闆可供使用者使用。更加詳細的資訊可以參考文檔:Kubernetes 模式部署 [22] 、YARN 模式部署 [23] 以及 Standalone 模式部署 [24] 。其中,Kubernetes 模式與 YARN 模式部署實作了主節點 (ShuffleManager) 高可用,Standalone 模式部署的主節點高可用将在未來版本支援。
除此之外,Flink Remote Shuffle 的 Metric 系統還提供了若幹重要的監控名額可供使用者監控整個系統的運作狀态,包括活躍節點數量、作業總量、每個節點上可用緩沖區數量、資料分區數量、網絡連接配接數量、網絡吞吐、JVM 名額等資訊,未來會不斷增加更多的監控名額以友善使用者的運維操作。使用者可以直接通路各個程序 (ShuffleManager & ShuffleWorker) 的 Metric 服務查詢相應的名額資料,具體可參考使用者文檔 [25]。未來将會提供 Metric 名額彙報能力,允許使用者将名額主動彙報到外部系統如 Prometheus 等。
基本來說,Flink Remote Shuffle 的部署與運維比較簡單,未來會持續提升部署與運維方面的體驗,簡化資訊采集與問題定位、提高自動化程度、降低運維成本。
3.2 多版本相容性
由于遠端 Shuffle 系統分為用戶端和服務端兩個部分,服務端作為一個獨立的叢集單獨運作,而用戶端作為 Flink 作業通路遠端 Shuffle 服務的代理運作在 Flink 叢集,在部署模式上,可能存在有很多使用者通過不同 Flink 叢集通路同一套 Shuffle 服務的情況,是以多版本相容性是使用者比較關心的一個問題。而 Shuffle 服務本身的版本會随着新的 Feature 或者優化而不斷更新,如果出現用戶端與服務端的不相容,最簡單的辦法是讓不同使用者的用戶端也一起跟着更新,但這往往需要使用者的配合,不總是可以做到。
能夠絕對保證版本間相容是最好的,為了最大限度的實作這一點,Flink Remote Shuffle 也做了很多工作,包括:
- 版本資訊與保留字段:在所有的協定消息中加入版本資訊與保留字段,這樣有利于在後續更改協定字段時保持相容;
- 增加存儲格式版本:存儲格式版本保留在存儲的資料中,這樣新版本的 Shuffle 存儲節點可以直接接管老的資料,避免資料重新生成的開銷;
- 不同版本不同處理:通過對不同版本做不同的處理,可以使新版本相容老版本的邏輯,同時服務端還可以借此監控用戶端老版本的使用;
- 相容版本服務發現:用戶端的服務發現可以允許多個版本的 Shuffle 服務同時運作,并且總是會尋找使用與自己版本相容的服務。
通過這些努力,我們期望做到不同版本間的完全相容,避免出現不必要的 “驚喜”,當然如果期望使用新版本的更多新功能與優化,還是需要更新用戶端版本。
3.3 穩定性與性能評估
生産應用表明,Flink Remote Shuffle 具有良好的穩定性與性能表現。這主要得益于諸多的性能與穩定性優化。
能夠提高穩定性的設計與優化包括:存算分離使得 Shuffle 穩定性不受計算穩定性影響;Credit-based 流量控制可根據消費者處理能力發送資料,避免消費者被壓垮;連接配接複用,小包合并以及主動網絡連接配接健康檢查等優化提升網絡穩定性;最大限度的使用被管理的記憶體極大的避免了 OOM 的可能;資料校驗使得系統可容忍程序乃至實體節點重新開機。
而性能方面,資料壓縮、負載均衡以及檔案 IO 優化等都很好的提升了資料 Shuffle 性能。在小資料量場景下,由于 Shuffle 資料大多存在作業系統的緩存中,Flink Remote Shuffle 與計算節點間直接 Shuffle 性能接近,相差不大。在大資料量場景下,得益于中心化的決策能力 (ShuffleManager 節點的負載均衡,單個 ShuffleWorker 節點統一管理整個實體機 IO),Flink Remote Shuffle 的性能要更勝一籌,下面截圖展示了 Flink Remote Shuffle 在運作作業 (TPC-DS q78) 時的磁盤 IO 資訊:
從圖中可以看出,我們使用了 sdd、sde、sdf、sdg、sdi 與 sdk 這幾塊盤,磁盤吞吐還是比較高的,後我們會持續進行優化。
四、未來展望
目前的 Flink Remote Shuffle 版本通過内部的大規模上線使用,已經證明了其在性能與穩定性方面是生産可用的。未來,我們會對 Flink Remote Shuffle 進行持續的疊代改進與增強,已經有若幹工作項在我們的計劃中,包括性能、易用性等諸多方面,我們也非常希望有更多的感興趣的小夥與我們一起參與到後續的使用與改進中,共同推進 Flink 流批一體與雲原生發展。更多資訊交流請釘釘掃描下方二維碼或搜尋群号 35065720。
參考文獻
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface?src=contextnavpagetreemode[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API[5]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling[6]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+batch-style+execution+for+bounded+keyed+streams[7]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/[8]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/[9]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler[10]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints[11]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler[12]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-31%3A+Pluggable+Shuffle+Service[13]
/docs/user_guide.md#fault-tolerance
[14]
https://flink.apache.org/2019/06/05/flink-network-stack.html[15] Rao S, Ramakrishnan R, Silberstein A, et al. Sailfish: A framework for large scale data processing[C]//Proceedings of the Third ACM Symposium on Cloud Computing. 2012: 1-14.
[16] Zhang H, Cho B, Seyfe E, et al. Riffle: optimized shuffle service for large-scale data analytics[C]//Proceedings of the Thirteenth EuroSys Conference. 2018: 1-15.
[17]
https://databricks.com/session/cosco-an-efficient-facebook-scale-shuffle-service[18] Shen M, Zhou Y, Singh C. Magnet: push-based shuffle service for large-scale data processing[J]. Proceedings of the VLDB Endowment, 2020, 13(12): 3382-3395.
[19]
https://developer.aliyun.com/article/772329[20]
https://flink.apache.org/2021/10/26/sort-shuffle-part2.html[21]
https://mp.weixin.qq.com/s/M5lGOYu0Bwaspa8G0x5NHQ[22]
https://github.com/flink-extended/flink-remote-shuffle/blob/master/docs/deploy_on_kubernetes.md[23]
https://github.com/flink-extended/flink-remote-shuffle/blob/master/docs/deploy_on_yarn.md[24]
https://github.com/flink-extended/flink-remote-shuffle/blob/master/docs/deploy_standalone_mode.md[25]
https://github.com/flink-extended/flink-remote-shuffle/blob/master/docs/user_guide.md更多 Flink 相關技術問題,可掃碼加入社群釘釘交流群;
第一時間擷取最新技術文章和社群動态,請關注公衆号~
近期熱點
更多 Flink 相關技術問題,可掃碼加入社群釘釘交流群
活動推薦
阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:
99 元試用
實時計算Flink版(包年包月、10CU)即有機會獲得 Flink 獨家定制衛衣;另包 3 個月及以上還有 85 折優惠!
了解活動詳情:
https://www.aliyun.com/product/bigdata/sc