天天看點

深入了解Elasticsearch寫入過程

作者:CTO修煉之路

Elasticsearch 是目前主流的搜尋引擎,其具有擴充性好,查詢速度快,查詢結果近實時等優點,本文将對Elasticsearch的寫操作進行分析。

1. lucene的寫操作及其問題

Elasticsearch底層使用Lucene來實作doc的讀寫操作,Lucene通過

public long addDocument(...);
public long deleteDocuments(...);
public long updateDocument(...);           

三個方法來實作文檔的寫入,更新和删除操作。但是存在如下問題

  1. 沒有并發設計 lucene隻是一個搜尋引擎庫,并沒有涉及到分布式相關的設計,是以要想使用Lucene來處理海量資料,并利用分布式的能力,就必須在其之上進行分布式的相關設計。
  2. 非實時 将檔案寫入lucence後并不能立即被檢索,需要等待lucene生成一個完整的segment才能被檢索
  3. 資料存儲不可靠 寫入lucene的資料不會立即被持久化到磁盤,如果伺服器當機,那存儲在記憶體中的資料将會丢失
  4. 不支援部分更新 lucene中提供的updateDocuments僅支援對文檔的全量更新,對部分更新不支援

2. Elasticsearch的寫入方案

針對Lucene的問題,ES做了如下設計

2.1 分布式設計:

為了支援對海量資料的存儲和查詢,Elasticsearch引入分片的概念,一個索引被分成多個分片,每個分片可以有一個主分片和多個副本分片,每個分片副本都是一個具有完整功能的lucene執行個體。分片可以配置設定在不同的伺服器上,同一個分片的不同副本不能配置設定在相同的伺服器上。

在進行寫操作時,ES會根據傳入的_routing參數(或mapping中設定的_routing, 如果參數和設定中都沒有則預設使用_id), 按照公式 shard_num=hash(\routing)%num_primary_shards,計算出文檔要配置設定到的分片,在從叢集中繼資料中找出對應主分片的位置,将請求路由到該分片進行文檔寫操作。

深入了解Elasticsearch寫入過程

2.2 近實時性-refresh操作

當一個文檔寫入Lucene後是不能被立即查詢到的,Elasticsearch提供了一個refresh操作,會定時地調用lucene的reopen(新版本為openIfChanged)為記憶體中新寫入的資料生成一個新的segment,此時被處理的文檔均可以被檢索到。refresh操作的時間間隔由 refresh_interval參數控制,預設為1s, 當然還可以在寫入請求中帶上refresh表示寫入後立即refresh,另外還可以調用refresh API顯式refresh。

2.3 資料存儲可靠性

  1. 引入translog 當一個文檔寫入Lucence後是存儲在記憶體中的,即使執行了refresh操作仍然是在檔案系統緩存中,如果此時伺服器當機,那麼這部分資料将會丢失。為此ES增加了translog, 當進行文檔寫操作時會先将文檔寫入Lucene,然後寫入一份到translog,寫入translog是落盤的(如果對可靠性要求不是很高,也可以設定異步落盤,可以提高性能,由配置 index.translog.durability和 index.translog.sync_interval控制),這樣就可以防止伺服器當機後資料的丢失。由于translog是追加寫入,是以性能要比随機寫入要好。與傳統的分布式系統不同,這裡是先寫入Lucene再寫入translog,原因是寫入Lucene可能會失敗,為了減少寫入失敗復原的複雜度,是以先寫入Lucene.
  2. flush操作 另外每30分鐘或當translog達到一定大小(由 index.translog.flush_threshold_size控制,預設512mb), ES會觸發一次flush操作,此時ES會先執行refresh操作将buffer中的資料生成segment,然後調用lucene的commit方法将所有記憶體中的segment fsync到磁盤。此時lucene中的資料就完成了持久化,會清空translog中的資料(6.x版本為了實作sequenceIDs,不删除translog)
深入了解Elasticsearch寫入過程
  1. merge操作 由于refresh預設間隔為1s中,是以會産生大量的小segment,為此ES會運作一個任務檢測目前磁盤中的segment,對符合條件的segment進行合并操作,減少lucene中的segment個數,提高查詢速度,降低負載。不僅如此,merge過程也是文檔删除和更新操作後,舊的doc真正被删除的時候。使用者還可以手動調用_forcemerge API來主動觸發merge,以減少叢集的segment個數和清理已删除或更新的文檔。
  2. 多副本機制 另外ES有多副本機制,一個分片的主副分片不能分片在同一個節點上,進一步保證資料的可靠性。

2.4 部分更新

lucene僅支援對文檔的整體更新,ES為了支援局部更新,在Lucene的Store索引中存儲了一個_source字段,該字段的key值是文檔ID, 内容是文檔的原文。當進行更新操作時先從_source中擷取原文,與更新部分合并後,再調用lucene API進行全量更新, 對于寫入了ES但是還沒有refresh的文檔,可以從translog中擷取。另外為了防止讀取文檔過程後執行更新前有其他線程修改了文檔,ES增加了版本機制,當執行更新操作時發現目前文檔的版本與預期不符,則會重新擷取文檔再更新。

3. ES的寫入流程

ES的任意節點都可以作為協調節點(coordinating node)接受請求,當協調節點接受到請求後進行一系列處理,然後通過_routing字段找到對應的primary shard,并将請求轉發給primary shard, primary shard完成寫入後,将寫入并發發送給各replica, raplica執行寫入操作後傳回給primary shard, primary shard再将請求傳回給協調節點。大緻流程如下圖:

深入了解Elasticsearch寫入過程

3.1 coordinating節點

ES中接收并轉發請求的節點稱為coordinating節點,ES中所有節點都可以接受并轉發請求。當一個節點接受到寫請求或更新請求後,會執行如下操作:

  1. ingest pipeline 檢視該請求是否符合某個ingest pipeline的pattern, 如果符合則執行pipeline中的邏輯,一般是對文檔進行各種預處理,如格式調整,增加字段等。如果目前節點沒有ingest角色,則需要将請求轉發給有ingest角色的節點執行。
  2. 自動建立索引 判斷索引是否存在,如果開啟了自動建立則自動建立,否則報錯
  3. 設定routing 擷取請求URL或mapping中的_routing,如果沒有則使用_id, 如果沒有指定_id則ES會自動生成一個全局唯一ID。該_routing字段用于決定文檔配置設定在索引的哪個shard上。
  4. 建構BulkShardRequest 由于Bulk Request中包含多種(Index/Update/Delete)請求,這些請求分别需要到不同的shard上執行,是以協調節點,會将請求按照shard分開,同一個shard上的請求聚合到一起,建構BulkShardRequest
  5. 将請求發送給primary shard 因為目前執行的是寫操作,是以隻能在primary上完成,是以需要把請求路由到primary shard所在節點
  6. 等待primary shard傳回

3.2 primary shard

Primary請求的入口是PrimaryOperationTransportHandler的MessageReceived, 當接收到請求時,執行的邏輯如下

  1. 判斷操作類型 周遊bulk請求中的各子請求,根據不同的操作類型跳轉到不同的處理邏輯
  2. 将update操作轉換為Index和Delete操作 擷取文檔的目前内容,與update内容合并生成新文檔,然後将update請求轉換成index請求,此處文檔設定一個version v1
  3. Parse Doc 解析文檔的各字段,并添加如_uid等ES相關的一些系統字段
  4. 更新mapping 對于新增字段會根據dynamic mapping或dynamic template生成對應的mapping,如果mapping中有dynamic mapping相關設定則按設定處理,如忽略或抛出異常
  5. 擷取sequence Id和Version 從SequcenceNumberService擷取一個sequenceID和Version。SequcenID用于初始化LocalCheckPoint, verion是根據目前Versoin+1用于防止并發寫導緻資料不一緻。
  6. 寫入lucene 這一步開始會對文檔uid加鎖,然後判斷uid對應的version v2和之前update轉換時的versoin v1是否一緻,不一緻則傳回第二步重新執行。 如果version一緻,如果同id的doc已經存在,則調用lucene的updateDocument接口,如果是新文檔則調用lucene的addDoucument. 這裡有個問題,如何保證Delete-Then-Add的原子性,ES是通過在Delete之前會加上已refresh鎖,禁止被refresh,隻有等待Add完成後釋放了Refresh Lock, 這樣就保證了這個操作的原子性。
  7. 寫入translog 寫入Lucene的Segment後,會以key value的形式寫Translog, Key是Id, Value是Doc的内容。當查詢的時候,如果請求的是GetDocById則可以直接根據_id從translog中擷取。滿足nosql場景的實時性。
  8. 重構bulk request 因為primary shard已經将update操作轉換為index操作或delete操作,是以要對之前的bulkrequest進行調整,隻包含index或delete操作,不需要再進行update的處理操作。
  9. flush translog 預設情況下,translog要在此處落盤完成,如果對可靠性要求不高,可以設定translog異步,那麼translog的fsync将會異步執行,但是落盤前的資料有丢失風險。
  10. 發送請求給replicas 将構造好的bulkrequest并發發送給各replicas,等待replica傳回,這裡需要等待所有的replicas傳回,響應請求給協調節點。如果某個shard執行失敗,則primary會給master發請求remove該shard。這裡會同時把sequenceID, primaryTerm, GlobalCheckPoint等傳遞給replica。
  11. 等待replica響應 當所有的replica傳回請求時,更細primary shard的LocalCheckPoint。

3.3 replica shard

Replica 請求的入口是在ReplicaOperationTransportHandler的messageReceived,當replica shard接收到請求時執行如下流程:

  1. 判斷操作類型 replica收到的寫如請求隻會有add和delete,因update在primary shard上已經轉換為add或delete了。根據不同的操作類型執行對應的操作
  2. Parse Doc
  3. 更新mapping
  4. 擷取sequenceId和Version 直接使用primary shard發送過來的請求中的内容即可
  5. 寫如lucene
  6. write Translog
  7. Flush translog

4 總結與分析

Elasticsearch建立在Lucene基礎之上,底層采用Lucene來實作檔案的讀寫操作,實作了文檔的存儲和高效查詢。然後Lucene作為一個搜尋庫在應對海量資料的存儲上仍有一些不足之處。

Elasticsearch通過引入分片概念,成功地将lucene部署到分布式系統中,增強了系統的可靠性和擴充性。

Elasticsearch通過定期refresh lucene in-momory-buffer中的資料,使得ES具有了近實時的寫入和查詢能力。

Elasticsearch通過引入translog,多副本,以及定期執行flush,merge等操作保證了資料可靠性和較高的存儲性能。

Elasticsearch通過存儲_source字段結合verison字段實作了文檔的局部更新,使得ES的使用方式更加靈活多樣。

Elasticsearch基于lucene,又不簡單地隻是lucene,它完美地将lucene與分布式系統結合,既利用了lucene的檢索能力,又具有了分布式系統的衆多優點。

繼續閱讀