天天看点

ElasticSearch Recovery 分析

另外你可能还需要了解下 recovery 阶段迁移过程:

<b>init -&gt;  index -&gt; verify_index -&gt; translog -&gt; finalize -&gt; done</b>

recovery 其实有两种:

primary的迁移/replication的生成和迁移 

primary的恢复

org.elasticsearch.indices.cluster.indicesclusterstateservice.clusterchanged 被触发后,会触发applyneworupdatedshards 函数的调用,这里是我们整个分析的起点。大家可以跑进去看看,然后跟着文章打开对应的源码浏览。

阅读完这篇文章,我们能够得到:

熟悉整个recovery 流程

了解translog机制

掌握对应的代码体系结构

这个是一般出现故障集群重启的时候可能遇到的。首先需要从store里进行恢复。

primary 进行自我恢复,所以并不需要其他节点的支持。所以判定的函数叫做ispeerrecovery 其实还是挺合适的。

indexservice.shard(shardid).recoverfromstore  调用的是 org.elasticsearch.index.shard.indexshard的方法。

逻辑还是很清晰的,判断分片必须存在,接着将任务委托给 org.elasticsearch.index.shard.storerecoveryservice.recover 方法,该方法有个细节需要了解下:

es会根据restoresource 决定是从snapshot或者从store里进行恢复。这里的indexshard.recovering并没有执行真正的recovering 操作,而是返回了一个recover的信息对象,里面包含了譬如节点之类的信息。

之后就将其作为一个任务提交出去了:

这里我们只走一条线,也就是进入 recoverfromstore 方法,该方法会执行索引文件的恢复动作,本质上是进入了index stage.

接着进行translogrecovery了

这个方法里面,最核心的是 internalperformtranslogrecovery方法,进入该方法后先进入 verify_index stage,进行索引的校验,校验如果没有问题,就会进入我们期待的translog 状态了。

进入translog 后,先进行一些设置:

这里的gc 指的是tranlog日志的删除问题,也就是不允许删除translog,接着会创建一个新的internalengine了,然后返回调用

不过你看这个代码会比较疑惑,其实我一开始看也觉得纳闷:

我们并没有看到做translog replay的地方,而从上层的调用方来看:

performtranslogrecovery 返回后,就立马进入扫尾(finalizerecovery)阶段。 里面唯一的动作是createnewengine,并且传递了skiptranslogrecovery 参数。 也就说,真正的translog replay动作是在createnewengine里完成,我们经过探索,发现是在internalengine 的初始化过程完成的,具体代码如下:

里面有个recoverfromtranslog,我们进去瞅瞅:

目前来看,所有的translog recovery 动作其实都是由 translogrecoveryperformer 来完成的。当然这个名字也比较好,翻译过来就是 translogrecovery 执行者。先对translog 做一个snapshot,然后根据这个snapshot开始进行恢复,进入 recoveryfromsnapshot 方法我们查看细节,然后会引导你进入下面的方法:

终于看到了实际的translog replay 逻辑了。这里调用了标准的internalengine.create 等方法进行日志的恢复。其实比较有意思的是,我们在日志回放的过程中,依然会继续写translog。这里就会导致一个问题,如果我在做日志回放的过程中,服务器由当掉了(或者es instance 重启了),那么就会导致translog 变多了。这个地方是否可以再优化下?

假设我们完成了translog 回放后,如果确实有重放,那么就行flush动作,删除translog,否则就commit index。具体逻辑由如下的代码来完成:

接着就进入了finalizerecovery,然后,就没然后了。

一般这种recovery其实就是发生relocation或者调整副本的时候发生的。所以集群是在正常状态,一定有健康的primary shard存在,所以我们也把这种recovery叫做peer recovery。  入口和前面的primary恢复是一样的,代码如下:

核心代码自然是 recoverytarget.startrecovery。这里的recoverytarget的类型是:

startrecovery方法的核心代码是:

也是启动一个县城异步执行的。recoveryrunner调用的是recoverytarget的 dorecovery方法,在该方法里,会发出一个rpc请求:

这个时候进入 index stage。 那谁接受处理的呢? 我们先看看现在的类名叫啥? recoverytarget。 我们想当然的想,是不是有recoverysource呢? 发现确实有,而且该类确实也有一个处理类:

es里这种通过netty进行交互的方式,大家可以看看我之前写文章elasticsearch rest/rpc 接口解析。 

这里我们进入recoversource对象的recover方法:

我们看到具体负责处理的类是recoverysourcehandler,之后调用该类的recovertotarget方法。我对下面的代码做了精简,方便大家看清楚。

首先创建一个translog的视图(创建视图的细节我现在也还没研究),接着的话对当前的索引进行snapshot。 然后进入phase1阶段,该阶段是把索引文件和请求的进行对比,然后得出有差异的部分,主动将数据推送给请求方。之后进入文件清理阶段,然后就进入translog 阶段:

protected void preparetargetfortranslog(final translog.view translogview) {

接着进入第二阶段:

对当前的translogview 进行一次snapshot,然后进行translog发送:

具体的发送逻辑如下:

这里发的请求,都是被 recoverytarget的translogoperationsrequesthandler 处理器来完成的,具体代码是:

这里调用indexshard.performbatchrecovery进行translog 的回放。

最后发送一个finalizerecovery给target 节点,完成recovering操作。

关于recovery translog 配置相关

在如下的类里有:

当服务器恢复时发现有存在的translog日志,就会进入translog 阶段进行replay。translog 的recovery 是走的标准的internalengine.create/update等方法,并且还会再写translog,同时还有一个影响性能的地方是很多数据可能已经存在,会走update操作,所以性能还是非常差的。这个目前能够想到的解决办法是调整flush日志的频率,保证存在的translog 尽量的少。 上面的话可以看出有三个控制选项:

本质上translog的恢复速度和条数的影响关系更大些,所以建议大家设置下 index.translog.flush_threshold_ops,比如多少条就一定要flush,否则积累的太多,出现故障,恢复就慢了。这些参数都可以动态设置,但建议放到配置文件。