天天看點

【Kafka源碼】Kafka啟動過程

一般來說,我們是通過指令來啟動kafka,但是指令的本質還是調用代碼中的main方法,是以,我們重點看下啟動類Kafka。源碼下下來之後,我們也可以通過直接運作Kafka.scala中的main方法(需要指定啟動參數,也就是server.properties的位置)來啟動Kafka。因為kafka依賴zookeeper,是以我們需要提前啟動zookeeper,然後在server.properties中指定zk位址後,啟動。

下面我們首先看一下main()方法:

def main(args: Array[String]): Unit = {

}

我們慢慢來分析下,首先是getPropsFromArgs(args),這一行很明确,就是從配置檔案中讀取我們配置的内容,然後指派給serverProps。第二步,KafkaServerStartable.fromProps(serverProps),

object KafkaServerStartable {

def fromProps(serverProps: Properties) = {

這塊主要是啟動了一個内部的監控服務(内部狀态監控)。

下面是一個在java中常見的鈎子函數,在關閉時會啟動一些銷毀程式,保證程式安全關閉。之後就是我們啟動的重頭戲了:kafkaServerStartable.startup。跟進去可以很清楚的看到,裡面調用的方法是KafkaServer中的startup方法,下面我們重點看下這個方法(比較長):

def startup() {

首先判斷是否目前正在關閉中或者已經啟動了,這兩種情況直接抛出異常。然後是一個CAS的操作isStartingUp,防止線程并發操作啟動,判斷是否可以啟動。如果可以啟動,就開始我們的啟動過程。

構造Metrics類

定義broker狀态為啟動中starting

啟動定時器kafkaScheduler.startup()

構造zkUtils:利用參數中的zk資訊,啟動一個zk用戶端

啟動檔案管理器:讀取zk中的配置資訊,包含__consumer_offsets和__system.topic__。重點是啟動一些定時任務,來删除符合條件的記錄(cleanupLogs),清理髒記錄(flushDirtyLogs),把所有記錄寫到一個文本檔案中,防止在啟動時重新開機所有的記錄檔案(checkpointRecoveryPointOffsets)。

/**

Start the background threads to flush logs and do log cleanup

*/

下一步,擷取brokerId

啟動一個NIO socket服務

啟動複制管理器:啟動ISR逾時處理線程

啟動kafka控制器:注冊session過期監聽器,同時啟動控制器leader選舉

啟動協調器

權限認證

開啟線程,開始處理請求

開啟配置監聽,主要是監聽zk節點資料變化,然後廣播到所有機器

開啟健康檢查:目前隻是把broker節點注冊到zk上,注冊成功就是活的,否則就是dead

注冊啟動資料資訊

啟動成功

等待關閉countDownLatch,如果shutdownLatch變為0,則關閉Kafka