leadership transfer可以把raft group中的leader身份轉給其中一個follower。這個功能可以用來做負載均衡,比如可以把leader放在性能更好的機器或者離用戶端更近的機器上。
對于一個大規模分布式系統來說,負載均衡非常重要。然而raft本身在選主方面必須要求新主包含所有的意境committed的log,從這點上看,在選主階段,不能加入自定義的選主邏輯。而paxos協定不太一樣,paxos對選主沒有要求,任何一個成員都可以成為主,選主協定可以自己實作。paxos leader當選後,從其他成員把commit的log拉過來即可。是以為了這個feature,raft作者提出了一個方案作為raft的擴充。
大概原理就是保證transferee(transfer的目标follower)擁有和原leader有一樣新的日志,期間需要停寫,然後給transferee發送一個特殊的消息,讓這個follower可以馬上進行選主,而不用等到election timeout,正常情況下,這個follower的term最大,當選,原來的leader變為備。
還是一樣看看etcd實作的raft library怎麼做,省略無關代碼
首先應用通過如下函數來啟動leader transfer,其中lead是目前的leader,transferee是目标leader,在任意一個成員上調用即可。
func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) {
select {
// manually set 'from' and 'to', so that leader can voluntarily transfers its leadership
case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}:
case <-n.done:
case <-ctx.Done():
}
}
跑raft的goroutine從recvc中拿出message,首先做各種各樣的檢查,比如是否已經有transfer leader正在進行中,如果正在進行,目标是誰,然後做相應的處理。如果沒有,則調用一下代碼:
r.leadTransferee = leadTransferee
if pr.Match == r.raftLog.lastIndex() {
r.sendTimeoutNow(leadTransferee)
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
} else {
r.sendAppend(leadTransferee)
}
首先将目标leader儲存在leadTransferee中,标示着有transfer正在進行,後續如果有請求propose進來,會檢查:
if r.leadTransferee != None {
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
return
}
這裡相當于停寫。
回到上面:
- 如果transferee和leader的log一樣新,則給transferee發送MsgTimeoutNow類型的消息,告訴transferee可以立即選主,不需要等到election timeout。transferee端:
r.campaign(campaignTransfer)
raft為了防止出現網絡分區的情況下,candidate頻繁增加term進而導緻term爆炸,在選主的時候新增加了一個PreVote階段,通過了這個階段才會真正開始Vote,這裡,由于transferee明确知道是transfer,就沒有必要采用這種兩階段的選主,是以傳入的參數是campaignTransfer
- 如果leader發現transferee的日志落後,則給transferee append日志,leader在收到響應MsgAppResp後,會檢查:
// Transfer leadership is in progress.
if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
r.sendTimeoutNow(m.From)
}
如果發現transferee已經日志最新,則同樣,給transferee發送MsgTimeoutNow
最後,看看etcd如何調用:
func (s *EtcdServer) transferLeadership(ctx context.Context, lead, transferee uint64) error {
now := time.Now()
interval := time.Duration(s.Cfg.TickMs) * time.Millisecond
plog.Infof("%s starts leadership transfer from %s to %s", s.ID(), types.ID(lead), types.ID(transferee))
s.r.TransferLeadership(ctx, lead, transferee)
for s.Lead() != transferee {
select {
case <-ctx.Done(): // time out
return ErrTimeoutLeaderTransfer
case <-time.After(interval):
}
}
// TODO: drain all requests, or drop all messages to the old leader
plog.Infof("%s finished leadership transfer from %s to %s (took %v)", s.ID(), types.ID(lead), types.ID(transferee), time.Since(now))
return nil
}
調用TransferLeadership後,每隔一段時間檢查是否transfer成功,要麼逾時,直接傳回。