高可用架構
ZSearch是目前公司内最大的Elasticsearch服務平台,随着業務的深入,越來越多的關鍵鍊路使用者對資料的可用性和容災能力提出更高的需求,而在這塊領域 社群一直沒有完整的解決政策,原生的 Snapshot And Restore 隻能做快照的恢複,不能做到實時同步;業内主流的隊列分發模式(通過消息隊列緩存請求資料,多個叢集消費資料實作請求複制)也隻能做到請求的同步,一個不可預期的操作如delete_by_query 叢集間便會産生資料差異。歸根到底我們需要一個類似Mysql的binlog同步方案, 從底層機制上保證資料的最終一緻性,為此 ZSearch核心團隊經過數月源碼研究 精心打造了Elasticsearch-XDCR,一款基于translog原生複制協定的跨叢集同步産品。
Elasticsearch-XDCR
elasticsearch-xdcr 采用插件形式,産品形态上依然保留了ES社群一貫的簡單風格。
- 安裝簡單,隻要在兩個叢集安裝插件重新開機即可。
- 使用簡單,兩個restful操作完成資料同步。
- 資料一緻,基于原生複制協定,支援實時同步。
- 穩定友善,支援mapping自動更新以及斷點續傳。
使用樣例
第一步 在主叢集建立備份叢集倉庫
PUT _xdcr/{cluser_name} -d {
"settings": {
"seeds": [{node_1}, {node_2}, ... ]
}
}
第二步 開啟索引同步
_xdcr/{cluster_name}/{job_name} -d '
{
"indices": "index_1,index_2 ..."
}
'
源碼實作
本文将通過Elasticsearch源碼分析,結合其原生複制處理過程來講解xdcr的設計思路,首先我們了解ES的一些基礎概念。
- 一份索引(index)可分成多個分片(shard)并配置設定在不同節點上(node) 。
- 一份shard可由一個主分片(primary)和多個備副本(replica)組成。
- 資料先寫到主分片,寫入成功則同步到各個備副本上。
elasticsearch-xdcr的實作思路便是分析primary到replica的内部複制處理流程,将複制目标擴充到外部叢集達成跨叢集資料同步效果。
叢集排程模型
- 類似于k8s|hippo,ES的分布式排程模型也采用一種目标式的資源管理方式,即master釋出一個全局狀态(ClusterState),工作節點共同協作,直到整體叢集狀态達到一緻。
- RoutingTable(路由表)是狀态的一個子子產品,他管理了所有索引的路由狀态,如建立的索引放在那些節點上,某些節點磁盤快滿了應該遷移出去等等。
- ShardRouting 是RoutingTable的具體表現,他表示一個分片的路由規則,如分片是否為主分片、放在哪個節點、現在處于什麼狀态等等。
當我們為一個索引建立一個新的副本,Master節點便會釋出一個新的叢集狀态,被配置設定的Work節點根據ShardRouting找到主分片位置并建立恢複任務,此過程在ES中被稱之為peer_recovery。
核心源碼指引: org.elasticsearch.indices.cluster.IndicesClusterStateService
“推模式”索引拷貝流程
了解了總的處理流程,我們可以把關注點聚焦到主-副間的具體恢複流程。ES采用的是一種推模式,主分片在接收到副本節點發來的start_recovery請求,會源源不斷的往目标分片推送索引資料,直到全部結束後,備節點将shard更新到可服務狀态 完成peer_recovery流程,推的過程分為三個階段:
- phase1: 同步索引檔案,已經建構好的最終索引檔案。
- phase2: 同步translog,需要做恢複形成最終索引檔案。
- finalize: 同步checkpoint,更新全局送出點,保持和主分片的Sequnce位點同步。
源碼及調用關系參照下圖
核心源碼指引
org.elasticsearch.indices.recovery.*
跨叢集改造
ES的原生PeerRecovery過程隻能在索引的建立初始化階段(init),而跨叢集方案中外部叢集通常已經建好了對等的索引,處于服務狀态(started)的shard不允許做任何背景任務。最初的方案我們是想設計一種新的ShardRouting,在索引核心加載層擴充新的Recovery類型支援遠端recovery,但這樣的改動會侵入到ES core層源碼,不利于後續的版本疊代。如果想無侵入以插件形式實作跨叢集複制,隻能利用二階段的translog recovery,shard在服務狀态translog可僞裝成primary操作寫入引擎,但是純translog recovery在速度上遠不及索引檔案拷貝,經過各種利弊權衡,我們最終還是采用零侵入的translog方式,畢竟不綁架使用者的産品更具生命力。
代碼片段
Engine.Operation.Origin.PEER_RECOVERY -> Engine.Operation.Origin.PRIMARY
for (Translog.Operation operation : operations) {
Engine.Result result = shard.applyTranslogOperation(operation, Engine.Operation.Origin.PRIMARY, update -> {
mappingUpdater.updateMappings(update, shard.shardId(), ((Translog.Index) operation).type());
throw new ReplicationOperation.RetryOnPrimaryException(shard.shardId(), "Mapping updated");
});
}
SequnceNumber & 斷點續傳
ES 6.0之後引入了一個非常重要的特性 SequnceNumber,shard上任何類型的寫操作,包括index、create、update和delete,都會生成一個連續遞增的_seq_no。這使得Node異常或重新開機時,能夠從最後的位點(checkpoint)快速恢複,這個特性同樣也使的跨叢集複制具備了第二個重要條件 - 斷點續傳 。利用該特性,在多機房容場景抵禦各種不可預知的問題,如網絡抖動、網絡故障、叢集當機等等,恢複後的recovery流依然可以通過seqNo快速恢複。
Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(startingSeqNo);
結尾
由于篇幅有限,本文介紹了實作ES跨叢集同步的兩個重要條件和實作思路,後期可結合思路詳細講解實作的過程。