讀操作
實時性和的“寫操作”一樣,對于搜尋而言是近實時的,延遲在100ms以上,對于NoSQL則需要是實時的。
一緻性指的是寫入成功後,下次讀操作一定要能讀取到最新的資料。對于搜尋,這個要求會低一些,可以有一些延遲。但是對于NoSQL資料庫,則一般要求最好是強一緻性的。
結果比對上,NoSQL作為資料庫,查詢過程中隻有符合不符合兩種情況,而搜尋裡面還有是否相關,類似于NoSQL的結果隻能是0或1,而搜尋裡面可能會有0.1,0.5,0.9等部分比對或者更相關的情況。
結果召回上,搜尋一般隻需要召回最滿足條件的Top N結果即可,而NoSQL一般都需要傳回滿足條件的所有結果。
搜尋系統一般都是兩階段查詢,第一個階段查詢到對應的Doc ID,也就是PK;第二階段再通過Doc ID去查詢完整文檔,而NoSQL資料庫一般是一階段就傳回結果。在Elasticsearch中兩種都支援。
目前NoSQL的查詢,聚合、分析和統計等功能上都是要比搜尋弱的。
Lucene的讀
Elasticsearch使用了Lucene作為搜尋引擎庫,通過Lucene完成特定字段的搜尋等功能,在Lucene中這個功能是通過IndexSearcher的下列接口實作的:
public TopDocs search(Query query, int n);
public Document doc(int docID);
public int count(Query query);
......(其他)
第一個search接口實作搜尋功能,傳回最滿足Query的N個結果;第二個doc接口通過doc id查詢Doc内容;第三個count接口通過Query擷取到命中數。
這三個功能是搜尋中的最基本的三個功能點,對于大部分Elasticsearch中的查詢都是比較複雜的,直接用這個接口是無法滿足需求的,比如分布式問題。這些問題都留給了Elasticsearch解決,我們接下來看Elasticsearch中相關讀功能的剖析。
Elasticsearch的讀
Elasticsearch中每個Shard都會有多個Replica,主要是為了保證資料可靠性,除此之外,還可以增加讀能力,因為寫的時候雖然要寫大部分Replica Shard,但是查詢的時候隻需要查詢Primary和Replica中的任何一個就可以了。
Search On Replicas
在上圖中,該Shard有1個Primary和2個Replica Node,當查詢的時候,從三個節點中根據Request中的preference參數選擇一個節點查詢。preference可以設定_local,_primary,_replica以及其他選項。如果選擇了primary,則每次查詢都是直接查詢Primary,可以保證每次查詢都是最新的。如果設定了其他參數,那麼可能會查詢到R1或者R2,這時候就有可能查詢不到最新的資料。
上述代碼邏輯在OperationRouting.Java的searchShards方法中。
接下來看一下,Elasticsearch中的查詢是如何支援分布式的。
Elasticsearch中通過分區實作分布式,資料寫入的時候根據_routing規則将資料寫入某一個Shard中,這樣就能将海量資料分布在多個Shard以及多台機器上,已達到分布式的目标。這樣就導緻了查詢的時候,潛在資料會在目前index的所有的Shard中,是以Elasticsearch查詢的時候需要查詢所有Shard,同一個Shard的Primary和Replica選擇一個即可,查詢請求會分發給所有Shard,每個Shard中都是一個獨立的查詢引擎,比如需要傳回Top 10的結果,那麼每個Shard都會查詢并且傳回Top 10的結果,然後在Client Node裡面會接收所有Shard的結果,然後通過優先級隊列二次排序,選擇出Top 10的結果傳回給使用者。
這裡有一個問題就是請求膨脹,使用者的一個搜尋請求在Elasticsearch内部會變成Shard個請求,這裡有個優化點,雖然是Shard個請求,但是這個Shard個數不一定要是目前Index中的Shard個數,隻要是目前查詢相關的Shard即可,這個需要基于業務和請求内容優化,通過這種方式可以優化請求膨脹數。
Elasticsearch中的查詢主要分為兩類,Get請求:通過ID查詢特定Doc;Search請求:通過Query查詢比對Doc。
上圖中記憶體中的Segment是指剛Refresh Segment,但是還沒持久化到磁盤的新Segment,而非從磁盤加載到記憶體中的Segment。
對于Search類請求,查詢的時候是一起查詢記憶體和磁盤上的Segment,最後将結果合并後傳回。這種查詢是近實時(Near Real Time)的,主要是由于記憶體中的Index資料需要一段時間後才會重新整理為Segment。
對于Get類請求,查詢的時候是先查詢記憶體中的TransLog,如果找到就立即傳回,如果沒找到再查詢磁盤上的TransLog,如果還沒有則再去查詢磁盤上的Segment。這種查詢是實時(Real Time)的。這種查詢順序可以保證查詢到的Doc是最新版本的Doc,這個功能也是為了保證NoSQL場景下的實時性要求。
多階段查詢
所有的搜尋系統一般都是兩階段查詢,第一階段查詢到比對的DocID,第二階段再查詢DocID對應的完整文檔,這種在Elasticsearch中稱為query_then_fetch,還有一種是一階段查詢的時候就傳回完整Doc,在Elasticsearch中稱作query_and_fetch,一般第二種适用于隻需要查詢一個Shard的請求。
除了一階段,兩階段外,還有一種三階段查詢的情況。搜尋裡面有一種算分邏輯是根據TF(Term Frequency)和DF(Document Frequency)計算基礎分,但是Elasticsearch中查詢的時候,是在每個Shard中獨立查詢的,每個Shard中的TF和DF也是獨立的,雖然在寫入的時候通過_routing保證Doc分布均勻,但是沒法保證TF和DF均勻,那麼就有會導緻局部的TF和DF不準的情況出現,這個時候基于TF、DF的算分就不準。為了解決這個問題,Elasticsearch中引入了DFS查詢,比如DFS_query_then_fetch,會先收集所有Shard中的TF和DF值,然後将這些值帶入請求中,再次執行query_then_fetch,這樣算分的時候TF和DF就是準确的,類似的有DFS_query_and_fetch。這種查詢的優勢是算分更加精準,但是效率會變差。另一種選擇是用BM25代替TF/DF模型。
在新版本Elasticsearch中,使用者沒法指定DFS_query_and_fetch和query_and_fetch,這兩種隻能被Elasticsearch系統改寫。
Elasticsearch查詢流程
Elasticsearch中的大部分查詢,以及核心功能都是Search類型查詢,上面我們了解到查詢分為一階段,二階段和三階段,這裡我們就以最常見的的二階段查詢為例來介紹查詢流程。
查詢流程
注冊Action
Elasticsearch中,查詢和寫操作一樣都是在ActionModule.java中注冊入口處理函數的。
registerHandler.accept(new RestSearchAction(settings, restController));
......
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
......
如果請求是Rest請求,則會在RestSearchAction中解析請求,檢查查詢類型,不能設定為dfs_query_and_fetch或者query_and_fetch,這兩個目前隻能用于Elasticsearch中的優化場景,然後将請求發給後面的TransportSearchAction處理。然後構造SearchRequest,将請求發送給TransportSearchAction處理。
如果是第一階段的Query Phase請求,則會調用SearchService的executeQueryPhase方法。
如果是第二階段的Fetch Phase請求,則會調用SearchService的executeFetchPhase方法。
Client Node
Client Node 也包括了前面說過的Parse Request,這裡就不再贅述了,接下來看一下其他的部分。
1. Get Remove Cluster Shard
判斷是否需要跨叢集通路,如果需要,則擷取到要通路的Shard清單。
2. Get Search Shard Iterator
擷取目前Cluster中要通路的Shard,和上一步中的Remove Cluster Shard合并,建構出最終要通路的完整Shard清單。
這一步中,會根據Request請求中的參數從Primary Node和多個Replica Node中選擇出一個要通路的Shard。
3. For Every Shard:Perform
周遊每個Shard,對每個Shard執行後面邏輯。
4. Send Request To Query Shard
将查詢階段請求發送給相應的Shard。
5. Merge Docs
上一步将請求發送給多個Shard後,這一步就是異步等待傳回結果,然後對結果合并。這裡的合并政策是維護一個Top N大小的優先級隊列,每當收到一個shard的傳回,就把結果放入優先級隊列做一次排序,直到所有的Shard都傳回。
翻頁邏輯也是在這裡,如果需要取Top 30~ Top 40的結果,這個的意思是所有Shard查詢結果中的第30到40的結果,那麼在每個Shard中無法确定最終的結果,每個Shard需要傳回Top 40的結果給Client Node,然後Client Node中在merge docs的時候,計算出Top 40的結果,最後再去除掉Top 30,剩餘的10個結果就是需要的Top 30~ Top 40的結果。
上述翻頁邏輯有一個明顯的缺點就是每次Shard傳回的資料中包括了已經翻過的曆史結果,如果翻頁很深,則在這裡需要排序的Docs會很多,比如Shard有1000,取第9990到10000的結果,那麼這次查詢,Shard總共需要傳回1000 * 10000,也就是一千萬Doc,這種情況很容易導緻OOM。
另一種翻頁方式是使用search_after,這種方式會更輕量級,如果每次隻需要傳回10條結構,則每個Shard隻需要傳回search_after之後的10個結果即可,傳回的總資料量隻是和Shard個數以及本次需要的個數有關,和曆史已讀取的個數無關。這種方式更安全一些,推薦使用這種。
如果有aggregate,也會在這裡做聚合,但是不同的aggregate類型的merge政策不一樣,具體的可以在後面的aggregate文章中再介紹。
6. Send Request To Fetch Shard
選出Top N個Doc ID後發送給這些Doc ID所在的Shard執行Fetch Phase,最後會傳回Top N的Doc的内容。
Query Phase
接下來我們看第一階段查詢的步驟:
1. Create Search Context
建立Search Context,之後Search過程中的所有中間狀态都會存在Context中,這些狀态總共有50多個,具體可以檢視DefaultSearchContext或者其他SearchContext的子類。
2. Parse Query
解析Query的Source,将結果存入Search Context。這裡會根據請求中Query類型的不同建立不同的Query對象,比如TermQuery、FuzzyQuery等,最終真正執行TermQuery、FuzzyQuery等語義的地方是在Lucene中。
這裡包括了dfsPhase、queryPhase和fetchPhase三個階段的preProcess部分,隻有queryPhase的preProcess中有執行邏輯,其他兩個都是空邏輯,執行完preProcess後,所有需要的參數都會設定完成。
由于Elasticsearch中有些請求之間是互相關聯的,并非獨立的,比如scroll請求,是以這裡同時會設定Context的生命周期。
同時會設定lowLevelCancellation是否打開,這個參數是叢集級别配置,同時也能動态開關,打開後會在後面執行時做更多的檢測,檢測是否需要停止後續邏輯直接傳回。
3. Get From Cache
判斷請求是否允許被Cache,如果允許,則檢查Cache中是否已經有結果,如果有則直接讀取Cache,如果沒有則繼續執行後續步驟,執行完後,再将結果加入Cache。
4. Add Collectors
Collector主要目标是收集查詢結果,實作排序,對自定義結果集過濾和收集等。這一步會增加多個Collectors,多個Collector組成一個List。
- FilteredCollector:先判斷請求中是否有Post Filter,Post Filter用于Search,Agg等結束後再次對結果做Filter,希望Filter不影響Agg結果。如果有Post Filter則建立一個FilteredCollector,加入Collector List中。
- PluginInMultiCollector:判斷請求中是否制定了自定義的一些Collector,如果有,則建立後加入Collector List。
- MinimumScoreCollector:判斷請求中是否制定了最小分數門檻值,如果指定了,則建立MinimumScoreCollector加入Collector List中,在後續收集結果時,會過濾掉得分小于最小分數的Doc。
- EarlyTerminatingCollector:判斷請求中是否提前結束Doc的Seek,如果是則建立EarlyTerminatingCollector,加入Collector List中。在後續Seek和收集Doc的過程中,當Seek的Doc數達到Early Terminating後會停止Seek後續倒排鍊。
- CancellableCollector:判斷目前操作是否可以被中斷結束,比如是否已經逾時等,如果是會抛出一個TaskCancelledException異常。該功能一般用來提前結束較長的查詢請求,可以用來保護系統。
- EarlyTerminatingSortingCollector:如果Index是排序的,那麼可以提前結束對倒排鍊的Seek,相當于在一個排序遞減連結清單上傳回最大的N個值,隻需要直接傳回前N個值就可以了。這個Collector會加到Collector List的頭部。EarlyTerminatingSorting和EarlyTerminating的差別是,EarlyTerminatingSorting是一種對結果無損傷的優化,而EarlyTerminating是有損的,人為掐斷執行的優化。
- TopDocsCollector:這個是最核心的Top N結果選擇器,會加入到Collector List的頭部。TopScoreDocCollector和TopFieldCollector都是TopDocsCollector的子類,TopScoreDocCollector會按照固定的方式算分,排序會按照分數+doc id的方式排列,如果多個doc的分數一樣,先選擇doc id小的文檔。而TopFieldCollector則是根據使用者指定的Field的值排序。
5. lucene::search
這一步會調用Lucene中IndexSearch的search接口,執行真正的搜尋邏輯。每個Shard中會有多個Segment,每個Segment對應一個LeafReaderContext,這裡會周遊每個Segment,到每個Segment中去Search結果,然後計算分數。
搜尋裡面一般有兩階段算分,第一階段是在這裡算的,會對每個Seek到的Doc都計算分數,為了減少CPU消耗,一般是算一個基本分數。這一階段完成後,會有個排序。然後在第二階段,再對Top 的結果做一次二階段算分,在二階段算分的時候會考慮更多的因子。二階段算分在後續操作中。
具體請求,比如TermQuery、WildcardQuery的查詢邏輯都在Lucene中,後面會有專門文章介紹。
6. rescore
根據Request中是否包含rescore配置決定是否進行二階段排序,如果有則執行二階段算分邏輯,會考慮更多的算分因子。二階段算分也是一種計算機中常見的多層設計,是一種資源消耗和效率的折中。
Elasticsearch中支援配置多個Rescore,這些rescore邏輯會順序周遊執行。每個rescore内部會先按照請求參數window選擇出Top window的doc,然後對這些doc排序,排完後再合并回原有的Top 結果順序中。
7. suggest::execute()
如果有推薦請求,則在這裡執行推薦請求。如果請求中隻包含了推薦的部分,則很多地方可以優化。推薦不是今天的重點,這裡就不介紹了,後面有機會再介紹。
8. aggregation::execute()
如果含有聚合統計請求,則在這裡執行。Elasticsearch中的aggregate的處理邏輯也類似于Search,通過多個Collector來實作。在Client Node中也需要對aggregation做合并。aggregate邏輯更複雜一些,就不在這裡贅述了,後面有需要就再單獨開文章介紹。
上述邏輯都執行完成後,如果目前查詢請求隻需要查詢一個Shard,那麼會直接在目前Node執行Fetch Phase。
Fetch Phase
Elasticsearch作為搜尋系統時,或者任何搜尋系統中,除了Query階段外,還會有一個Fetch階段,這個Fetch階段在資料庫類系統中是沒有的,是搜尋系統中額外增加的階段。搜尋系統中額外增加Fetch階段的原因是搜尋系統中資料分布導緻的,在搜尋中,資料通過routing分Shard的時候,隻能根據一個主字段值來決定,但是查詢的時候可能會根據其他非主字段查詢,那麼這個時候所有Shard中都可能會存在相同非主字段值的Doc,是以需要查詢所有Shard才能不會出現結果遺漏。同時如果查詢主字段,那麼這個時候就能直接定位到Shard,就隻需要查詢特定Shard即可,這個時候就類似于資料庫系統了。另外,資料庫中的二級索引又是另外一種情況,但類似于查主字段的情況,這裡就不多說了。
基于上述原因,第一階段查詢的時候并不知道最終結果會在哪個Shard上,是以每個Shard中管都需要查詢完整結果,比如需要Top 10,那麼每個Shard都需要查詢目前Shard的所有資料,找出目前Shard的Top 10,然後傳回給Client Node。如果有100個Shard,那麼就需要傳回100 * 10 = 1000個結果,而Fetch Doc内容的操作比較耗費IO和CPU,如果在第一階段就Fetch Doc,那麼這個資源開銷就會非常大。是以,一般是當Client Node選擇出最終Top N的結果後,再對最終的Top N讀取Doc内容。通過增加一點網絡開銷而避免大量IO和CPU操作,這個折中是非常劃算的。
Fetch階段的目的是通過DocID擷取到使用者需要的完整Doc内容。這些内容包括了DocValues,Store,Source,Script和Highlight等,具體的功能點是在SearchModule中注冊的,系統預設注冊的有:
- ExplainFetchSubPhase
- DocValueFieldsFetchSubPhase
- ScriptFieldsFetchSubPhase
- FetchSourceSubPhase
- VersionFetchSubPhase
- MatchedQueriesFetchSubPhase
- HighlightPhase
- ParentFieldSubFetchPhase
除了系統預設的8種外,還有通過插件的形式注冊自定義的功能,這些SubPhase中最重要的是Source和Highlight,Source是加載原文,Highlight是計算高亮顯示的内容片斷。
上述多個SubPhase會針對每個Doc順序執行,可能會産生多次的随機IO,這裡會有一些優化方案,但是都是針對特定場景的,不具有通用性。
Fetch Phase執行完後,整個查詢流程就結束了。
總結
Elasticsearch中的查詢流程比較簡單,更多的查詢原理都在Lucene中,後續我們會有針對不同請求的Lucene原理介紹性文章。