作者:橫刀天笑
在上一篇文章中我們大緻浏覽了zookeeper的啟動過程,并且提到在Zookeeper的啟動過程中leader選舉是非常重要而且最複雜的一個環節。那麼什麼是leader選舉呢?zookeeper為什麼需要leader選舉呢?zookeeper的leader選舉的過程又是什麼樣子的?本文的目的就是解決這三個問題。
首先我們來看看什麼是leader選舉。其實這個很好了解,leader選舉就像總統選舉一樣,每人一票,獲得多數票的人就當選為總統了。在zookeeper叢集中也是一樣,每個節點都會投票,如果某個節點獲得超過半數以上的節點的投票,則該節點就是leader節點了。
國家選舉總統是為了選一個最高統帥,治理國家。那麼zookeeper叢集選舉的目的又是什麼呢?其實這個要清楚明白的解釋還是挺複雜的。我們可以簡單點想這個問題:我們有一個zookeeper叢集,有好幾個節點。每個節點都可以接收請求,處理請求。那麼,如果這個時候分别有兩個用戶端向兩個節點發起請求,請求的内容是修改同一個資料。比如用戶端c1,請求節點n1,請求是set a = 1; 而用戶端c2,請求節點n2,請求内容是set a = 2;
那麼最後a是等于1還是等于2呢? 這在一個分布式環境裡是很難确定的。解決這個問題有很多辦法,而zookeeper的辦法是,我們選一個總統出來,所有的這類決策都送出給總統一個人決策,那之前的問題不就沒有了麼。
那我們現在的問題就是怎麼來選擇這個總統呢? 在現實中,選擇總統是需要宣講拉選票的,那麼在zookeeper的世界裡這又如何處理呢?我們還是show code吧。
在QuorumPeer的startLeaderElection方法裡包含leader選舉的邏輯。Zookeeper預設提供了4種選舉方式,預設是第4種: FastLeaderElection。
我們先假設我們這是一個嶄新的叢集,嶄新的叢集的選舉和之前運作過一段時間的選舉是有稍許不同的,後面會提及。
節點狀态: 每個叢集中的節點都有一個狀态 LOOKING, FOLLOWING, LEADING, OBSERVING。都屬于這4種,每個節點啟動的時候都是LOOKING狀态,如果這個節點參與選舉但最後不是leader,則狀态是FOLLOWING,如果不參與選舉則是OBSERVING,leader的狀态是LEADING。
開始這個選舉算法前,每個節點都會在zoo.cfg上指定的監聽端口啟動監聽(server.1=127.0.0.1:20881:20882),這裡的20882就是這裡用于選舉的端口。
在FastLeaderElection裡有一個Manager的内部類,這個類裡有啟動了兩個線程:WorkerReceiver, WorkerSender。為什麼說選舉這部分複雜呢,我覺得就是這些線程就像左右互搏一樣,非常難以了解。顧名思義,這兩個線程一個是處理從别的節點接收消息的,一個是向外發送消息的。對于外面的邏輯接收和發送的邏輯都是異步的。
這裡配置好了,QuorumPeer的run方法就開始執行了,這裡實作的是一個簡單的狀态機。因為現在是LOOKING狀态,是以進入LOOKING的分支,調用選舉算法開始選舉了:
setCurrentVote(makeLEStrategy().lookForLeader());
而在lookForLeader裡主要是幹什麼呢?首先我們會更新一下一個叫邏輯時鐘的東西,這也是在分布式算法裡很重要的一個概念,但是在這裡先不介紹,可以參考後面的論文。然後決定我要投票給誰。不過zookeeper這裡的選舉真直白,每個節點都選自己(汗),選我,選我,選我...... 然後向其他節點廣播這個選舉資訊。這裡實際上并沒有真正的發送出去,隻是将選舉資訊放到由WorkerSender管理的一個隊列裡。
synchronized(this){
//邏輯時鐘
logicalclock++;
//getInitLastLoggedZxid(), getPeerEpoch()這裡先不關心是什麼,後面會讨論
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
//getInitId() 即是擷取選誰,id就是myid裡指定的那個數字,是以說一定要唯一
private long getInitId(){
if(self.getQuorumVerifier().getVotingMembers().containsKey(self.getId()))
return self.getId();
else return Long.MIN_VALUE;
}
//發送選舉資訊,異步發送
sendNotifications();
現在我們去看看怎麼把投票資訊投遞出去。這個邏輯在WorkerSender裡,WorkerSender從sendqueue裡取出投票,然後交給QuorumCnxManager發送。因為前面發送投票資訊的時候是向叢集所有節點發送,是以當然也包括自己這個節點,是以QuorumCnxManager的發送邏輯裡會判斷,如果這個要發送的投票資訊是發送給自己的,則不發送了,直接進入接收隊列。
public void toSend(Long sid, ByteBuffer b) {
if (self.getId() == sid) {
b.position(0);
addToRecvQueue(new Message(b.duplicate(), sid));
} else {
//發送給别的節點,判斷之前是不是發送過
if (!queueSendMap.containsKey(sid)) {
//這個SEND_CAPACITY的大小是1,是以如果之前已經有一個還在等待發送,則會把之前的一個删除掉,發送新的
ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
queueSendMap.put(sid, bq);
addToSendQueue(bq, b);
} else {
ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
if(bq != null){
addToSendQueue(bq, b);
} else {
LOG.error("No queue for server " + sid);
}
}
//這裡是真正的發送邏輯了
connectOne(sid);
}
}
connectOne就是真正發送了。在發送之前會先把自己的id和選舉位址發送過去。然後判斷要發送節點的id是不是比自己的id大,如果大則不發送了。如果要發送又是啟動兩個線程:SendWorker,RecvWorker(這種一個程序内許多不同種類的線程,各自幹活的狀态真的很難了解)。
發送邏輯還算簡單,就是從剛才放到那個queueSendMap裡取出,然後發送。并且發送的時候将發送出去的東西放到一個lastMessageSent的map裡,如果queueSendMap裡是空的,就發送lastMessageSent裡的東西,確定對方一定收到了。
看完了SendWorker的邏輯,再來看看資料接收的邏輯吧。還記得前面提到的有個Listener在選舉端口上啟動了監聽麼,現在這裡應該接收到資料了。我們可以看到receiveConnection方法。在這裡,如果接收到的的資訊裡的id比自身的id小,則斷開連接配接,并嘗試發送消息給這個id對應的節點(當然,如果已經有SendWorker在往這個節點發送資料,則不用了)。
如果接收到的消息的id比目前的大,則會有RecvWorker接收資料,RecvWorker會将接收到的資料放到recvQueue裡。
而FastLeaderElection的WorkerReceiver線程裡會不斷地從這個recvQueue裡讀取Message處理。在WorkerReceiver會處理一些協定上的事情,比如消息格式等。除此之外還會看看接收到的消息是不是來自投票成員。如果是投票成員,則會看看這個消息裡的狀态,如果是LOOKING狀态并且目前的邏輯時鐘比投票消息裡的邏輯時鐘要高,則會發個通知過去,告訴誰是leader。
在這裡,剛剛啟動的嶄新叢集,是以邏輯時鐘基本上都是相同的,是以這裡還沒判斷出誰是leader。不過在這裡我們注意到如果目前節點的狀态是LOOKING的話,接收邏輯會将接收到的消息放到FastLeaderElection的recvqueue裡。而在FastLeaderElection會從這個recvqueue裡讀取東西。
這裡就是選舉的主要邏輯了:totalOrderPredicate
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
- 判斷消息裡的epoch是不是比目前的大,如果大則消息裡id對應的server我就承認它是leader
- 如果epoch相等則判斷zxid,如果消息裡的zxid比我的大我就承認它是leader
- 如果前面兩個都相等那就比較一下server id吧,如果比我的大我就承認它是leader。
關于前面兩個東西暫時我們不去關心它,對于新啟動的叢集這兩者都是相等的。
那這樣看來server id的大小也是leader選舉的一環啊(有的人生下來注定就不平凡,這都是命啊)。
最後我們來看看,很多文章所介紹的,如果超過一半的人說它是leader,那它就是leader的邏輯吧
private boolean termPredicate(
HashMap<Long, Vote> votes,
Vote vote) {
HashSet<Long> set = new HashSet<Long>();
//周遊已經收到的投票集合,将等于目前投票的集合取出放到set中
for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
if (self.getQuorumVerifier().getVotingMembers().containsKey(entry.getKey())
&& vote.equals(entry.getValue())){
set.add(entry.getKey());
}
}
//統計set,也就是投某個id的票數是否超過一半
return self.getQuorumVerifier().containsQuorum(set);
}
public boolean containsQuorum(Set<Long> ackSet) {
return (ackSet.size() > half);
}
最後一關:如果選的是自己,則将自己的狀态更新為LEADING,否則根據type,要麼是FOLLOWING,要麼是OBSERVING。
到這裡選舉就結束了。
這裡介紹的是一個新叢集啟動時候的選舉過程,啟動的時候就是根據zoo.cfg裡的配置,向各個節點廣播投票,一般都是選投自己。然後收到投票後就會進行進行判斷。如果某個節點收到的投票數超過一半,那麼它就是leader了。
了解了這個過程,我們來看看另外一個問題:
一個叢集有3台機器,挂了一台後的影響是什麼?挂了兩台呢?
挂了一台:挂了一台後就是收不到其中一台的投票,但是有兩台可以參與投票,按照上面的邏輯,它們開始都投給自己,後來按照選舉的原則,兩個人都投票給其中一個,那麼就有一個節點獲得的票等于2,2 > (3/2)=1 的,超過了半數,這個時候是能選出leader的。
挂了兩台: 挂了兩台後,怎麼弄也隻能獲得一張票, 1 不大于 (3/2)=1的,這樣就無法選出一個leader了。
在前面介紹時,為了簡單我假設的是這是一個嶄新的剛啟動的叢集,這樣的叢集與工作一段時間後的叢集有什麼不同呢?不同的就是epoch和zxid這兩個參數。在新啟動的叢集裡這兩個一般是相等的,而工作一段時間後這兩個參數有可能有的節點落後其他節點,至于是為什麼,這個還要在後面的存儲和處理額胡斷請求的文章裡介紹。