天天看點

Flink Sort-Shuffle 實作簡介

本文介紹 Sort-Shuffle 如何幫助 Flink 在應對大規模批資料處理任務時更加遊刃有餘。主要内容包括:
  1. 資料 Shuffle 簡介
  2. 引入 Sort-Shuffle 的意義
  3. Flink Sort-Shuffle 實作
  4. 測試結果
  5. 調優參數
  6. 未來展望

GitHub 位址

https://github.com/apache/flink

歡迎大家關注 Flink ~

Flink 作為批流一體的大資料計算引擎,大規模批資料處理也是 Flink 資料處理能力的重要組成部分。随着 Flink 的版本疊代,其批資料處理能力也在不斷增強,sort-shuffle 的引入,使得 Flink 在應對大規模批資料處理任務時更加遊刃有餘。

一、資料 Shuffle 簡介

資料 shuffle 是批資料處理作業的一個重要階段,在這一階段中,上遊處理節點的輸出資料會被持久化到外部存儲中,之後下遊的計算節點會讀取這些資料并進行處理。這些持久化的資料不僅僅是一種計算節點間的資料交換形式,還在錯誤恢複中發揮着重要作用。

目前,有兩種批資料 shuffle 模型被現有的大規模分布式計算系統采用,分别是基于 hash 的方式以及基于 sort 的方式:

  1. 基于 hash 方式的核心思路是将發送給下遊不同并發消費任務的資料寫到單獨的檔案中,這樣檔案本身就成了一個自然的區分不同資料分區的邊界;
  2. 基于 sort 方式的核心思路是先将所有分區的資料寫在一起,然後通過 sort 來區分不同資料分區的邊界。

我們在 Flink 1.12 版本将基于 sort 的批處理 shuffle 實作引入了 Flink 并在後續進行了持續的性能與穩定性優化;到 Flink 1.13 版本,sort-shuffle 已經實作生産可用。

二、引入 Sort-Shuffle 的意義

我們之是以要在 Flink 中引入 sort-shuffle 的實作,一個重要的原因是 Flink 原本的基于 hash 的實作對大規模批作業不可用。這個也是被現有的其他大規模分布式計算系統所證明的:

  1. 穩定性方面:對于高并發批作業,基于 hash 的實作會産生大量的檔案,并且會對這些檔案進行并發讀寫,這會消耗很多資源并對檔案系統會産生較大的壓力。檔案系統需要維護大量的檔案中繼資料,會産生檔案句柄以及 inode 耗盡等不穩定風險。
  2. 性能方面:對于高并發批作業,并發讀寫大量的檔案意味着大量的随機 IO,并且每次 IO 實際讀寫的資料量可能是非常少的,這對于 IO 性能是一個巨大的挑戰,在機械硬碟上,這使得資料 shuffle 很容易成為批處理作業的性能瓶頸。

通過引入基于 sort 的批資料 shuffle 實作,并發讀寫的檔案數量可以大大降低,有利于實作更好的資料順序讀寫,進而能夠提高 Flink 大規模批處理作業的穩定性與性能。除此之外,新的 sort-shuffle 實作還可以減小記憶體緩沖區的消耗。對于基于 hash 的實作,每個資料分區都需要一塊讀寫緩沖區,記憶體緩沖區消耗和并發成正比。而基于 sort 的實作則可以做到記憶體緩沖區消耗和作業并發解耦(盡管更大的記憶體可能會帶來更高的性能)。

更為重要的一點是我們實作了新的存儲結構與讀寫 IO 優化,這使得 Flink 的批資料 shuffle 相比于其他的大規模分布式資料處理系統更具優勢。下面的章節會更為詳細的介紹 Flink 的 sort-shuffle 實作以及所取得的結果。

三、Flink Sort-Shuffle 實作

和其他分布式系統的批資料 sort-shuffle 實作類似,Flink 的整個 shuffle 過程分為幾個重要的階段,包括寫資料到記憶體緩沖區、對記憶體緩沖區進行排序、将排好序的資料寫出到檔案以及從檔案中讀取 shuffle 資料并發送給下遊。但是,與其他系統相比,Flink 的實作有一些根本性的不同,包括多段資料存儲格式、省掉資料合并流程以及資料讀取 IO 排程等。這些都使得 Flink 的實作有着更優秀的表現。

1. 設計目标

在 Flink sort-shuffle 的整個實作過程中,我們把下面這些點作為主要的設計目标加以考量:

1.1 減少檔案數量

正如上面所讨論的,基于 hash 的實作會産生大量的檔案,而減少檔案的數量有利于提高穩定性和性能。Sort-Spill-Merge 的方式被分布式計算系統廣泛采納以達到這一目标,首先将資料寫入記憶體緩沖區,當記憶體緩沖區填滿後對資料進行排序,排序後的資料被寫出到一個檔案中,這樣總的檔案數量是:(總資料量 / 記憶體緩沖區大小),進而檔案數量被減少。當所有資料寫出完成後,将産生的檔案合并成一個檔案,進而進一步減少檔案數量并增大每個資料分區的大小(有利于順序讀取)。

相比于其他系統的實作,Flink 的實作有一個重要的不同,即 Flink 始終向同一個檔案中不斷追加資料,而不會寫多個檔案再進行合并,這樣的好處始終隻有一個檔案,檔案數量實作了最小化。

1.2 打開更少的檔案

同時打開的檔案過多會消耗更多的資源,同時容易導緻檔案句柄不夠用的問題,導緻穩定性變差。是以,打開更少的檔案有利于提升系統的穩定性。對于資料寫出,如上所述,通過始終向同一個檔案中追加資料,每個并發任務始終隻打開一個檔案。對于資料讀取,雖然每個檔案都需要被大量下遊的并發任務讀取,Flink 依然通過隻打開檔案一次,并在這些并發讀取任務間共享檔案句柄實作了每個檔案隻打開一次的目标。

1.3 最大化順序讀寫

檔案的順序讀寫對檔案的 IO 性能至關重要。通過減少 shuffle 檔案數量,我們已經在一定程度上減少了随機檔案 IO。除此之外,Flink 的批資料 sort-shuffle 還實作了更多 IO 優化來最大化檔案的順序讀寫。在資料寫階段,通過将要寫出的資料緩沖區聚合成更大的批并通過 wtitev 系統調用寫出進而實作了更好的順序寫。在資料讀取階段,通過引入讀取 IO 排程,總是按照檔案的偏移順序服務資料讀取請求進而最大限度的實作的檔案的順序讀。實驗表明這些優化極大的提升了批資料 shuffle 的性能。

1.4 減少讀寫 IO 放大

傳統的 sort-spill-merge 方式通過将生成的多個檔案合并成一個更大的檔案從增大讀取資料塊的大小。這種實作方案雖然帶來了好處,但也有一些不足,最終要的一點便是讀寫 IO 放大,對于計算節點間的資料 shuffle 而言,在不發生錯誤的情況下,本身隻需要寫入和讀取資料一次,但是資料合并使得相同的資料被讀寫多次,進而導緻 IO 總量變多,并且存儲空間的消耗也會變大。

Flink 的實作通過不斷向同一個檔案中追加資料以及獨特的存儲結構規避了檔案和并的過程,雖然單個資料塊的大小小于和并後的大小,但由于規避了檔案合并的開銷再結合 Flink 獨有的 IO 排程,最終可以實作比 sort-spill-merge 方案更高的性能。

1.5 減少記憶體緩沖區消耗

類似于其他分布式計算系統中 sort-shuffle 的實作,Flink 利用一塊固定大小的記憶體緩沖區進行資料的緩存與排序。這塊記憶體緩沖區的大小是與并發無關的,進而使得上遊 shuffle 資料寫所需要的記憶體緩沖區大小與并發解耦。結合另一個記憶體管理方面的優化 FLINK-16428 可以同時實作下遊 shuffle 資料讀取的記憶體緩沖區消耗并發無關化,進而可以減少大規模批作業的記憶體緩沖區消耗。(注:FLINK-16428 同時适用于批作業與流作業)

2. 實作細節

2.1 記憶體資料排序

在 shuffle 資料的 sort-spill 階段,每條資料被首先序列化并寫入到排序緩沖區中,當緩沖區被填滿後,會對緩沖區中的所有二進制資料按照資料分區的順序進行排序。此後,排好序的資料會按照資料分區的順序被寫出到檔案中。雖然,目前并沒有對資料本身進行排序,但是排序緩沖區的接口足夠的泛化,可以實作後續潛在的更為複雜的排序要求。排序緩沖區的接口定義如下:

public interface SortBuffer {
 
   */** Appends data of the specified channel to this SortBuffer. \*/*
   boolean append(ByteBuffer source, int targetChannel, Buffer.DataType dataType) throws IOException;
 
   */** Copies data in this SortBuffer to the target MemorySegment. \*/*
   BufferWithChannel copyIntoSegment(MemorySegment target);
 
   long numRecords();
 
   long numBytes();
 
   boolean hasRemaining();
 
   void finish();
 
   boolean isFinished();
 
   void release();
 
   boolean isReleased();
 }           

在排序算法上,我們選擇了複雜度較低的 bucket-sort。具體而言,每條序列化後的資料前面都會被插入一個 16 位元組的中繼資料。包括 4 位元組的長度、4 位元組的資料類型以及 8 位元組的指向同一資料分區中下一條資料的指針。結構如下圖所示:

Flink Sort-Shuffle 實作簡介

當從緩沖區中讀取資料時,隻需要按照每個資料分區的鍊式索引結構就可以讀取到屬于這個資料分區的所有資料,并且這些資料保持了資料寫入時的順序。這樣按照資料分區的順序讀取所有的資料就可以達到按照資料分區排序的目标。

2.2 檔案存儲結構

如前所述,每個并行任務産生的 shuffle 資料會被寫到一個實體檔案中。每個實體檔案包含多個資料區塊(data region),每個資料區塊由資料緩沖區的一次 sort-spill 生成。在每個資料區塊中,所有屬于不同資料分區(data partition,由下遊計算節點不同并行任務消費)的資料按照資料分區的序号順序進行排序聚合。下圖展示了 shuffle 資料檔案的詳細結構。其中(R1,R2,R3)是 3 個不同的資料區塊,分别對應 3 次資料的 sort-spill 寫出。每個資料塊中有 3 個不同的資料分區,分别将由(C1,C2,C3)3 個不同的并行消費任務進行讀取。也就是說資料 B1.1,B2.1 及 B3.1 将由 C1 處理,資料 B1.2,B2.2 及 B3.2 将由 C2 處理,而資料 B1.3,B2.3 及 B3.3 将由 C3 處理。

Flink Sort-Shuffle 實作簡介

類似于其他的分布式處理系統實作,在 Flink 中,每個資料檔案還對應一個索引檔案。索引檔案用來在讀取時為每個消費者索引屬于它的資料(data partition)。索引檔案包含和資料檔案相同的 data region,在每個 data region 中有與 data partition 相同數量的索引項,每個索引項包含兩個部分,分别對應到資料檔案的偏移量以及資料的長度。作為一個優化。Flink 為每個索引檔案緩存最多 4M 的索引資料。資料檔案與索引檔案的對應關系如下:

Flink Sort-Shuffle 實作簡介

2.3 讀取 IO 排程

為了進一步提高檔案 IO 性能,基于上面的存儲結構,Flink 進一步引入了 IO 排程機制,類似于磁盤排程的電梯算法,Flink 的 IO 排程總是按照 IO 請求的檔案偏移順序進行排程。更具體來說,如果資料檔案有 n 個 data region,每個 data region 有 m 個 data partition,同時有 m 個下遊計算任務讀取這一資料檔案,那麼下面的僞代碼展示了 Flink 的 IO 排程算法的工作流程:

*// let data_regions as the data region list indexed from 0 to n - 1*
 *// let data_readers as the concurrent downstream data readers queue indexed from 0 to m - 1*
 for (data_region in data_regions) {
   data_reader = poll_reader_of_the_smallest_file_offset(data_readers);
   if (data_reader == null)
     break;
   reading_buffers = request_reading_buffers();
   if (reading_buffers.isEmpty())
     break;
   read_data(data_region, data_reader, reading_buffers);
 }              

2.4 資料廣播優化

資料廣播是指發送相同的資料給下遊計算節點的所有并行任務,一個常見的應用場景是 broadcast-join。Flink 的 sort-shuffle 實作對這一過程進行了優化,使得在包括記憶體排序緩沖區和 shuffle 檔案中,廣播資料隻儲存一份,這可以大大提升資料廣播的性能。更具體來說,當寫入一條廣播資料到排序緩沖區時,這條資料隻會被序列化并且拷貝一次,同樣在将資料寫出到 shuffle 檔案時,也隻會寫一份資料。在索引檔案中,對于不同 data partition 的資料索引項,他們均指向資料檔案中的同一塊資料。下圖展示了資料廣播優化的所有細節:

Flink Sort-Shuffle 實作簡介

2.5 資料壓縮

資料壓縮是一個簡單而有效的優化手段,測試結果顯示資料壓縮可以提高 TPC-DS 總體性能超過 30%。類似于 Flink 的基于 hash 的批處理 shuffle 實作,資料壓縮是以網絡緩沖區(network buffer)為機關進行的,資料壓縮不跨 data partition,也就是說發給不同下遊并行任務的資料分開壓縮,壓縮發生在資料排序後寫出前,下遊消費任務在收到資料後進行解壓。下圖展示了資料壓縮的整個流程:

Flink Sort-Shuffle 實作簡介

四、測試結果

1. 穩定性

新的 sort-shuffle 的實作極大的提高 Flink 運作批處理作業的穩定性。除了解決了潛在的檔案句柄以及 inode 耗盡的不穩定問題外,還解決了一些 Flink 原有 hash-shuffle 存在的已知問題,如 FLINK-21201(建立過多檔案導緻主線程阻塞),FLINK-19925(在網絡 netty 線程中執行 IO 操作導緻網絡穩定性受到影響)等。

2. 性能

我們在 1000 規模的并發下運作了 TPC-DS 10T 資料規模的測試,結果表明,相比于 Flink 原本的批資料 shuffle 實作,新的資料 shuffle 實作可以實作 2-6 倍的性能提升,如果排除計算時間,隻統計資料 shuffle 時間可以是先最高 10 倍的性能提升。下表展示了性能提升的詳細資料:

Jobs Time Used for Sort-Shuffle (s) Time Used for Hash-Shuffle (s) Speed up Factor
q4.sql 986 5371 5.45
q11.sql 348 798 2.29
q14b.sql 883 2129 2.51
q17.sql 269 781 2.90
q23a.sql 418 1199 2.87
q23b.sql 376 843 2.24
q25.sql 413 873 2.11
q29.sql 354 1038 2.93
q31.sql 223 498 2.23
q50.sql 215 550 2.56
q64.sql 217 442 2.04
q74.sql 270 962 3.56
q75.sql 166 713 4.30
q93.sql 204 540 2.65

在我們的測試叢集上,每塊機械硬碟的資料讀取以及寫入帶寬可以達到 160MB/s:

Disk Name SDI SDJ SDK
Writing Speed (MB/s) 189 173 186
Reading Speed (MB/s) 112 154 158

注:我們的測試環境配置如下,由于我們有較大的記憶體,是以一些 shuffle 資料量小的作業實際資料 shuffle 僅為讀寫記憶體,是以上面的表格僅列出了一些 shuffle 資料量大,性能提升明顯的查詢:

Number of Nodes Memory Size Per Node Cores Per Node Disks Per Node
12 About 400G 96 3

五、調優參數

在 Flink 中,sort-shuffle 預設是不開啟的,想要開啟需要調小這個參數的配置:

taskmanager.network.sort-shuffle.min-parallelism

。這個參數的含義是如果資料分區的個數(一個計算任務并發需要發送資料給幾個下遊計算節點)低于這個值,則走 hash-shuffle 的實作,如果高于這個值則啟用 sort-shuffle。實際應用時,在機械硬碟上,可以配置為 1,即使用 sort-shuffle。

Flink 沒有預設開啟資料壓縮,對于批處理作業,大部分場景下是建議開啟的,除非資料壓縮率低。開啟的參數為

taskmanager.network.blocking-shuffle.compression.enabled

對于 shuffle 資料寫和資料讀,都需要占用記憶體緩沖區。其中,資料寫緩沖區的大小由

taskmanager.network.sort-shuffle.min-buffers

控制,資料讀緩沖區由

taskmanager.memory.framework.off-heap.batch-shuffle.size

控制。資料寫緩沖區從網絡記憶體中切分出來,如果要增大資料寫緩沖區可能還需要增大網絡記憶體總大小,以避免出現網絡記憶體不足的錯誤。資料讀緩沖區從架構的 off-heap 記憶體中切分出來,如果要增大資料讀緩沖區,可能還需要增大架構的 off-heap 記憶體,以避免出現 direct 記憶體 OOM 錯誤。一般而言更大的記憶體緩沖區可以帶來更好的性能,對于大規模批作業,幾百兆的資料寫緩沖區與讀緩沖區是足夠的。

六、未來展望

還有一些後續的優化工作,包括但不限于:

1)網絡連接配接複用,這可以提高網絡的建立的性能與穩定性,相關 Jira 包括 FLINK-22643 以及 FLINK-15455;

2)多磁盤負載均衡,這有利于解決負載不均的問題,相關 Jira 包括 FLINK-21790 以及 FLINK-21789;

3)實作遠端資料 shuffle 服務,這有利于進一步提升批資料 shuffle 的性能與穩定性;

4)允許使用者選擇磁盤類型,這可以提高易用性,使用者可以根據作業的優先級選擇使用 HDD 或者 SSD。

英文原文連結:

https://flink.apache.org/2021/10/26/sort-shuffle-part1.html https://flink.apache.org/2021/10/26/sort-shuffle-part2.html

12 月 4-5 日,Flink Forward Asia 2021 重磅開啟,全球 40+ 多行業一線廠商,80+ 幹貨議題,帶來專屬于開發者的技術盛宴。

https://flink-forward.org.cn/

另有首屆 Flink Forward Asia Hackathon 正式啟動,10W 獎金等你來!

https://www.aliyun.com/page-source//tianchi/promotion/FlinkForwardAsiaHackathon
Flink Sort-Shuffle 實作簡介

更多 Flink 相關技術問題,可掃碼加入社群釘釘交流群

第一時間擷取最新技術文章和社群動态,請關注公衆号~

Flink Sort-Shuffle 實作簡介

活動推薦

阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:

99 元試用

實時計算Flink版

(包年包月、10CU)即有機會獲得 Flink 獨家定制衛衣;另包 3 個月及以上還有 85 折優惠!

了解活動詳情:

https://www.aliyun.com/product/bigdata/sc
Flink Sort-Shuffle 實作簡介