[TOC]
之前聊過了很多Kafka啟動過程中的一些加載内容,也知道了broker可以分為很多的partition,每個partition内部也可以分為leader和follower,主從之間有資料的複制。那麼這麼多partition是誰在管理?broker内部有沒有主從之分?這就是本文的主角,KafkaController,本文将細細道來。
KafkaController的啟動入口同樣很簡潔,在KafkaServer的start方法中。
首先執行個體化一個KafkaController,之後啟動了這個controller。
執行個體化的源碼,見注釋:
直接上代碼:
這個start方法并不意味着目前的broker就是controller,隻是把它注冊到zk上面,後面zk會進行選舉,選舉出controller後,在controller機器上面會執行一系列的操作,後面我們能看到。
首先,我們的broker會注冊一個session過期的監聽器,我們看一下這個監聽器。
可以看到,當broker到zk的session失效之後,broker并不會主動發起重連操作,而是等待zk的重連,當新的session被建立後,也就是目前broker加入到broker清單中之後,會進行兩個操作:
onControllerResignation:也就是目前controller失效
controllerElector.elect:重新進行controller選舉
下面我們分别看看做了啥。
從代碼看會比較直覺,主要就是清理一些controller的資料。
這塊是進行controller的重新選舉。
這塊主要進行的是controller的選舉,我們着重看下目前broker被選為controller之後的動作,也就是onBecomingLeader。這塊就需要我們傳回到執行個體化中去看下,這個動作是:onControllerFailover。
這裡面執行的動作很多,我們一一分析。
首先從zk中讀取controller的epoch
然後将epoch+1後更新到zk中
注冊一系列監聽器
初始化controller上下文
啟動兩個狀态機
訂閱所有topic的分區變化監聽器
定時檢查觸發分區選舉
啟動topic删除管理器
這裡面的東西比較多,我們後面文章再分析。
這裡的electionPath是/controller,下面我們看下這個leaderChangeListener。
監聽對應的zk節點,如果節點發生了變化,調用handleDataChange方法,主要内容是擷取目前的leaderId。如果目前broker之前是leader,而新的leader不是自己,那麼就會調用onResigningAsLeader方法,清除之前的leader資訊。
如果節點被删除了,就會調用handleDataDeleted方法。如果目前broker是leader,會首先調用onResigningAsLeader方法,然後發起新的leader選舉。
這邊就是我們的controller即leader選舉方法。與3.1.2的内容一緻。