天天看點

ElasticSearch Aggregations GroupBy 實作源碼分析

為了友善調試,我對索引做了如下配置

這樣隻有一個分片,友善ide的跟蹤,也算是個看源碼的技巧

資料

假定的查詢如下:

其語義類似這個sql 語句: 

select avg(num) from twitter group by newtype

也就是按newtype 字段進行group by,然後對num求平均值。在我們實際的業務系統中,這種統計需求也是最多的。

在查詢過程中,es是将整個查詢分成幾個階段的,大體如下:

queryphase

rescorephase

suggestphase

aggregationphase

fetchphase

對于全文檢索,可能還有dfsphase。

順帶提一點,spark sql + es 的組合,最影響響應時間的地方其實是fetch original source 。

而對于這些phase,并不是一個鍊路的模式,而是在某個phase調用另外一個phase。這個在源碼中也很明顯,我們看如下一段代碼:

要了解具體是如何實作聚合功能的,則需要了解es 的aggregator相關的概念。大體有五個:

aggregatorfactory (典型的工廠模式)負責建立aggregator執行個體

aggregator (負責提供collector,并且提供具體聚合邏輯的類)

aggregations (聚合結果)

pipelineaggregator (對聚合結果進一步處理)

aggregator 的嵌套,比如 示例中的avgaggregator 就是根據globalordinalsstringtermsaggregator 的以bucket為次元,對相關資料進行操作.這種嵌套結構也是

bucket 其實就是被groupby 字段的數字表示形式。用數字表示,可以節省對應字段列式存儲的空間,并且提高性能。

我們知道,無論檢索亦或是聚合查詢,本質上都需要轉化到lucene裡的collector,以上面的案例為例,其實由兩個collector 完成最後的計算:

totalhitcountcollecotr

globalordinalsstringtermsaggregator(裡面還有個aggregator)

因為我們沒有定義過濾條件,是以最後的query 是個matchallquery,之後基于這個基礎上,這兩個collector 完成對應的計算。通常,這兩個collector 會被wrap成一個新的multicollector ,最終傳入indexsearcher的collector 就是multicollector。

根據上面的分析,我們知道示例中的聚合計算完全由globalordinalsstringtermsaggregator負責。

基于docvalues實作groupby概覽

對于每一個segment,我們都會為每個列單獨存儲成一個檔案,為了壓縮,我們可能會将裡面具體的值轉換成數字,然後再形成一個字典和數字對應關系的檔案。我們進行所謂的groupby操作,以最後進行avg為例子,其實就是維護了兩個大數組,

counts是newtype(我們例子中被groupby的字段)次數統計,對應的數組下标是newtype(我們已經将newtype轉化為數字表示了)。我們周遊文檔的時候(matchallquery),可以擷取doc,然後根據doc到列存檔案擷取對應的newtype,然後給counts 對應的newtype +1。 這樣我們就知道每個newtype 出現的次數了。

這裡我們也可以看到,消耗記憶體的地方取決于newtype的數量(distinct後),我們稱之為基數。基數過高的話,是比較消耗記憶體的。

sums 也是一樣的,下标是newtype的值,而對應的值則是不斷累加num(我們例子中需要被avg的字段)。

之後就可以周遊兩個數組得到結果了,代碼大體如下:

globalordinalsstringtermsaggregator/avgaggregator組合實作

globalordinalsstringtermsaggregator 首先要提供一個collector 給主流程,是以其提供了一個newcollector方法:

接着判定是不是隻有一個列檔案(docvalues):

final sorteddocvalues singlevalues = docvalues.unwrapsingleton(words);

//如果singlevalues!=null 則是一個,否則有多個列檔案

如果是一個的話:

通過doc 拿到ord(newtype),然後交給avg的collector 接着處理,進入avgaggregator 裡的collector的collect邏輯:

這個和我上面的概述中描述是一緻的。

如果是多個docvalues(此時索引還沒有對那些segment做合并),這個時候會走下面的流程:

這裡的ords 包括了多個docvalues檔案,然後做了全局映射,因為要把檔案的下标做映射。為啥要有下标映射呢?因為多個列檔案(docvalues)的全集才具有完整的newtype,但是每個列檔案都是從0開始遞增的。現在要擴張到一個global的空間上。 ords.cardinality()拿到了列檔案(docvalues)的數目,然後對每個檔案都處理一遍,通過ords.ordat(i) 拿到newtype的全局下标,這個時候就可以繼續交給avg完成了。

到這個階段,我們其實已經算好了每個newtype 出現的次數,以及num的累計值,也就是我們前面提到的兩個數組。

最終我們是要把這個資料輸出輸出的,不論是輸出給别的es節點,還是直接輸出給調用方。是以有個buildaggregation的過程,可以根據名字進行直覺的了解。

考慮到記憶體問題,es允許你設定一些threshhold,然後通過bucketpriorityqueue(優先隊列)來完成實際的資料收集以及排序(預設按文檔出現次數排序)。 裡面的元素是ordbucket,ordbucket包含了幾個值:

接着取出 topn 的對象,放到internalterms.bucket[] 數組中。然後周遊該數組,調用子aggregator的buildaggregation方法,這裡的子aggregator是avgaggregator ,每個bucket(newtype)就擷取到一個avg aggregations了,該aggregations通過internalaggregations 包裹,internalaggregations 包含了一個reduce 方法,該方法會調用具體internalaggregation的doreduce 方法,比如avgaggregator就有自己的reduce方法。說這個主要給下一小結做鋪墊。

最後會被包裝成stringterms ,然後就可以序列化成json格式,基本就是你在接口上看到的樣子了。

前面我們讨論的,都是基于一個分片,但是最終是要把結果資料進行merge的。 這個功能是由searchphasecontroller 對象來完成,大體如下:

sortedshardlist = searchphasecontroller.sortdocs(usescroll, firstresults);

其中merge 動作是按分類進行merge的,比如:

counter(計數,譬如total_hits)

hits

aggregations

suggest

profile (性能相關的資料)

這裡我們隻關注aggregations的merge

代碼有點長,核心是

裡面實際的邏輯也是比較簡單直覺的。會調用internalterms的reduce方法做merge,但是不同的類型的aggregator産生aggregations 合并邏輯是不一樣的,是以會委托給對應實作。比如globalordinalsstringtermsaggregator則會委托給internalterms的doreduce方法,而如avgaggregator會委托給internalavg的doreduce。 這裡就不展開。未來會單獨出一片文章講解。

這裡我們再額外講講valuesource (es 對fielddata/docvalues的抽象)。

前文我們提到,大部分aggregator 都是依賴于fielddata/docvalues 來實作的,而valuesource 則是他們在es裡的表示。是以了解他們是很有必要的。valuessource 全類名是:

該類就是es 為了管理 docvalues 而封裝的。它是一個抽象類,内部還有很多實作類,bytes,withordinals,fielddata,numeric,longvalues 等等。這些都是對特定類型docvalues類型的 es 表示。

按上面我們的查詢示例來看,newtype 字段對應的是

對象。這個對象是es對lucene string 類型的docvalues的一個表示。你會發現在valuesource類裡,有不同的fielddata。不同的fielddata 可能繼承自不同基類進而表示不同類型的資料。在現在這個fielddata 裡面有一個對象:

該對象在newtype(我們示例中的字段)是string類型的時候,對應的是實作類是

該對象的大體作用是,建構出docvalue的es的wraper。

具體代碼如下:

以第一種情況為例,上面的代碼new 了一個新的

我們看到,通過reader擷取到最後的列就是在該類裡的getordinalsvalues 方法裡實作的。

該方法最後傳回的randomaccessords 就是lucene的docvalues實作了。

分析了這麼多,所有的邏輯就濃縮在getleafcollector的第一行代碼上。globalords  的類型是randomaccessords,并且是直接和lucene對應上了。 

我們知道,在lucene裡,大部分檔案都是不可更新的。一個段一旦生成後就是不可變的,新的資料或者删除資料都需要生成新的段。docvalues的存儲檔案也是類似的。是以docvalues.unwrapsingleton其實就是做這個判定的,是不是有多個檔案 。無論是否則不是都直接建立了一個匿名的collector。

當個檔案的很好了解,包含了索引中newtype字段所有的值,其下标擷取也很自然。

根據文檔号擷取值對應的位置,如果ord >=0 則代表有值,否則代表沒有值。

如果有多個檔案,則會傳回如下的collecor:

上面的代碼可以保證多個檔案最終合起來保持一個檔案的序号。什麼意思呢?比如a檔案有一個文檔,b檔案有一個,那麼最終擷取的globalord 就是0,1 而不會都是0。此時的 ords 實作類 不是singletonsortedsetdocvalues 而是

對象了。

計數的方式兩個都大體類似。

這裡的bucketord 其實就是前面的ord/globalord。是以整個計算就是填充doccounts

es的 aggregation機制還是挺複雜的。本文試圖通過一個簡單的group by 的例子來完成對其機制的解釋。其中valuesource 那層我目前也沒沒完全吃透,如有表述不合适的地方,歡迎大家指出。