天天看點

zookeeper3.3.3源碼分析(二)FastLeader選舉算法

 如何在zookeeper叢集中選舉出一個leader,zookeeper使用了三種算法,具體使用哪種算法,在配置檔案中是可以配置的,對應的配置項是”electionAlg”,其中1對應的是LeaderElection算法,2對應的是AuthFastLeaderElection算法,3對應的是FastLeaderElection算法.預設使用FastLeaderElection算法.其他兩種算法我沒有研究過,就不多說了.

要了解這個算法,最好需要一些paxos算法的理論基礎.

1) 資料恢複階段

首先,每個在zookeeper伺服器先讀取目前儲存在磁盤的資料,zookeeper中的每份資料,都有一個對應的id值,這個值是依次遞增的,換言之,越新的資料,對應的ID值就越大.

2) 首次發送自己的投票值

在讀取資料完畢之後,每個zookeeper伺服器發送自己選舉的leader,這個協定中包含了以下幾部分的資料:

1)所選舉leader的id(就是配置檔案中寫好的每個伺服器的id) ,在初始階段,每台伺服器的這個值都是自己伺服器的id,也就是它們都選舉自己為leader.

2) 伺服器最大資料的id,這個值大的伺服器,說明存放了更新的資料.

3)邏輯時鐘的值,這個值從0開始遞增,每次選舉對應一個值,也就是說:如果在同一次選舉中,那麼這個值應該是一緻的 2)邏輯時鐘值越大,說明這一次選舉leader的程序更新.

4) 本機在目前選舉過程中的狀态,有以下幾種:LOOKING,FOLLOWING,OBSERVING,LEADING,顧名思義不必解釋了吧.

    每台伺服器将自己伺服器的以上資料發送到叢集中的其他伺服器之後,同樣的也需要接收來自其他伺服器的資料,它将做以下的處理:

1) 如果所接收資料伺服器的狀态還是在選舉階段(LOOKING 狀态),那麼首先判斷邏輯時鐘值,又分為以下三種情況:

a) 如果發送過來的邏輯時鐘大于目前的邏輯時鐘,那麼說明這是更新的一次選舉,此時需要更新一下本機的邏輯時鐘值,同時将之前收集到的來自其他伺服器的選舉清空,因為這些資料已經不再有效了.然後判斷是否需要更新目前自己的選舉情況.在這裡是根據選舉leader id,儲存的最大資料id來進行判斷的,這兩種資料之間對這個選舉結果的影響的權重關系是:首先看資料id,資料id大者勝出;其次再判斷leader id,leader id大者勝出.然後再将自身最新的選舉結果(也就是上面提到的三種資料廣播給其他伺服器).代碼如下:

if (n.epoch > logicalclock) {
logicalclock = n.epoch;
recvset.clear();
if(totalOrderPredicate(n.leader, n.zxid,getInitId(), getInitLastLoggedZxid()))
   updateProposal(n.leader, n.zxid);
else
updateProposal(getInitId(),getInitLastLoggedZxid());

sendNotifications();
           

    其中的totalOrderPredicate函數就是根據發送過來的封包中的leader id,資料id來與本機儲存的相應資料進行判斷的函數,傳回true說明需要更新資料,于是調用updateProposal函數更新資料

b) 發送過來資料的邏輯時鐘小于本機的邏輯時鐘

說明對方在一個相對較早的選舉程序中,這裡隻需要将本機的資料發送過去就是了

c) 兩邊的邏輯時鐘相同,此時也隻是調用totalOrderPredicate函數判斷是否需要更新本機的資料,如果更新了再将自己最新的選舉結果廣播出去就是了.

實際上,在處理選票之前,還有一個預處理的動作,它發生在剛剛接收到關于vote的message的時候,具體過程如下:

1.判斷message的來源是不是observer,如果是,則告訴該observer我目前認為的Leader的資訊,否則進入2
2.判斷message是不是vote資訊,是則進入3
3.根據message建立一張vote
4.如果目前server處理LOOKING狀态,将vote放入自己的投票箱,而且如果vote源server處于LOOKING狀态同時vote源server的選舉時舊的,則目前server通知它新的一輪投票;
5如果目前server不處于LOOKING狀态而vote源server處理LOOKING狀态,則目前server告訴它目前的Leader資訊。
           

三種情況的處理完畢之後,再處理兩種情況:

1)伺服器判斷是不是已經收集到了所有伺服器的選舉狀态,如果是那麼根據選舉結果設定自己的角色(FOLLOWING還是LEADER),然後退出選舉過程就是了.

2)即使沒有收集到所有伺服器的選舉狀态,也可以判斷一下根據以上過程之後最新的選舉leader是不是得到了超過半數以上伺服器的支援,如果是,那麼嘗試在200ms内接收一下資料,如果沒有新的資料到來,說明大家都已經預設了這個結果,同樣也設定角色退出選舉過程.

代碼如下:

/*
* Only proceed if the vote comes from a replica in the
* voting view.
*/
if(self.getVotingView().containsKey(n.sid)){
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));

//If have received from all nodes, then terminate
if((self.getVotingView().size() == recvset.size()) && (self.getQuorumVerifier().getWeight(proposedLeader) != 0)){
self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());
leaveInstance();
return new Vote(proposedLeader, proposedZxid);

} else if (termPredicate(recvset,new Vote(proposedLeader, proposedZxid,logicalclock))) {

// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid,proposedLeader, proposedZxid)){
   recvqueue.put(n);
   break;
}
}

/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());
if(LOG.isDebugEnabled()){
LOG.debug("About to leave FLE instance: Leader= " + proposedLeader + ", Zxid = " + proposedZxid + ", My id = " + self.getId() + ", My state = " + self.getPeerState());
}

leaveInstance();
return new Vote(proposedLeader,proposedZxid);
}
}
}
           

2) 如果所接收伺服器不在選舉狀态,也就是在FOLLOWING或者LEADING狀态

做以下兩個判斷:

a) 如果邏輯時鐘相同,将該資料儲存到recvset,如果所接收伺服器宣稱自己是leader,那麼将判斷是不是有半數以上的伺服器選舉它,如果是則設定選舉狀态退出選舉過程

b) 否則這是一條與目前邏輯時鐘不符合的消息,那麼說明在另一個選舉過程中已經有了選舉結果,于是将該選舉結果加入到outofelection集合中,再根據outofelection來判斷是否可以結束選舉,如果可以也是儲存邏輯時鐘,設定選舉狀态,退出選舉過程.

代碼如下:

if(n.epoch == logicalclock){
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
if((n.state == ServerState.LEADING) || (termPredicate(recvset, new Vote(n.leader,n.zxid, n.epoch, n.state))&& checkLeader(outofelection, n.leader, n.epoch)) ){
self.setPeerState((n.leader == self.getId()) ?ServerState.LEADING: learningState());
leaveInstance();
return new Vote(n.leader, n.zxid);
}
}

outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.epoch, n.state));

if(termPredicate(outofelection, new Vote(n.leader,n.zxid, n.epoch, n.state))&& checkLeader(outofelection, n.leader, n.epoch)) {
   synchronized(this){
      logicalclock = n.epoch;
      self.setPeerState((n.leader == self.getId()) ? ServerState.LEADING: learningState());
   }
   leaveInstance();
   return new Vote(n.leader, n.zxid);
}

break;
}
}
           

    以一個簡單的例子來說明整個選舉的過程.

假設有五台伺服器組成的zookeeper叢集,它們的id從1-5,同時它們都是最新啟動的,也就是沒有曆史資料,在存放資料量這一點上,都是一樣的.假設這些伺服器依序啟動,來看看會發生什麼.

1) 伺服器1啟動,此時隻有它一台伺服器啟動了,它發出去的報沒有任何響應,是以它的選舉狀态一直是LOOKING狀态
2) 伺服器2啟動,它與最開始啟動的伺服器1進行通信,互相交換自己的選舉結果,由于兩者都沒有曆史資料,是以id值較大的伺服器2勝出,但是由于沒有達到超過半數以上的伺服器都同意選舉它(這個例子中的半數以上是3),是以伺服器1,2還是繼續保持LOOKING狀态.
3) 伺服器3啟動,根據前面的理論分析,伺服器3成為伺服器1,2,3中的老大,而與上面不同的是,此時有三台伺服器選舉了它,是以它成為了這次選舉的leader.
4) 伺服器4啟動,根據前面的分析,理論上伺服器4應該是伺服器1,2,3,4中最大的,但是由于前面已經有半數以上的伺服器選舉了伺服器3,是以它隻能接收當小弟的命了.
5) 伺服器5啟動,同4一樣,當小弟.