天天看點

Flink記憶體管理源碼解讀之基礎資料結構 在分布式實時計算領域,如何讓架構/引擎足夠高效地在記憶體中存取、處理海量資料是一個非常棘手的問題。在應對這一問題上Flink無疑是做得非常傑出的,Flink的自主記憶體管理設計也許比它自身的知名度更高一些。正好最近在研讀Flink的源碼,是以開兩篇文章來談談Flink的記憶體管理設計。 引用

Flink的記憶體管理的亮點展現在作為以Java為主的(部分功能用Scala實作,也是一種遵循JVM規範并依賴JVM解釋執行的函數式程式設計語言)的程式卻自主實作記憶體的管理而不完全依賴于JVM的記憶體管理機制。它的優勢在于靈活、為大資料場景而生、避免(不受控的)頻繁GC導緻的性能波動,某種程度上跳出了JVM的限制,是一種思路上的開拓。

基本上我們将Flink的記憶體設計分為兩個部分(遵循package的劃分方式):

基礎資料結構(package:org.apache.flink.core.memory)

記憶體管理機制(package:org.apache.flink.runtime.memory)

我們将分開來進行講解,本篇主要關注基本資料結構。記憶體管理機制請等待後續文章分析。

下圖是該package中所有類的關系圖:

其中:<code>MemorySegment</code>,<code>HeapMemorySegment</code>,<code>HybridMemorySegment</code>是最為關鍵的三個類,我們将重點分析。

Flink将其管理的記憶體抽象為兩種類型(主要的抽象依據記憶體的位置):

HEAP:JVM堆記憶體

OFF_HEAP:非堆記憶體

這在Flink中被定義為一個枚舉類型:<code>MemoryType</code>。

Flink所管理的記憶體被抽象為資料結構:<code>MemorySegment</code>。

據此,Flink為它提供了兩種實作:

HeapMemorySegment : 管理的記憶體還是JVM堆記憶體的一部分

HybridMemorySegment : Hybrid(on-heap or off-heap)MemorySegment,記憶體可能為JVM堆記憶體,也可能不是。

MemorySegment的相關字段:

UNSAFE : 用來對堆/非堆記憶體進行操作,是JVM的非安全的API

BYTE_ARRAY_BASE_OFFSET : 二進制位元組數組的起始索引,相對于位元組數組對象

LITTLE_ENDIAN : 布爾值,是否為小端對齊(涉及到位元組序的問題)

heapMemory : 如果為堆記憶體,則指向通路的記憶體的引用,否則若記憶體為非堆記憶體,則為null

address : 位元組數組對應的相對位址(若heapMemory為null,即可能為off-heap記憶體的絕對位址,後續會詳解)

addressLimit : 辨別位址結束位置(address+size)

size : 記憶體段的位元組數

其中,LITTLE_ENDIAN擷取的是目前作業系統的位元組順序,它是布爾值,後續的很多put/get操作都需要先判斷是bigedian(大端)還是littleedian(小端)。

關于位元組序的問題,如果不明白請自行Google

進入代碼主題,針對on-heap記憶體和off-heap記憶體提供了兩個構造器:

并且,提供了一大堆get/put方法,這些getXXX/putXXX大都直接或者間接調用了unsafe.getXXX/unsafe.putXXX。這些處理不同記憶體類型公共的方法在<code>MemorySegment</code>中實作。

當然不止這麼多,這隻是部分。

而特定的記憶體通路實作在兩個各自類中。

在MemorySegment類中還有三個值得關注的方法:

這是一個批量拷貝方法,用于從目前memory segment的offset偏移量開始拷貝numBytes長度的位元組到target memory segment中從targetOffset起始的地方。

自實作的比較方法,用于對目前memory segment偏移offset1長度為len的資料與seg2偏移起始位offset2長度為len的資料進行比較。

這裡有兩個while循環:

第一個while是逐位元組比較,如果len的長度大于8就從各自的起始偏移量開始擷取其資料的長整形表示進行對比,如果相等則各自後移8位(一個位元組),并且長度減8,以此循環往複。

第二個循環比較的是最後剩餘不到一個位元組(八個比特位),是以是按位比較

這個方法用于對兩個memory segment中的一段資料進行交換。除了一些邊界值判斷,就是一個借助于臨時變量的資料交換,隻不過用<code>unsafe.copyMemory</code>代替了指派号而已。

下面我們将探讨Flink提供的對兩種類型的記憶體管理:on-heap 以及 off-heap。

基于JVM堆記憶體(on-heap)實作的memory segment,這也是Flink最早的記憶體自管理機制。該類内部定義一個位元組數組的引用指向該記憶體段,之前提到<code>MemorySegment</code>裡的那些抽象方法在該類中的實作都基于該内部位元組數組的引用進行操作的,以此來獲得内建的而非額外的自實作檢查(這些檢查比如數組越界等)。這是什麼意思呢?當你定義

該memory指向<code>MemorySegment</code>中的heapMemory時,實作類似如下這種方法時

你就可以利用JVM自身的機制來判斷index是否在0到length - 1之間。而不用去結合address等屬性來判斷索引範圍了,比如上面這個方法在<code>HybridMemorySegment</code>裡是這麼實作的

這個實作必須這麼自行check邊界值。

因為是JVM的堆記憶體,是以很多方法的調用可以直接利用JDK自帶的方法,比如數組拷貝

其他方法的實作都很正常,沒有太多值得提點的地方。

這是另一種記憶體管理實作:它既支援on-heap記憶體也支援off-heap記憶體。乍一看,似乎有些匪夷所思,因為已經有一個對on-heap的實作了,為什麼還要搞一個Hybrid的,而不是off-heap的? 而且在一個類中對兩種不同的記憶體區域進行操作,也會顯得混亂。

那麼我們先來看看Flink是如何“優雅”地避免混亂的。這一切還要歸功于JVM提供的非安全的操作類(unsafe)提供的一系列方法

這些方法有如下特點:

(1)如果對象o不為null,并且後面的位址或者位置是相對位置,那麼會直接對目前對象(比如數組)的相對位置進行操作,既然這裡對象不為null,那麼這種情況自然滿足on-heap的場景;

(2)如果對象o為null,并且後面的位址是某個記憶體塊的絕對位址,那麼這些方法的調用也相當于對該記憶體塊進行操作。這裡對象o為null,所操作的記憶體塊不是JVM堆記憶體,這種情況滿足了off-heap的場景。

還記得我們在介紹<code>MemorySegment</code>類時,提到的兩個屬性:

heapMemory

address

這兩個屬性組合就可以适配上面的兩種場景了。而且,<code>MemorySegment</code>的一個構造參數:offHeapAddress ,已經基本指明了該構造器是專門針對off-heap的了。

<code>MemorySegment</code>給出了一些針對特定資料類型的公共實作,大部分也調用了unsafe的具有如上這種特性的方法,是以其實<code>MemorySegment</code>裡已經具有 Hybrid 的意思了。

問題來了,那麼Flink是如何獲得某個off-heap資料的記憶體位址呢?答案在如下代碼段

通過反射Buffer類獲得 address 屬性的Field表示,然後

拿到一個buffer的off-heap的位址表示。

雖然通過如上的<code>MemorySegment</code>的兩個屬性再加上unsafe相關方法的特殊性,HybridMemorySegment的實作已經很清晰,簡潔。但它内部還維護了一個指向它管理的off-heap資料的引用:offHeapBuffer。一方面是為了hold住那段記憶體空間不被釋放,另一方面是為了實作自身的一些方法。

<code>MemorySegmentFactory</code>是用來建立<code>MemorySegment</code>,而且Flink嚴重推薦使用它來建立<code>MemorySegment</code>的執行個體,而不是手動執行個體化。其目的是:為了讓運作時隻存在某一種MemorySegment的子類實作的執行個體,而不是<code>MemorySegment</code>的兩個子類的執行個體都同時存在,因為這會讓JIT有加載和選擇上的開銷,導緻大幅降低性能。關于這一點,Flink官方部落格專門開了一篇博文來解釋他們的對比以及測試方案,請見最後的引用。

如下圖:

顯而易見,這是設計模式中的工廠方法模式。

<code>MemorySegmentFactory</code>有個内部接口類<code>Factory</code>,<code>MemorySegment</code>的兩個實作類的内部類各自實作了該接口,并定義了各自<code>Factory</code>的實作。這塊并沒有特别的,隻是為了防止外部直接執行個體化<code>HybridMemorySegmentFactory</code>和<code>HeapMemorySegmentFactory</code>,它們各自的構造器都被設定為 private。

<code>MemorySegmentFactory</code>類提供了跟<code>Factory</code>接口類似的方法,或者應該說包裹了一層用來指定<code>Factory</code>具體執行個體的邏輯(基本上每個方法都先調用了<code>ensureInitialized</code>方法):

從上面可以看出,<code>MemorySegmentFactory</code>預設使用的是<code>HeapMemorySegment</code>類的執行個體來實作<code>MemorySegment</code>。

除了<code>MemorySegment</code>的相關實作,Flink的Core包還提供了建立在<code>MemorySegment</code>之上的更高的抽象:DataView(資料視圖)。

資料視圖相關的類關系圖:

有兩個接口,分别為輸出視圖<code>DataOutputView</code>(資料寫相關),輸入視圖<code>DataInputView</code>(資料讀相關)。兩個接口下分别各有一個子接口提供基于position的seek動作(即指定位置的資料讀寫操作)。另外分别有兩個實作類,它們各自包裝了對應的Stream接口。這塊也沒什麼特别的,不做過多說明。

以上是對Flink自主管理記憶體的資料結構部分的實作解讀。

原文釋出時間為:2016-03-24

本文作者:vinoYang