如今,大資料領域的開源架構(hadoop,spark,storm)都使用的 jvm,當然也包括 flink。基于 jvm 的資料分析引擎都需要面對将大量資料存到記憶體中,這就不得不面對 jvm 存在的幾個問題:
java 對象存儲密度低。一個隻包含 boolean 屬性的對象占用了16個位元組記憶體:對象頭占了8個,boolean 屬性占了1個,對齊填充占了7個。而實際上隻需要一個bit(1/8位元組)就夠了。
full gc 會極大地影響性能,尤其是為了處理更大資料而開了很大記憶體空間的jvm來說,gc 會達到秒級甚至分鐘級。
oom 問題影響穩定性。outofmemoryerror是分布式計算架構經常會遇到的問題,當jvm中所有對象大小超過配置設定給jvm的記憶體大小時,就會發生outofmemoryerror錯誤,導緻jvm崩潰,分布式架構的健壯性和性能都會受到影響。
是以目前,越來越多的大資料項目開始自己管理jvm記憶體了,像 spark、flink、hbase,為的就是獲得像 c 一樣的性能以及避免 oom 的發生。本文将會讨論 flink 是如何解決上面的問題的,主要内容包括記憶體管理、定制的序列化工具、緩存友好的資料結構和算法、堆外記憶體、jit編譯優化等。
flink 并不是将大量對象存在堆上,而是将對象都序列化到一個預配置設定的記憶體塊上,這個記憶體塊叫做 <code>memorysegment</code>,它代表了一段固定長度的記憶體(預設大小為 32kb),也是 flink 中最小的記憶體配置設定單元,并且提供了非常高效的讀寫方法。你可以把 memorysegment 想象成是為 flink 定制的 <code>java.nio.bytebuffer</code>。它的底層可以是一個普通的 java 位元組數組(<code>byte[]</code>),也可以是一個申請在堆外的 <code>bytebuffer</code>。每條記錄都會以序列化的形式存儲在一個或多個<code>memorysegment</code>中。
flink 中的 worker 名叫 taskmanager,是用來運作使用者代碼的 jvm 程序。taskmanager 的堆記憶體主要被分成了三個部分:
memory manager pool: 這是一個由 <code>memorymanager</code> 管理的,由衆多<code>memorysegment</code>組成的超大集合。flink 中的算法(如 sort/shuffle/join)會向這個記憶體池申請 memorysegment,将序列化後的資料存于其中,使用完後釋放回記憶體池。預設情況下,池子占了堆記憶體的 70% 的大小。
remaining (free) heap: 這部分的記憶體是留給使用者代碼以及 taskmanager 的資料結構使用的。因為這些資料結構一般都很小,是以基本上這些記憶體都是給使用者代碼使用的。從gc的角度來看,可以把這裡看成的新生代,也就是說這裡主要都是由使用者代碼生成的短期對象。
注意:memory manager pool 主要在batch模式下使用。在steaming模式下,該池子不會預配置設定記憶體,也不會向該池子請求記憶體塊。也就是說該部分的記憶體都是可以給使用者代碼使用的。不過社群是打算在 streaming 模式下也能将該池子利用起來。
flink 采用類似 dbms 的 sort 和 join 算法,直接操作二進制資料,進而使序列化/反序列化帶來的開銷達到最小。是以 flink 的内部實作更像 c/c++ 而非 java。如果需要處理的資料超出了記憶體限制,則會将部分資料存儲到硬碟上。如果要操作多塊memorysegment就像操作一塊大的連續記憶體一樣,flink會使用邏輯視圖(<code>abstractpagedinputview</code>)來友善操作。下圖描述了 flink 如何存儲序列化後的資料到記憶體塊中,以及在需要的時候如何将資料存儲到磁盤上。
從上面我們能夠得出 flink 積極的記憶體管理以及直接操作二進制資料有以下幾點好處:
減少gc壓力。顯而易見,因為所有常駐型資料都以二進制的形式存在 flink 的<code>memorymanager</code>中,這些<code>memorysegment</code>一直呆在老年代而不會被gc回收。其他的資料對象基本上是由使用者代碼生成的短生命周期對象,這部分對象可以被 minor gc 快速回收。隻要使用者不去建立大量類似緩存的常駐型對象,那麼老年代的大小是不會變的,major gc也就永遠不會發生。進而有效地降低了垃圾回收的壓力。另外,這裡的記憶體塊還可以是堆外記憶體,這可以使得 jvm 記憶體更小,進而加速垃圾回收。
避免了oom。所有的運作時資料結構和算法隻能通過記憶體池申請記憶體,保證了其使用的記憶體大小是固定的,不會因為運作時資料結構和算法而發生oom。在記憶體吃緊的情況下,算法(sort/join等)會高效地将一大批記憶體塊寫到磁盤,之後再讀回來。是以,<code>outofmemoryerrors</code>可以有效地被避免。
節省記憶體空間。java 對象在存儲上有很多額外的消耗(如上一節所談)。如果隻存儲實際資料的二進制内容,就可以避免這部分消耗。
高效的二進制操作 & 緩存友好的計算。二進制資料以定義好的格式存儲,可以高效地比較與操作。另外,該二進制形式可以把相關的值,以及hash值,鍵值和指針等相鄰地放進記憶體中。這使得資料結構可以對高速緩存更友好,可以從 l1/l2/l3 緩存獲得性能的提升(下文會詳細解釋)。
目前 java 生态圈提供了衆多的序列化架構:java serialization, kryo, apache avro 等等。但是 flink 實作了自己的序列化架構。因為在 flink 中處理的資料流通常是同一類型,由于資料集對象的類型固定,對于資料集可以隻儲存一份對象schema資訊,節省大量的存儲空間。同時,對于固定大小的類型,也可通過固定的偏移位置存取。當我們需要通路某個對象成員變量的時候,通過定制的序列化工具,并不需要反序列化整個java對象,而是可以直接通過偏移量,隻是反序列化特定的對象成員變量。如果對象的成員變量較多時,能夠大大減少java對象的建立開銷,以及記憶體資料的拷貝大小。
flink支援任意的java或是scala類型。flink 在資料類型上有很大的進步,不需要實作一個特定的接口(像hadoop中的<code>org.apache.hadoop.io.writable</code>),flink 能夠自動識别資料類型。flink 通過 java reflection 架構分析基于 java 的 flink 程式 udf (user define function)的傳回類型的類型資訊,通過 scala compiler 分析基于 scala 的 flink 程式 udf 的傳回類型的類型資訊。類型資訊由 <code>typeinformation</code> 類表示,typeinformation 支援以下幾種類型:
<code>basictypeinfo</code>: 任意java 基本類型(裝箱的)或 string 類型。
<code>basicarraytypeinfo</code>: 任意java基本類型數組(裝箱的)或 string 數組。
<code>writabletypeinfo</code>: 任意 hadoop writable 接口的實作類。
<code>tupletypeinfo</code>: 任意的 flink tuple 類型(支援tuple1 to tuple25)。flink tuples 是固定長度固定類型的java tuple實作。
<code>caseclasstypeinfo</code>: 任意的 scala caseclass(包括 scala tuples)。
<code>pojotypeinfo</code>: 任意的 pojo (java or scala),例如,java對象的所有成員變量,要麼是 public 修飾符定義,要麼有 getter/setter 方法。
<code>generictypeinfo</code>: 任意無法比對之前幾種類型的類。
前六種資料類型基本上可以滿足絕大部分的flink程式,針對前六種類型資料集,flink皆可以自動生成對應的typeserializer,能非常高效地對資料集進行序列化和反序列化。對于最後一種資料類型,flink會使用kryo進行序列化和反序列化。每個typeinformation中,都包含了serializer,類型會自動通過serializer進行序列化,然後用java unsafe接口寫入memorysegments。對于可以用作key的資料類型,flink還同時自動生成typecomparator,用來輔助直接對序列化後的二進制資料進行compare、hash等操作。對于 tuple、caseclass、pojo 等組合類型,其typeserializer和typecomparator也是組合的,序列化和比較時會委托給對應的serializers和comparators。如下圖展示 一個内嵌型的tuple3 對象的序列化過程。
可以看出這種序列化方式存儲密度是相當緊湊的。其中 int 占4位元組,double 占8位元組,pojo多個一個位元組的header,pojoserializer隻負責将header序列化進去,并委托每個字段對應的serializer對字段進行序列化。
flink 的類型系統可以很輕松地擴充出自定義的typeinformation、serializer以及comparator,來提升資料類型在序列化和比較時的性能。
flink 提供了如 group、sort、join 等操作,這些操作都需要通路海量資料。這裡,我們以sort為例,這是一個在 flink 中使用非常頻繁的操作。
首先,flink 會從 memorymanager 中申請一批 memorysegment,我們把這批 memorysegment 稱作 sort buffer,用來存放排序的資料。
我們會把 sort buffer 分成兩塊區域。一個區域是用來存放所有對象完整的二進制資料。另一個區域用來存放指向完整二進制資料的指針以及定長的序列化後的key(key+pointer)。如果需要序列化的key是個變長類型,如string,則會取其字首序列化。如上圖所示,當一個對象要加到 sort buffer 中時,它的二進制資料會被加到第一個區域,指針(可能還有key)會被加到第二個區域。
将實際的資料和指針加定長key分開存放有兩個目的。第一,交換定長塊(key+pointer)更高效,不用交換真實的資料也不用移動其他key和pointer。第二,這樣做是緩存友好的,因為key都是連續存儲在記憶體中的,可以大大減少 cache miss(後面會詳細解釋)。
排序的關鍵是比大小和交換。flink 中,會先用 key 比大小,這樣就可以直接用二進制的key比較而不需要反序列化出整個對象。因為key是定長的,是以如果key相同(或者沒有提供二進制key),那就必須将真實的二進制資料反序列化出來,然後再做比較。之後,隻需要交換key+pointer就可以達到排序的效果,真實的資料不用移動。
随着磁盤io和網絡io越來越快,cpu逐漸成為了大資料領域的瓶頸。從 l1/l2/l3 緩存讀取資料的速度比從主記憶體讀取資料的速度快好幾個量級。通過性能分析可以發現,cpu時間中的很大一部分都是浪費在等待資料從主記憶體過來上。如果這些資料可以從 l1/l2/l3 緩存過來,那麼這些等待時間可以極大地降低,并且所有的算法會是以而受益。
在上面讨論中我們談到的,flink 通過定制的序列化架構将算法中需要操作的資料(如sort中的key)連續存儲,而完整資料存儲在其他地方。因為對于完整的資料來說,key+pointer更容易裝進緩存,這大大提高了緩存命中率,進而提高了基礎算法的效率。這對于上層應用是完全透明的,可以充分享受緩存友好帶來的性能提升。
flink 基于堆記憶體的記憶體管理機制已經可以解決很多jvm現存問題了,為什麼還要引入堆外記憶體?
啟動超大記憶體(上百gb)的jvm需要很長時間,gc停留時間也會很長(分鐘級)。使用堆外記憶體的話,可以極大地減小堆記憶體(隻需要配置設定remaining heap那一塊),使得 taskmanager 擴充到上百gb記憶體不是問題。
高效的 io 操作。堆外記憶體在寫磁盤或網絡傳輸時是 zero-copy,而堆記憶體的話,至少需要 copy 一次。
堆外記憶體是程序間共享的。也就是說,即使jvm程序崩潰也不會丢失資料。這可以用來做故障恢複(flink暫時沒有利用起這個,不過未來很可能會去做)。
但是強大的東西總是會有其負面的一面,不然為何大家不都用堆外記憶體呢。
堆記憶體的使用、監控、調試都要簡單很多。堆外記憶體意味着更複雜更麻煩。
flink 有時需要配置設定短生命周期的 <code>memorysegment</code>,這個申請在堆上會更廉價。
有些操作在堆記憶體上會快一點點。
flink用通過<code>bytebuffer.allocatedirect(numbytes)</code>來申請堆外記憶體,用 <code>sun.misc.unsafe</code> 來操作堆外記憶體。
基于 flink 優秀的設計,實作堆外記憶體是很友善的。flink 将原來的 <code>memorysegment</code> 變成了抽象類,并生成了兩個子類。<code>heapmemorysegment</code> 和 <code>hybridmemorysegment</code>。從字面意思上也很容易了解,前者是用來配置設定堆記憶體的,後者是用來配置設定堆外記憶體和堆記憶體的。是的,你沒有看錯,後者既可以配置設定堆外記憶體又可以配置設定堆記憶體。為什麼要這樣設計呢?
首先假設<code>hybridmemorysegment</code>隻提供配置設定堆外記憶體。在上述堆外記憶體的不足中的第二點談到,flink 有時需要配置設定短生命周期的 buffer,這些buffer用<code>heapmemorysegment</code>會更高效。那麼當使用堆外記憶體時,為了也滿足堆記憶體的需求,我們需要同時加載兩個子類。這就涉及到了 jit 編譯優化的問題。因為以前 <code>memorysegment</code> 是一個單獨的 final 類,沒有子類。jit 編譯時,所有要調用的方法都是确定的,所有的方法調用都可以被去虛化(de-virtualized)和内聯(inlined),這可以極大地提高性能(memroysegment的使用相當頻繁)。然而如果同時加載兩個子類,那麼 jit 編譯器就隻能在真正運作到的時候才知道是哪個子類,這樣就無法提前做優化。實際測試的性能差距在 2.7 被左右。
flink 使用了兩種方案:
方案1:隻能有一種 memorysegment 實作被加載
代碼中所有的短生命周期和長生命周期的memorysegment都執行個體化其中一個子類,另一個子類根本沒有執行個體化過(使用工廠模式來控制)。那麼運作一段時間後,jit 會意識到所有調用的方法都是确定的,然後會做優化。
方案2:提供一種實作能同時處理堆記憶體和堆外記憶體
這就是 <code>hybridmemorysegment</code> 了,能同時處理堆與堆外記憶體,這樣就不需要子類了。這裡 flink 優雅地實作了一份代碼能同時操作堆和堆外記憶體。這主要歸功于 <code>sun.misc.unsafe</code>提供的一系列方法,如getlong方法:
如果reference不為空,則會取該對象的位址,加上後面的offset,從相對位址處取出8位元組并得到 long。這對應了堆記憶體的場景。
如果reference為空,則offset就是要操作的絕對位址,從該位址處取出資料。這對應了堆外記憶體的場景。
這裡我們看下 <code>memorysegment</code> 及其子類的實作。
可以發現,hybridmemorysegment 中的很多方法其實都下沉到了父類去實作。包括堆内堆外記憶體的初始化。<code>memorysegment</code> 中的 <code>getxxx</code>/<code>putxxx</code> 方法都是調用了 unsafe 方法,可以說<code>memorysegment</code>已經具有了些 hybrid 的意思了。<code>heapmemorysegment</code>隻調用了父類的<code>memorysegment(byte[] buffer, object owner)</code>方法,也就隻能申請堆記憶體。另外,閱讀代碼你會發現,許多方法(大量的 getxxx/putxxx)都被标記成了 final,兩個子類也是 final 類型,為的也是優化 jit 編譯器,會提醒 jit 這個方法是可以被去虛化和内聯的。
對于堆外記憶體,使用 <code>hybridmemorysegment</code> 能同時用來代表堆和堆外記憶體。這樣隻需要一個類就能代表長生命周期的堆外記憶體和短生命周期的堆記憶體。既然<code>hybridmemorysegment</code>已經這麼全能,為什麼還要方案1呢?因為我們需要工廠模式來保證隻有一個子類被加載(為了更高的性能),而且heapmemorysegment比heap模式的hybridmemorysegment要快。
segment
time
heapmemorysegment, exclusive
1,441 msecs
heapmemorysegment, mixed
3,841 msecs
hybridmemorysegment, heap, exclusive
1,626 msecs
hybridmemorysegment, off-heap, exclusive
1,628 msecs
hybridmemorysegment, heap, mixed
3,848 msecs
hybridmemorysegment, off-heap, mixed
3,847 msecs
本文主要總結了 flink 面對 jvm 存在的問題,而在記憶體管理的道路上越走越深。從自己管理記憶體,到序列化架構,再到堆外記憶體。其實縱觀大資料生态圈,其實會發現各個開源項目都有同樣的趨勢。比如最近炒的很火熱的 spark tungsten 項目,與 flink 在記憶體管理上的思想是及其相似的。
[off-heap memory in apache flink and the curious jit compiler
[juggling with bits and bytes
[peeking into apache flink's engine room
<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageid=53741525">flink: memory management</a>
[big data performance engineering
<a href="http://mishadoff.com/blog/java-magic-part-4-sun-dot-misc-dot-unsafe/">sun.misc.misc.unsafe usage for c style memory management</a>
<a href="http://howtodoinjava.com/core-java/related-concepts/usage-of-class-sun-misc-unsafe/">sun.misc.misc.unsafe usage for c style memory management - how to do it.</a>
<a href="http://www.javamex.com/tutorials/memory/object_memory_usage.shtml">memory usage of java objects: general guide</a>
<a href="http://www.36dsj.com/archives/33650">脫離jvm? hadoop生态圈的掙紮與演化</a>