天天看點

Elasticsearch核心解析 - 查詢篇

讀操作

實時性和的“寫操作”一樣,對于搜尋而言是近實時的,延遲在100ms以上,對于NoSQL則需要是實時的。

一緻性指的是寫入成功後,下次讀操作一定要能讀取到最新的資料。對于搜尋,這個要求會低一些,可以有一些延遲。但是對于NoSQL資料庫,則一般要求最好是強一緻性的。

結果比對上,NoSQL作為資料庫,查詢過程中隻有符合不符合兩種情況,而搜尋裡面還有是否相關,類似于NoSQL的結果隻能是0或1,而搜尋裡面可能會有0.1,0.5,0.9等部分比對或者更相關的情況。

結果召回上,搜尋一般隻需要召回最滿足條件的Top N結果即可,而NoSQL一般都需要傳回滿足條件的所有結果。

搜尋系統一般都是兩階段查詢,第一個階段查詢到對應的Doc ID,也就是PK;第二階段再通過Doc ID去查詢完整文檔,而NoSQL資料庫一般是一階段就傳回結果。在Elasticsearch中兩種都支援。

目前NoSQL的查詢,聚合、分析和統計等功能上都是要比搜尋弱的。

Lucene的讀

Elasticsearch使用了Lucene作為搜尋引擎庫,通過Lucene完成特定字段的搜尋等功能,在Lucene中這個功能是通過IndexSearcher的下列接口實作的:

public TopDocs search(Query query, int n);
public Document doc(int docID);
public int count(Query query);
......(其他)
           

第一個search接口實作搜尋功能,傳回最滿足Query的N個結果;第二個doc接口通過doc id查詢Doc内容;第三個count接口通過Query擷取到命中數。

這三個功能是搜尋中的最基本的三個功能點,對于大部分Elasticsearch中的查詢都是比較複雜的,直接用這個接口是無法滿足需求的,比如分布式問題。這些問題都留給了Elasticsearch解決,我們接下來看Elasticsearch中相關讀功能的剖析。

Elasticsearch的讀

Elasticsearch中每個Shard都會有多個Replica,主要是為了保證資料可靠性,除此之外,還可以增加讀能力,因為寫的時候雖然要寫大部分Replica Shard,但是查詢的時候隻需要查詢Primary和Replica中的任何一個就可以了。

Elasticsearch核心解析 - 查詢篇

Search On Replicas

在上圖中,該Shard有1個Primary和2個Replica Node,當查詢的時候,從三個節點中根據Request中的preference參數選擇一個節點查詢。preference可以設定_local,_primary,_replica以及其他選項。如果選擇了primary,則每次查詢都是直接查詢Primary,可以保證每次查詢都是最新的。如果設定了其他參數,那麼可能會查詢到R1或者R2,這時候就有可能查詢不到最新的資料。

上述代碼邏輯在OperationRouting.Java的searchShards方法中。

接下來看一下,Elasticsearch中的查詢是如何支援分布式的。

Elasticsearch核心解析 - 查詢篇

Elasticsearch中通過分區實作分布式,資料寫入的時候根據_routing規則将資料寫入某一個Shard中,這樣就能将海量資料分布在多個Shard以及多台機器上,已達到分布式的目标。這樣就導緻了查詢的時候,潛在資料會在目前index的所有的Shard中,是以Elasticsearch查詢的時候需要查詢所有Shard,同一個Shard的Primary和Replica選擇一個即可,查詢請求會分發給所有Shard,每個Shard中都是一個獨立的查詢引擎,比如需要傳回Top 10的結果,那麼每個Shard都會查詢并且傳回Top 10的結果,然後在Client Node裡面會接收所有Shard的結果,然後通過優先級隊列二次排序,選擇出Top 10的結果傳回給使用者。

這裡有一個問題就是請求膨脹,使用者的一個搜尋請求在Elasticsearch内部會變成Shard個請求,這裡有個優化點,雖然是Shard個請求,但是這個Shard個數不一定要是目前Index中的Shard個數,隻要是目前查詢相關的Shard即可,這個需要基于業務和請求内容優化,通過這種方式可以優化請求膨脹數。

Elasticsearch中的查詢主要分為兩類,Get請求:通過ID查詢特定Doc;Search請求:通過Query查詢比對Doc。

Elasticsearch核心解析 - 查詢篇
上圖中記憶體中的Segment是指剛Refresh Segment,但是還沒持久化到磁盤的新Segment,而非從磁盤加載到記憶體中的Segment。

對于Search類請求,查詢的時候是一起查詢記憶體和磁盤上的Segment,最後将結果合并後傳回。這種查詢是近實時(Near Real Time)的,主要是由于記憶體中的Index資料需要一段時間後才會重新整理為Segment。

對于Get類請求,查詢的時候是先查詢記憶體中的TransLog,如果找到就立即傳回,如果沒找到再查詢磁盤上的TransLog,如果還沒有則再去查詢磁盤上的Segment。這種查詢是實時(Real Time)的。這種查詢順序可以保證查詢到的Doc是最新版本的Doc,這個功能也是為了保證NoSQL場景下的實時性要求。

Elasticsearch核心解析 - 查詢篇

多階段查詢

所有的搜尋系統一般都是兩階段查詢,第一階段查詢到比對的DocID,第二階段再查詢DocID對應的完整文檔,這種在Elasticsearch中稱為query_then_fetch,還有一種是一階段查詢的時候就傳回完整Doc,在Elasticsearch中稱作query_and_fetch,一般第二種适用于隻需要查詢一個Shard的請求。

除了一階段,兩階段外,還有一種三階段查詢的情況。搜尋裡面有一種算分邏輯是根據TF(Term Frequency)和DF(Document Frequency)計算基礎分,但是Elasticsearch中查詢的時候,是在每個Shard中獨立查詢的,每個Shard中的TF和DF也是獨立的,雖然在寫入的時候通過_routing保證Doc分布均勻,但是沒法保證TF和DF均勻,那麼就有會導緻局部的TF和DF不準的情況出現,這個時候基于TF、DF的算分就不準。為了解決這個問題,Elasticsearch中引入了DFS查詢,比如DFS_query_then_fetch,會先收集所有Shard中的TF和DF值,然後将這些值帶入請求中,再次執行query_then_fetch,這樣算分的時候TF和DF就是準确的,類似的有DFS_query_and_fetch。這種查詢的優勢是算分更加精準,但是效率會變差。另一種選擇是用BM25代替TF/DF模型。

在新版本Elasticsearch中,使用者沒法指定DFS_query_and_fetch和query_and_fetch,這兩種隻能被Elasticsearch系統改寫。

Elasticsearch查詢流程

Elasticsearch中的大部分查詢,以及核心功能都是Search類型查詢,上面我們了解到查詢分為一階段,二階段和三階段,這裡我們就以最常見的的二階段查詢為例來介紹查詢流程。

Elasticsearch核心解析 - 查詢篇

查詢流程

注冊Action

Elasticsearch中,查詢和寫操作一樣都是在ActionModule.java中注冊入口處理函數的。

registerHandler.accept(new RestSearchAction(settings, restController));
......
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
......
           

如果請求是Rest請求,則會在RestSearchAction中解析請求,檢查查詢類型,不能設定為dfs_query_and_fetch或者query_and_fetch,這兩個目前隻能用于Elasticsearch中的優化場景,然後将請求發給後面的TransportSearchAction處理。然後構造SearchRequest,将請求發送給TransportSearchAction處理。

Elasticsearch核心解析 - 查詢篇

如果是第一階段的Query Phase請求,則會調用SearchService的executeQueryPhase方法。

Elasticsearch核心解析 - 查詢篇

如果是第二階段的Fetch Phase請求,則會調用SearchService的executeFetchPhase方法。

Client Node

Client Node 也包括了前面說過的Parse Request,這裡就不再贅述了,接下來看一下其他的部分。

1. Get Remove Cluster Shard

判斷是否需要跨叢集通路,如果需要,則擷取到要通路的Shard清單。

2. Get Search Shard Iterator

擷取目前Cluster中要通路的Shard,和上一步中的Remove Cluster Shard合并,建構出最終要通路的完整Shard清單。

這一步中,會根據Request請求中的參數從Primary Node和多個Replica Node中選擇出一個要通路的Shard。

3. For Every Shard:Perform

周遊每個Shard,對每個Shard執行後面邏輯。

4. Send Request To Query Shard

将查詢階段請求發送給相應的Shard。

5. Merge Docs

上一步将請求發送給多個Shard後,這一步就是異步等待傳回結果,然後對結果合并。這裡的合并政策是維護一個Top N大小的優先級隊列,每當收到一個shard的傳回,就把結果放入優先級隊列做一次排序,直到所有的Shard都傳回。

翻頁邏輯也是在這裡,如果需要取Top 30~ Top 40的結果,這個的意思是所有Shard查詢結果中的第30到40的結果,那麼在每個Shard中無法确定最終的結果,每個Shard需要傳回Top 40的結果給Client Node,然後Client Node中在merge docs的時候,計算出Top 40的結果,最後再去除掉Top 30,剩餘的10個結果就是需要的Top 30~ Top 40的結果。

上述翻頁邏輯有一個明顯的缺點就是每次Shard傳回的資料中包括了已經翻過的曆史結果,如果翻頁很深,則在這裡需要排序的Docs會很多,比如Shard有1000,取第9990到10000的結果,那麼這次查詢,Shard總共需要傳回1000 * 10000,也就是一千萬Doc,這種情況很容易導緻OOM。

另一種翻頁方式是使用search_after,這種方式會更輕量級,如果每次隻需要傳回10條結構,則每個Shard隻需要傳回search_after之後的10個結果即可,傳回的總資料量隻是和Shard個數以及本次需要的個數有關,和曆史已讀取的個數無關。這種方式更安全一些,推薦使用這種。

如果有aggregate,也會在這裡做聚合,但是不同的aggregate類型的merge政策不一樣,具體的可以在後面的aggregate文章中再介紹。

6. Send Request To Fetch Shard

選出Top N個Doc ID後發送給這些Doc ID所在的Shard執行Fetch Phase,最後會傳回Top N的Doc的内容。

Query Phase

接下來我們看第一階段查詢的步驟:

1. Create Search Context

建立Search Context,之後Search過程中的所有中間狀态都會存在Context中,這些狀态總共有50多個,具體可以檢視DefaultSearchContext或者其他SearchContext的子類。

2. Parse Query

解析Query的Source,将結果存入Search Context。這裡會根據請求中Query類型的不同建立不同的Query對象,比如TermQuery、FuzzyQuery等,最終真正執行TermQuery、FuzzyQuery等語義的地方是在Lucene中。

這裡包括了dfsPhase、queryPhase和fetchPhase三個階段的preProcess部分,隻有queryPhase的preProcess中有執行邏輯,其他兩個都是空邏輯,執行完preProcess後,所有需要的參數都會設定完成。

由于Elasticsearch中有些請求之間是互相關聯的,并非獨立的,比如scroll請求,是以這裡同時會設定Context的生命周期。

同時會設定lowLevelCancellation是否打開,這個參數是叢集級别配置,同時也能動态開關,打開後會在後面執行時做更多的檢測,檢測是否需要停止後續邏輯直接傳回。

3. Get From Cache

判斷請求是否允許被Cache,如果允許,則檢查Cache中是否已經有結果,如果有則直接讀取Cache,如果沒有則繼續執行後續步驟,執行完後,再将結果加入Cache。

4. Add Collectors

Collector主要目标是收集查詢結果,實作排序,對自定義結果集過濾和收集等。這一步會增加多個Collectors,多個Collector組成一個List。

  1. FilteredCollector:先判斷請求中是否有Post Filter,Post Filter用于Search,Agg等結束後再次對結果做Filter,希望Filter不影響Agg結果。如果有Post Filter則建立一個FilteredCollector,加入Collector List中。
  2. PluginInMultiCollector:判斷請求中是否制定了自定義的一些Collector,如果有,則建立後加入Collector List。
  3. MinimumScoreCollector:判斷請求中是否制定了最小分數門檻值,如果指定了,則建立MinimumScoreCollector加入Collector List中,在後續收集結果時,會過濾掉得分小于最小分數的Doc。
  4. EarlyTerminatingCollector:判斷請求中是否提前結束Doc的Seek,如果是則建立EarlyTerminatingCollector,加入Collector List中。在後續Seek和收集Doc的過程中,當Seek的Doc數達到Early Terminating後會停止Seek後續倒排鍊。
  5. CancellableCollector:判斷目前操作是否可以被中斷結束,比如是否已經逾時等,如果是會抛出一個TaskCancelledException異常。該功能一般用來提前結束較長的查詢請求,可以用來保護系統。
  6. EarlyTerminatingSortingCollector:如果Index是排序的,那麼可以提前結束對倒排鍊的Seek,相當于在一個排序遞減連結清單上傳回最大的N個值,隻需要直接傳回前N個值就可以了。這個Collector會加到Collector List的頭部。EarlyTerminatingSorting和EarlyTerminating的差別是,EarlyTerminatingSorting是一種對結果無損傷的優化,而EarlyTerminating是有損的,人為掐斷執行的優化。
  7. TopDocsCollector:這個是最核心的Top N結果選擇器,會加入到Collector List的頭部。TopScoreDocCollector和TopFieldCollector都是TopDocsCollector的子類,TopScoreDocCollector會按照固定的方式算分,排序會按照分數+doc id的方式排列,如果多個doc的分數一樣,先選擇doc id小的文檔。而TopFieldCollector則是根據使用者指定的Field的值排序。

5. lucene::search

這一步會調用Lucene中IndexSearch的search接口,執行真正的搜尋邏輯。每個Shard中會有多個Segment,每個Segment對應一個LeafReaderContext,這裡會周遊每個Segment,到每個Segment中去Search結果,然後計算分數。

搜尋裡面一般有兩階段算分,第一階段是在這裡算的,會對每個Seek到的Doc都計算分數,為了減少CPU消耗,一般是算一個基本分數。這一階段完成後,會有個排序。然後在第二階段,再對Top 的結果做一次二階段算分,在二階段算分的時候會考慮更多的因子。二階段算分在後續操作中。

具體請求,比如TermQuery、WildcardQuery的查詢邏輯都在Lucene中,後面會有專門文章介紹。

6. rescore

根據Request中是否包含rescore配置決定是否進行二階段排序,如果有則執行二階段算分邏輯,會考慮更多的算分因子。二階段算分也是一種計算機中常見的多層設計,是一種資源消耗和效率的折中。

Elasticsearch中支援配置多個Rescore,這些rescore邏輯會順序周遊執行。每個rescore内部會先按照請求參數window選擇出Top window的doc,然後對這些doc排序,排完後再合并回原有的Top 結果順序中。

7. suggest::execute()

如果有推薦請求,則在這裡執行推薦請求。如果請求中隻包含了推薦的部分,則很多地方可以優化。推薦不是今天的重點,這裡就不介紹了,後面有機會再介紹。

8. aggregation::execute()

如果含有聚合統計請求,則在這裡執行。Elasticsearch中的aggregate的處理邏輯也類似于Search,通過多個Collector來實作。在Client Node中也需要對aggregation做合并。aggregate邏輯更複雜一些,就不在這裡贅述了,後面有需要就再單獨開文章介紹。

上述邏輯都執行完成後,如果目前查詢請求隻需要查詢一個Shard,那麼會直接在目前Node執行Fetch Phase。

Fetch Phase

Elasticsearch作為搜尋系統時,或者任何搜尋系統中,除了Query階段外,還會有一個Fetch階段,這個Fetch階段在資料庫類系統中是沒有的,是搜尋系統中額外增加的階段。搜尋系統中額外增加Fetch階段的原因是搜尋系統中資料分布導緻的,在搜尋中,資料通過routing分Shard的時候,隻能根據一個主字段值來決定,但是查詢的時候可能會根據其他非主字段查詢,那麼這個時候所有Shard中都可能會存在相同非主字段值的Doc,是以需要查詢所有Shard才能不會出現結果遺漏。同時如果查詢主字段,那麼這個時候就能直接定位到Shard,就隻需要查詢特定Shard即可,這個時候就類似于資料庫系統了。另外,資料庫中的二級索引又是另外一種情況,但類似于查主字段的情況,這裡就不多說了。

基于上述原因,第一階段查詢的時候并不知道最終結果會在哪個Shard上,是以每個Shard中管都需要查詢完整結果,比如需要Top 10,那麼每個Shard都需要查詢目前Shard的所有資料,找出目前Shard的Top 10,然後傳回給Client Node。如果有100個Shard,那麼就需要傳回100 * 10 = 1000個結果,而Fetch Doc内容的操作比較耗費IO和CPU,如果在第一階段就Fetch Doc,那麼這個資源開銷就會非常大。是以,一般是當Client Node選擇出最終Top N的結果後,再對最終的Top N讀取Doc内容。通過增加一點網絡開銷而避免大量IO和CPU操作,這個折中是非常劃算的。

Fetch階段的目的是通過DocID擷取到使用者需要的完整Doc内容。這些内容包括了DocValues,Store,Source,Script和Highlight等,具體的功能點是在SearchModule中注冊的,系統預設注冊的有:

  • ExplainFetchSubPhase
  • DocValueFieldsFetchSubPhase
  • ScriptFieldsFetchSubPhase
  • FetchSourceSubPhase
  • VersionFetchSubPhase
  • MatchedQueriesFetchSubPhase
  • HighlightPhase
  • ParentFieldSubFetchPhase

除了系統預設的8種外,還有通過插件的形式注冊自定義的功能,這些SubPhase中最重要的是Source和Highlight,Source是加載原文,Highlight是計算高亮顯示的内容片斷。

上述多個SubPhase會針對每個Doc順序執行,可能會産生多次的随機IO,這裡會有一些優化方案,但是都是針對特定場景的,不具有通用性。

Fetch Phase執行完後,整個查詢流程就結束了。

總結

Elasticsearch中的查詢流程比較簡單,更多的查詢原理都在Lucene中,後續我們會有針對不同請求的Lucene原理介紹性文章。