開篇
在上一篇文章當中,我們講解了NodeImpl在init方法裡面會初始化話的動作,選舉也是在這個方法裡面進行的,這篇文章來從這個方法裡詳細講一下選舉的過程。
由于我這裡介紹的是如何實作的,是以請大家先看一下原理:SOFAJRaft 選舉機制剖析 | SOFAJRaft 實作原理
文章比較長,我也慢慢的寫了半個月時間~
選舉過程分析
我在這裡隻把有關選舉的代碼列舉出來,其他的代碼暫且忽略
NodeImpl#init
public boolean init(final NodeOptions opts) {
....
// Init timers
//設定投票計時器
this.voteTimer = new RepeatedTimer("JRaft-VoteTimer", this.options.getElectionTimeoutMs()) {
@Override
protected void onTrigger() {
//處理投票逾時
handleVoteTimeout();
}
@Override
protected int adjustTimeout(final int timeoutMs) {
//在一定範圍内傳回一個随機的時間戳
return randomTimeout(timeoutMs);
}
};
//設定預投票計時器
//當leader在規定的一段時間内沒有與 Follower 艦船進行通信時,
// Follower 就可以認為leader已經不能正常擔任旗艦的職責,則 Follower 可以去嘗試接替leader的角色。
// 這段通信逾時被稱為 Election Timeout
//候選者在發起投票之前,先發起預投票
this.electionTimer = new RepeatedTimer("JRaft-ElectionTimer", this.options.getElectionTimeoutMs()) {
@Override
protected void onTrigger() {
handleElectionTimeout();
}
@Override
protected int adjustTimeout(final int timeoutMs) {
//在一定範圍内傳回一個随機的時間戳
//為了避免同時發起選舉而導緻失敗
return randomTimeout(timeoutMs);
}
};
//leader下台的計時器
//定時檢查是否需要重新選舉leader
this.stepDownTimer = new RepeatedTimer("JRaft-StepDownTimer", this.options.getElectionTimeoutMs() >> 1) {
@Override
protected void onTrigger() {
handleStepDownTimeout();
}
};
....
if (!this.conf.isEmpty()) {
//新啟動的node需要重新選舉
stepDown(this.currTerm, false, new Status());
}
....
}
在這個init方法裡面會初始化三個計時器是和選舉有關的:
- voteTimer:這個timer負責定期的檢查,如果目前的state的狀态是候選者(STATE_CANDIDATE),那麼就會發起選舉
- electionTimer:在一定時間内如果leader沒有與 Follower 進行通信時,Follower 就可以認為leader已經不能正常擔任leader的職責,那麼就會進行選舉,在選舉之前會先發起預投票,如果沒有得到半數以上節點的回報,則候選者就會識趣的放棄參選。是以這個timer負責預投票
- stepDownTimer:定時檢查是否需要重新選舉leader,如果目前的leader沒有獲得超過半數的Follower響應,那麼這個leader就應該下台然後重新選舉。
RepeatedTimer的分析我已經寫好了:2. SOFAJRaft源碼分析—JRaft的定時任務排程器是怎麼做的?
我們先跟着init方法的思路往下看,一般來說this.conf裡面裝的是整個叢集的節點資訊,是不會為空的,是以會調用stepDown,是以先從這個方法看起。
leader下台
private void stepDown(final long term, final boolean wakeupCandidate, final Status status) {
LOG.debug("Node {} stepDown, term={}, newTerm={}, wakeupCandidate={}.", getNodeId(), this.currTerm, term,
wakeupCandidate);
//校驗一下目前節點的狀态是否有異常,或正在關閉
if (!this.state.isActive()) {
return;
}
//如果是候選者,那麼停止選舉
if (this.state == State.STATE_CANDIDATE) {
//調用voteTimer的stop方法
stopVoteTimer();
//如果目前狀态是leader或TRANSFERRING
} else if (this.state.compareTo(State.STATE_TRANSFERRING) <= 0) {
//讓啟動的stepDownTimer停止運作
stopStepDownTimer();
//清空選票箱中的内容
this.ballotBox.clearPendingTasks();
// signal fsm leader stop immediately
if (this.state == State.STATE_LEADER) {
//發送leader下台的事件給其他Follower
onLeaderStop(status);
}
}
// reset leader_id
//重置目前節點的leader
resetLeaderId(PeerId.emptyPeer(), status);
// soft state in memory
this.state = State.STATE_FOLLOWER;
//重置Configuration的上下文
this.confCtx.reset();
updateLastLeaderTimestamp(Utils.monotonicMs());
if (this.snapshotExecutor != null) {
//停止目前的快照生成
this.snapshotExecutor.interruptDownloadingSnapshots(term);
}
//設定任期為大的那個
// meta state
if (term > this.currTerm) {
this.currTerm = term;
this.votedId = PeerId.emptyPeer();
//重設中繼資料資訊儲存到檔案中
this.metaStorage.setTermAndVotedFor(term, this.votedId);
}
if (wakeupCandidate) {
this.wakingCandidate = this.replicatorGroup.stopAllAndFindTheNextCandidate(this.conf);
if (this.wakingCandidate != null) {
Replicator.sendTimeoutNowAndStop(this.wakingCandidate, this.options.getElectionTimeoutMs());
}
} else {
//把replicatorGroup裡面的所有replicator标記為stop
this.replicatorGroup.stopAll();
}
//leader轉移的時候會用到
if (this.stopTransferArg != null) {
if (this.transferTimer != null) {
this.transferTimer.cancel(true);
}
// There is at most one StopTransferTimer at the same term, it's safe to
// mark stopTransferArg to NULL
this.stopTransferArg = null;
}
//啟動
this.electionTimer.start();
}
一個leader的下台需要做很多交接的工作:
- 如果目前的節點是個候選人(STATE_CANDIDATE),那麼這個時候會讓它暫時不要投票
- 如果目前的節點狀态是(STATE_TRANSFERRING)表示正在轉交leader或是leader(STATE_LEADER),那麼就需要把目前節點的stepDownTimer這個定時器給關閉
- 如果目前是leader(STATE_LEADER),那麼就需要告訴狀态機leader下台了,可以在狀态機中對下台的動作做處理
- 重置目前節點的leader,把目前節點的state狀态設定為Follower,重置confCtx上下文
- 停止目前的快照生成,設定新的任期,讓所有的複制節點停止工作
- 啟動electionTimer
調用stopVoteTimer和stopStepDownTimer方法裡面主要是調用相應的RepeatedTimer的stop方法,在stop方法裡面會将stopped狀态設定為ture,并将timeout設定為取消,并将這個timeout加入到cancelledTimeouts集合中去:
如果看了2. SOFAJRaft源碼分析—JRaft的定時任務排程器是怎麼做的?這篇文章的話,那麼下面這段代碼應該一看就明白是怎麼回事了的。
public void stop() {
this.lock.lock();
try {
if (this.stopped) {
return;
}
this.stopped = true;
if (this.timeout != null) {
this.timeout.cancel();
this.running = false;
this.timeout = null;
}
} finally {
this.lock.unlock();
}
}
狀态機處理LEADER_STOP事件
在調用NodeImpl的onLeaderStop方法中,實際上是調用了FSMCallerImpl的onLeaderStop方法
NodeImpl#onLeaderStop
private void onLeaderStop(final Status status) {
this.replicatorGroup.clearFailureReplicators();
this.fsmCaller.onLeaderStop(status);
}
FSMCallerImpl#onLeaderStop
public boolean onLeaderStop(final Status status) {
return enqueueTask((task, sequence) -> {
//設定目前task的狀态為LEADER_STOP
task.type = TaskType.LEADER_STOP;
task.status = new Status(status);
});
}
private boolean enqueueTask(final EventTranslator<ApplyTask> tpl) {
if (this.shutdownLatch != null) {
// Shutting down
LOG.warn("FSMCaller is stopped, can not apply new task.");
return false;
}
//使用Disruptor釋出事件
this.taskQueue.publishEvent(tpl);
return true;
}
這個方法裡像taskQueue隊列裡面釋出了一個LEADER_STOP事件,taskQueue是在FSMCallerImpl的init方法中被初始化的:
public boolean init(final FSMCallerOptions opts) {
.....
this.disruptor = DisruptorBuilder.<ApplyTask>newInstance() //
.setEventFactory(new ApplyTaskFactory()) //
.setRingBufferSize(opts.getDisruptorBufferSize()) //
.setThreadFactory(new NamedThreadFactory("JRaft-FSMCaller-Disruptor-", true)) //
.setProducerType(ProducerType.MULTI) //
.setWaitStrategy(new BlockingWaitStrategy()) //
.build();
this.disruptor.handleEventsWith(new ApplyTaskHandler());
this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
this.taskQueue = this.disruptor.start();
.....
}
在taskQueue中釋出了一個任務之後會交給ApplyTaskHandler進行處理
ApplyTaskHandler
private class ApplyTaskHandler implements EventHandler<ApplyTask> {
// max committed index in current batch, reset to -1 every batch
private long maxCommittedIndex = -1;
@Override
public void onEvent(final ApplyTask event, final long sequence, final boolean endOfBatch) throws Exception {
this.maxCommittedIndex = runApplyTask(event, this.maxCommittedIndex, endOfBatch);
}
}
每當有任務到達taskQueue隊列的時候會調用ApplyTaskHandler的onEvent方法來處理事件,具體的執行邏輯由runApplyTask方法進行處理
FSMCallerImpl#runApplyTask
private long runApplyTask(final ApplyTask task, long maxCommittedIndex, final boolean endOfBatch) {
CountDownLatch shutdown = null;
...
switch (task.type) {
...
case LEADER_STOP:
this.currTask = TaskType.LEADER_STOP;
doLeaderStop(task.status);
break;
...
}
....
}
在runApplyTask方法裡會對很多的事件進行處理,我們這裡隻看LEADER_STOP是怎麼做的:
在switch裡會調用doLeaderStop方法,這個方法會調用到FSMCallerImpl裡面封裝的StateMachine狀态機的onLeaderStart方法:
private void doLeaderStop(final Status status) {
this.fsm.onLeaderStop(status);
}
這樣就可以對leader停止時進行客制化的處理了。
重置leader
接下來會調用resetLeaderId(PeerId.emptyPeer(), status);方法來重置leader
private void resetLeaderId(final PeerId newLeaderId, final Status status) {
if (newLeaderId.isEmpty()) {
//這個判斷表示如果目前節點是候選者或者是Follower,并且已經有leader了
if (!this.leaderId.isEmpty() && this.state.compareTo(State.STATE_TRANSFERRING) > 0) {
//向狀态機裝釋出停止跟随該leader的事件
this.fsmCaller.onStopFollowing(new LeaderChangeContext(this.leaderId.copy(), this.currTerm, status));
}
//把目前的leader設定為一個空值
this.leaderId = PeerId.emptyPeer();
} else {
//如果目前節點沒有leader
if (this.leaderId == null || this.leaderId.isEmpty()) {
//那麼釋出要跟随該leader的事件
this.fsmCaller.onStartFollowing(new LeaderChangeContext(newLeaderId, this.currTerm, status));
}
this.leaderId = newLeaderId.copy();
}
}
這個方法由兩個作用,如果傳入的newLeaderId不是個空的,那麼就會設定一個新的leader,并向狀态機發送一個START_FOLLOWING事件;如果傳入的newLeaderId是空的,那麼就會發送一個STOP_FOLLOWING事件,并把目前的leader置空。
啟動electionTimer,進行leader選舉
electionTimer是RepeatedTimer的實作類,在這裡我就不多說了,上一篇文章已經介紹過了。
我這裡來看看electionTimer的onTrigger方法是怎麼處理選舉事件的,electionTimer的onTrigger方法會調用NodeImpl的handleElectionTimeout方法,是以直接看這個方法:
NodeImpl#handleElectionTimeout
private void handleElectionTimeout() {
boolean doUnlock = true;
this.writeLock.lock();
try {
if (this.state != State.STATE_FOLLOWER) {
return;
}
//如果目前選舉沒有逾時則說明此輪選舉有效
if (isCurrentLeaderValid()) {
return;
}
resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.",
this.leaderId));
doUnlock = false;
//預投票 (pre-vote) 環節
//候選者在發起投票之前,先發起預投票,
// 如果沒有得到半數以上節點的回報,則候選者就會識趣的放棄參選
preVote();
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
在這個方法裡,首先會加上一個寫鎖,然後進行校驗,最後先發起一個預投票。
校驗的時候會校驗目前的狀态是不是follower,校驗leader和follower上次的通信時間是不是超過了ElectionTimeoutMs,如果沒有超過,說明leader存活,沒必要發起選舉;如果通信逾時,那麼會将leader置空,然後調用預選舉。
NodeImpl#isCurrentLeaderValid
private boolean isCurrentLeaderValid() {
return Utils.monotonicMs() - this.lastLeaderTimestamp < this.options.getElectionTimeoutMs();
}
用目前時間和上次leader通信時間相減,如果小于ElectionTimeoutMs(預設1s),那麼就沒有逾時,說明leader有效
預選票preVote
我們在handleElectionTimeout方法中最後調用了preVote方法,接下來重點看一下這個方法。
下面我将preVote拆分成幾部分來進行講解:
NodeImpl#preVote part1
private void preVote() {
long oldTerm;
try {
LOG.info("Node {} term {} start preVote.", getNodeId(), this.currTerm);
if (this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
LOG.warn(
"Node {} term {} doesn't do preVote when installing snapshot as the configuration may be out of date.",
getNodeId());
return;
}
//conf裡面記錄了叢集節點的資訊,如果目前的節點不包含在叢集裡說明是由問題的
if (!this.conf.contains(this.serverId)) {
LOG.warn("Node {} can't do preVote as it is not in conf <{}>.", getNodeId(), this.conf);
return;
}
//設定一下目前的任期
oldTerm = this.currTerm;
} finally {
this.writeLock.unlock();
}
....
}
這部分代碼是一開始進到preVote這個方法首先要經過一些校驗,例如目前的節點不能再安裝快照的時候進行選舉;檢視一下目前的節點是不是在自己設定的conf裡面,conf這個屬性會包含了叢集的所有節點;最後設定一下目前的任期後解鎖。
NodeImpl#preVote part2
private void preVote() {
....
//傳回最新的log實體類
final LogId lastLogId = this.logManager.getLastLogId(true);
boolean doUnlock = true;
this.writeLock.lock();
try {
// pre_vote need defense ABA after unlock&writeLock
//因為在上面沒有重新加鎖的間隙裡可能會被别的線程改變了,是以這裡校驗一下
if (oldTerm != this.currTerm) {
LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
return;
}
//初始化預投票投票箱
this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
for (final PeerId peer : this.conf.listPeers()) {
//如果周遊的節點是目前節點就跳過
if (peer.equals(this.serverId)) {
continue;
}
//失聯的節點也跳過
if (!this.rpcService.connect(peer.getEndpoint())) {
LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
continue;
}
//設定一個回調的類
final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
//向被周遊到的這個節點發送一個預投票的請求
done.request = RequestVoteRequest.newBuilder() //
.setPreVote(true) // it's a pre-vote request.
.setGroupId(this.groupId) //
.setServerId(this.serverId.toString()) //
.setPeerId(peer.toString()) //
.setTerm(this.currTerm + 1) // next term,注意這裡發送過去的任期會加一
.setLastLogIndex(lastLogId.getIndex()) //
.setLastLogTerm(lastLogId.getTerm()) //
.build();
this.rpcService.preVote(peer.getEndpoint(), done.request, done);
}
//自己也可以投給自己
this.prevVoteCtx.grant(this.serverId);
if (this.prevVoteCtx.isGranted()) {
doUnlock = false;
electSelf();
}
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
這部分代碼:
- 首先會擷取最新的log資訊,由LogId封裝,裡面包含兩部分,一部分是這個日志的index和寫入這個日志所對應當時節點的一個term任期
- 初始化預投票投票箱
- 周遊所有的叢集節點
- 如果周遊的節點是目前節點就跳過,如果周遊的節點因為當機或者手動下線等原因連接配接不上也跳過
- 向周遊的節點發送一個RequestVoteRequest請求預投票給自己
- 最後因為自己也是叢集節點的一員,是以自己也投票給自己
初始化預投票箱是調用了Ballot的init方法進行初始化,分别傳入新的叢集節點資訊,和老的叢集節點資訊
public boolean init(Configuration conf, Configuration oldConf) {
this.peers.clear();
this.oldPeers.clear();
quorum = oldQuorum = 0;
int index = 0;
//初始化新的節點
if (conf != null) {
for (PeerId peer : conf) {
this.peers.add(new UnfoundPeerId(peer, index++, false));
}
}
//設定需要多少票數才能成為leader
this.quorum = this.peers.size() / 2 + 1;
....
return true;
}
我這裡為了使邏輯更清晰,假設沒有oldConf,省略oldConf相關設定。
這個方法裡會周遊所有的peer節點,并将peer封裝成UnfoundPeerId插入到peers集合中;然後設定quorum屬性,這個屬性會在每獲得一個投票就減1,當減到0以下時說明獲得了足夠多的票數,就代表預投票成功。
發起預投票請求
//設定一個回調的類
final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
//向被周遊到的這個節點發送一個預投票的請求
done.request = RequestVoteRequest.newBuilder() //
.setPreVote(true) // it's a pre-vote request.
.setGroupId(this.groupId) //
.setServerId(this.serverId.toString()) //
.setPeerId(peer.toString()) //
.setTerm(this.currTerm + 1) // next term,注意這裡發送過去的任期會加一
.setLastLogIndex(lastLogId.getIndex()) //
.setLastLogTerm(lastLogId.getTerm()) //
.build();
this.rpcService.preVote(peer.getEndpoint(), done.request, done);
在構造RequestVoteRequest的時候,會将PreVote屬性設定為true,表示這次請求是預投票;設定目前節點為ServerId;傳給對方的任期是目前節點的任期加一。最後在發送成功收到響應之後會回調OnPreVoteRpcDone的run方法。
OnPreVoteRpcDone#run
public void run(final Status status) {
NodeImpl.this.metrics.recordLatency("pre-vote", Utils.monotonicMs() - this.startMs);
if (!status.isOk()) {
LOG.warn("Node {} PreVote to {} error: {}.", getNodeId(), this.peer, status);
} else {
handlePreVoteResponse(this.peer, this.term, getResponse());
}
}
在這個方法中如果收到正常的響應,那麼會調用handlePreVoteResponse方法處理響應
OnPreVoteRpcDone#handlePreVoteResponse
public void handlePreVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
boolean doUnlock = true;
this.writeLock.lock();
try {
//隻有follower才可以嘗試發起選舉
if (this.state != State.STATE_FOLLOWER) {
LOG.warn("Node {} received invalid PreVoteResponse from {}, state not in STATE_FOLLOWER but {}.",
getNodeId(), peerId, this.state);
return;
}
if (term != this.currTerm) {
LOG.warn("Node {} received invalid PreVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
peerId, term, this.currTerm);
return;
}
//如果傳回的任期大于目前的任期,那麼這次請求也是無效的
if (response.getTerm() > this.currTerm) {
LOG.warn("Node {} received invalid PreVoteResponse from {}, term {}, expect={}.", getNodeId(), peerId,
response.getTerm(), this.currTerm);
stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
"Raft node receives higher term pre_vote_response."));
return;
}
LOG.info("Node {} received PreVoteResponse from {}, term={}, granted={}.", getNodeId(), peerId,
response.getTerm(), response.getGranted());
// check granted quorum?
if (response.getGranted()) {
this.prevVoteCtx.grant(peerId);
//得到了半數以上的響應
if (this.prevVoteCtx.isGranted()) {
doUnlock = false;
//進行選舉
electSelf();
}
}
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
這裡做了3重校驗,我們分别來談談:
-
第一重試校驗了目前的狀态,如果不是FOLLOWER那麼就不能發起選舉。因為如果是leader節點,那麼它不會選舉,隻能stepdown下台,把自己變成FOLLOWER後重新選舉;如果是CANDIDATE,那麼隻能進行由FOLLOWER發起的投票,是以從功能上來說,隻能FOLLOWER發起選舉。
從Raft 的設計上來說也隻能由FOLLOWER來發起選舉,是以這裡進行了校驗。
- 第二重校驗主要是校驗發送請求時的任期和接受到響應時的任期還是不是一個,如果不是那麼說明已經不是上次那輪的選舉了,是一次失效的選舉
- 第三重校驗是校驗響應傳回的任期是不是大于目前的任期,如果大于目前的任期,那麼重置目前的leader
校驗完之後響應的節點會傳回一個授權,如果授權通過的話則調用Ballot的grant方法,表示給目前的節點投一票
Ballot#grant
public void grant(PeerId peerId) {
this.grant(peerId, new PosHint());
}
public PosHint grant(PeerId peerId, PosHint hint) {
UnfoundPeerId peer = findPeer(peerId, peers, hint.pos0);
if (peer != null) {
if (!peer.found) {
peer.found = true;
this.quorum--;
}
hint.pos0 = peer.index;
} else {
hint.pos0 = -1;
}
....
return hint;
}
grant方法會根據peerId去叢集集合裡面去找被封裝的UnfoundPeerId執行個體,然後判斷一下,如果沒有被記錄過,那麼就将quorum減一,表示收到一票,然後将found設定為ture表示已經找過了。
在查找UnfoundPeerId執行個體的時候方法裡面做了一個很有趣的設定:
首先在存入到peers集合裡面的時候是這樣的:
int index = 0;
for (PeerId peer : conf) {
this.peers.add(new UnfoundPeerId(peer, index++, false));
}
這裡會周遊conf,然後會存入index,index從零開始。
然後在查找的時候會傳入peerId和posHint還有peers集合:
private UnfoundPeerId findPeer(PeerId peerId, List<UnfoundPeerId> peers, int posHint) {
if (posHint < 0 || posHint >= peers.size() || !peers.get(posHint).peerId.equals(peerId)) {
for (UnfoundPeerId ufp : peers) {
if (ufp.peerId.equals(peerId)) {
return ufp;
}
}
return null;
}
return peers.get(posHint);
}
這裡傳入的posHint預設是-1 ,即如果是第一次傳入,那麼會周遊整個peers集合,然後一個個比對之後傳回。
因為PosHint執行個體會在調用完之後将pos0設定為peer的index,如果grant方法不是第一次調用,那麼在調用findPeer方法的時候就可以直接通過get方法擷取,不用再周遊整個集合了。
這種寫法也可以運用到平時的代碼中去。
調用了grant方法之後會調用Ballot的isGranted判斷一下是否達到了半數以上的響應。
Ballot#isGranted
public boolean isGranted() {
return this.quorum <= 0 && oldQuorum <= 0;
}
即判斷一下投票箱裡面的票數是不是被減到了0。如果傳回是的話,那麼就調用electSelf進行選舉。
選舉的方法暫時先不看,我們來看看預選舉的請求是怎麼被處理的
響應RequestVoteRequest請求
RequestVoteRequest請求的處理器是在RaftRpcServerFactory的addRaftRequestProcessors方法中被安置的,具體的處理時RequestVoteRequestProcessor。
具體的處理方法是交由processRequest0方法來處理的。
RequestVoteRequestProcessor#processRequest0
public Message processRequest0(RaftServerService service, RequestVoteRequest request, RpcRequestClosure done) {
//如果是預選舉
if (request.getPreVote()) {
return service.handlePreVoteRequest(request);
} else {
return service.handleRequestVoteRequest(request);
}
}
因為這個處理器可以處理選舉和預選舉,是以加了個判斷。預選舉的方法交給NodeImpl的handlePreVoteRequest來具體實作的。
NodeImpl#handlePreVoteRequest
public Message handlePreVoteRequest(final RequestVoteRequest request) {
boolean doUnlock = true;
this.writeLock.lock();
try {
//校驗這個節點是不是正常的節點
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.",
getNodeId(), this.state.name());
}
final PeerId candidateId = new PeerId();
//發送過來的request請求攜帶的ServerId格式不能錯
if (!candidateId.parse(request.getServerId())) {
LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(),
request.getServerId());
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse candidateId failed: %s.",
request.getServerId());
}
boolean granted = false;
// noinspection ConstantConditions
do {
//已經有leader的情況
if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) {
LOG.info(
"Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.",
getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId);
break;
}
//請求的任期小于目前的任期
if (request.getTerm() < this.currTerm) {
LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
request.getServerId(), request.getTerm(), this.currTerm);
// A follower replicator may not be started when this node become leader, so we must check it.
//那麼這個節點也可能是leader,是以校驗一下請求的節點是不是複制節點,重新加入到replicatorGroup中
checkReplicator(candidateId);
break;
} else if (request.getTerm() == this.currTerm + 1) {
// A follower replicator may not be started when this node become leader, so we must check it.
// check replicator state
//因為請求的任期和目前的任期相等,那麼這個節點也可能是leader,
// 是以校驗一下請求的節點是不是複制節點,重新加入到replicatorGroup中
checkReplicator(candidateId);
}
doUnlock = false;
this.writeLock.unlock();
//擷取最新的日志
final LogId lastLogId = this.logManager.getLastLogId(true);
doUnlock = true;
this.writeLock.lock();
final LogId requestLastLogId = new LogId(request.getLastLogIndex(), request.getLastLogTerm());
//比較目前節點的日志完整度和請求節點的日志完整度
granted = requestLastLogId.compareTo(lastLogId) >= 0;
LOG.info(
"Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.",
getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, granted, requestLastLogId,
lastLogId);
} while (false);//這個while蠻有意思,為了用break想盡了辦法
return RequestVoteResponse.newBuilder() //
.setTerm(this.currTerm) //
.setGranted(granted) //
.build();
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
這個方法裡面也是蠻有意思的,寫的很長,但是邏輯很清楚。
- 首先調用isActive,看一下目前節點是不是正常的節點,不是正常節點要傳回Error資訊
- 将請求傳過來的ServerId解析到candidateId執行個體中
- 校驗目前的節點如果有leader,并且leader有效的,那麼就直接break,傳回granted為false
- 如果目前的任期大于請求的任期,那麼調用checkReplicator檢查自己是不是leader,如果是leader,那麼将目前節點從failureReplicators移除,重新加入到replicatorMap中。然後直接break
- 請求任期和目前任期相等的情況也要校驗,隻是不用break
- 如果請求的日志比目前的最新的日志還要新,那麼傳回granted為true,代表授權成功
這裡有一個有意思的地方是,因為java中隻能在循環中goto,是以這裡使用了do-while(false)隻做單次的循環,這樣就可以do代碼塊裡使用break了。
下面稍微看一下checkReplicator:
NodeImpl#checkReplicator
private void checkReplicator(final PeerId candidateId) {
if (this.state == State.STATE_LEADER) {
this.replicatorGroup.checkReplicator(candidateId, false);
}
}
這裡判斷一下是不是leader,然後就會調用ReplicatorGroupImpl的checkReplicator
ReplicatorGroupImpl#checkReplicator
private final ConcurrentMap<PeerId, ThreadId> replicatorMap = new ConcurrentHashMap<>();
private final Set<PeerId> failureReplicators = new ConcurrentHashSet<>();
public void checkReplicator(final PeerId peer, final boolean lockNode) {
//根據傳入的peer擷取相應的ThreadId
final ThreadId rid = this.replicatorMap.get(peer);
// noinspection StatementWithEmptyBody
if (rid == null) {
// Create replicator if it's not found for leader.
final NodeImpl node = this.commonOptions.getNode();
if (lockNode) {
node.writeLock.lock();
}
try {
//如果目前的節點是leader,并且傳入的peer在failureReplicators中,那麼重新添加到replicatorMap
if (node.isLeader() && this.failureReplicators.contains(peer) && addReplicator(peer)) {
this.failureReplicators.remove(peer);
}
} finally {
if (lockNode) {
node.writeLock.unlock();
}
}
} else { // NOPMD
// Unblock it right now.
// Replicator.unBlockAndSendNow(rid);
}
}
checkReplicator會從replicatorMap根據傳入的peer執行個體校驗一下是不是為空。因為replicatorMap裡面存放了叢集所有的節點。然後通過ReplicatorGroupImpl的commonOptions擷取目前的Node執行個體,如果目前的node執行個體是leader,并且如果存在失敗集合failureReplicators中的話就重新添加進replicatorMap中。
ReplicatorGroupImpl#addReplicator
public boolean addReplicator(final PeerId peer) {
//校驗目前的任期
Requires.requireTrue(this.commonOptions.getTerm() != 0);
//如果replicatorMap裡面已經有這個節點了,那麼将它從failureReplicators集合中移除
if (this.replicatorMap.containsKey(peer)) {
this.failureReplicators.remove(peer);
return true;
}
//指派一個新的ReplicatorOptions
final ReplicatorOptions opts = this.commonOptions == null ? new ReplicatorOptions() : this.commonOptions.copy();
//新的ReplicatorOptions添加這個PeerId
opts.setPeerId(peer);
final ThreadId rid = Replicator.start(opts, this.raftOptions);
if (rid == null) {
LOG.error("Fail to start replicator to peer={}.", peer);
this.failureReplicators.add(peer);
return false;
}
return this.replicatorMap.put(peer, rid) == null;
}
addReplicator裡面主要是做了兩件事:1. 将要加入的節點從failureReplicators集合裡移除;2. 将要加入的節點放入到replicatorMap集合中去。
投票electSelf
private void electSelf() {
long oldTerm;
try {
LOG.info("Node {} start vote and grant vote self, term={}.", getNodeId(), this.currTerm);
//1. 如果目前節點不在叢集裡面則不進行選舉
if (!this.conf.contains(this.serverId)) {
LOG.warn("Node {} can't do electSelf as it is not in {}.", getNodeId(), this.conf);
return;
}
//2. 大概是因為要進行正式選舉了,把預選舉關掉
if (this.state == State.STATE_FOLLOWER) {
LOG.debug("Node {} stop election timer, term={}.", getNodeId(), this.currTerm);
this.electionTimer.stop();
}
//3. 清空leader
resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT,
"A follower's leader_id is reset to NULL as it begins to request_vote."));
this.state = State.STATE_CANDIDATE;
this.currTerm++;
this.votedId = this.serverId.copy();
LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm);
//4. 開始發起投票定時器,因為可能投票失敗需要循環發起投票
this.voteTimer.start();
//5. 初始化投票箱
this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
oldTerm = this.currTerm;
} finally {
this.writeLock.unlock();
}
final LogId lastLogId = this.logManager.getLastLogId(true);
this.writeLock.lock();
try {
// vote need defense ABA after unlock&writeLock
if (oldTerm != this.currTerm) {
LOG.warn("Node {} raise term {} when getLastLogId.", getNodeId(), this.currTerm);
return;
}
//6. 周遊所有節點
for (final PeerId peer : this.conf.listPeers()) {
if (peer.equals(this.serverId)) {
continue;
}
if (!this.rpcService.connect(peer.getEndpoint())) {
LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
continue;
}
final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this);
done.request = RequestVoteRequest.newBuilder() //
.setPreVote(false) // It's not a pre-vote request.
.setGroupId(this.groupId) //
.setServerId(this.serverId.toString()) //
.setPeerId(peer.toString()) //
.setTerm(this.currTerm) //
.setLastLogIndex(lastLogId.getIndex()) //
.setLastLogTerm(lastLogId.getTerm()) //
.build();
this.rpcService.requestVote(peer.getEndpoint(), done.request, done);
}
this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId);
this.voteCtx.grant(this.serverId);
if (this.voteCtx.isGranted()) {
//7. 投票成功,那麼就晉升為leader
becomeLeader();
}
} finally {
this.writeLock.unlock();
}
}
不要看這個方法這麼長,其實都是和前面預選舉的方法preVote重複度很高的。方法太長,是以标了号,從上面号碼來一步步講解:
- 對目前的節點進行校驗,如果目前節點不在叢集裡面則不進行選舉
- 因為是Follower發起的選舉,是以大概是因為要進行正式選舉了,把預選舉定時器關掉
- 清空leader再進行選舉,注意這裡會把votedId設定為目前節點,代表自己參選
- 開始發起投票定時器,因為可能投票失敗需要循環發起投票,voteTimer裡面會根據目前的CANDIDATE狀态調用electSelf進行選舉
- 調用init方法初始化投票箱,這裡和prevVoteCtx是一樣的
- 周遊所有節點,然後向其他叢集節點發送RequestVoteRequest請求,這裡也是和preVote一樣的,請求是被RequestVoteRequestProcessor處理器處理的。
- 如果有超過半數以上的節點投票選中,那麼就調用becomeLeader晉升為leader
我先來看看RequestVoteRequestProcessor怎麼處理的選舉:
在RequestVoteRequestProcessor的processRequest0會調用NodeImpl的handleRequestVoteRequest來處理具體的邏輯。
處理投票請求
NodeImpl#handleRequestVoteRequest
public Message handleRequestVoteRequest(final RequestVoteRequest request) {
boolean doUnlock = true;
this.writeLock.lock();
try {
//是否存活
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.",
getNodeId(), this.state.name());
}
final PeerId candidateId = new PeerId();
if (!candidateId.parse(request.getServerId())) {
LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", getNodeId(),
request.getServerId());
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse candidateId failed: %s.",
request.getServerId());
}
// noinspection ConstantConditions
do {
// check term
if (request.getTerm() >= this.currTerm) {
LOG.info("Node {} received RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
request.getServerId(), request.getTerm(), this.currTerm);
//1. 如果請求的任期大于目前任期
// increase current term, change state to follower
if (request.getTerm() > this.currTerm) {
stepDown(request.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
"Raft node receives higher term RequestVoteRequest."));
}
} else {
// ignore older term
LOG.info("Node {} ignore RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
request.getServerId(), request.getTerm(), this.currTerm);
break;
}
doUnlock = false;
this.writeLock.unlock();
final LogId lastLogId = this.logManager.getLastLogId(true);
doUnlock = true;
this.writeLock.lock();
// vote need ABA check after unlock&writeLock
if (request.getTerm() != this.currTerm) {
LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
break;
}
//2. 判斷日志完整度
final boolean logIsOk = new LogId(request.getLastLogIndex(), request.getLastLogTerm())
.compareTo(lastLogId) >= 0;
//3. 判斷目前的節點是不是已經投過票了
if (logIsOk && (this.votedId == null || this.votedId.isEmpty())) {
stepDown(request.getTerm(), false, new Status(RaftError.EVOTEFORCANDIDATE,
"Raft node votes for some candidate, step down to restart election_timer."));
this.votedId = candidateId.copy();
this.metaStorage.setVotedFor(candidateId);
}
} while (false);
return RequestVoteResponse.newBuilder() //
.setTerm(this.currTerm) //
//4.同意投票的條件是目前的任期和請求的任期一樣,并且已經将votedId設定為請求節點
.setGranted(request.getTerm() == this.currTerm && candidateId.equals(this.votedId)) //
.build();
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
這個方法大緻也和handlePreVoteRequest差不多。我這裡隻分析一下我标注的。
- 這裡是判斷目前的任期是小于請求的任期的,并且調用stepDown将請求任期設定為目前的任期,将目前的狀态設定被Follower
- 作為一個leader來做日志肯定是要比被請求的節點完整,是以這裡判斷一下日志是不是比被請求的節點日志完整
- 如果日志是完整的,并且被請求的節點沒有投票給其他的候選人,那麼就将votedId設定為目前請求的節點
- 給請求發送響應,同意投票的條件是目前的任期和請求的任期一樣,并且已經将votedId設定為請求節點
晉升leader
投票完畢之後如果收到的票數大于一半,那麼就會晉升為leader,調用becomeLeader方法。
private void becomeLeader() {
Requires.requireTrue(this.state == State.STATE_CANDIDATE, "Illegal state: " + this.state);
LOG.info("Node {} become leader of group, term={}, conf={}, oldConf={}.", getNodeId(), this.currTerm,
this.conf.getConf(), this.conf.getOldConf());
// cancel candidate vote timer
//晉升leader之後就會把選舉的定時器關閉了
stopVoteTimer();
//設定目前的狀态為leader
this.state = State.STATE_LEADER;
this.leaderId = this.serverId.copy();
//複制叢集中設定新的任期
this.replicatorGroup.resetTerm(this.currTerm);
//周遊所有的叢集節點
for (final PeerId peer : this.conf.listPeers()) {
if (peer.equals(this.serverId)) {
continue;
}
LOG.debug("Node {} add replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
//如果成為leader,那麼需要把自己的日志資訊複制到其他節點
if (!this.replicatorGroup.addReplicator(peer)) {
LOG.error("Fail to add replicator, peer={}.", peer);
}
}
// init commit manager
this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() + 1);
// Register _conf_ctx to reject configuration changing before the first log
// is committed.
if (this.confCtx.isBusy()) {
throw new IllegalStateException();
}
this.confCtx.flush(this.conf.getConf(), this.conf.getOldConf());
//如果是leader了,那麼就要定時的檢查不是有資格勝任
this.stepDownTimer.start();
}
這個方法裡面首先會停止選舉定時器,然後設定目前的狀态為leader,并設值任期,然後周遊所有的節點将節點加入到複制叢集中,最後将stepDownTimer打開,定時對leader進行校驗是不是又半數以上的節點響應目前的leader。
好了,到這裡就講完了,希望下次還可以see you again~