[TOC]
之前聊过了很多Kafka启动过程中的一些加载内容,也知道了broker可以分为很多的partition,每个partition内部也可以分为leader和follower,主从之间有数据的复制。那么这么多partition是谁在管理?broker内部有没有主从之分?这就是本文的主角,KafkaController,本文将细细道来。
KafkaController的启动入口同样很简洁,在KafkaServer的start方法中。
首先实例化一个KafkaController,之后启动了这个controller。
实例化的源码,见注释:
直接上代码:
这个start方法并不意味着当前的broker就是controller,只是把它注册到zk上面,后面zk会进行选举,选举出controller后,在controller机器上面会执行一系列的操作,后面我们能看到。
首先,我们的broker会注册一个session过期的监听器,我们看一下这个监听器。
可以看到,当broker到zk的session失效之后,broker并不会主动发起重连操作,而是等待zk的重连,当新的session被创建后,也就是当前broker加入到broker列表中之后,会进行两个操作:
onControllerResignation:也就是当前controller失效
controllerElector.elect:重新进行controller选举
下面我们分别看看做了啥。
从代码看会比较直观,主要就是清理一些controller的数据。
这块是进行controller的重新选举。
这块主要进行的是controller的选举,我们着重看下当前broker被选为controller之后的动作,也就是onBecomingLeader。这块就需要我们返回到实例化中去看下,这个动作是:onControllerFailover。
这里面执行的动作很多,我们一一分析。
首先从zk中读取controller的epoch
然后将epoch+1后更新到zk中
注册一系列监听器
初始化controller上下文
启动两个状态机
订阅所有topic的分区变化监听器
定时检查触发分区选举
启动topic删除管理器
这里面的东西比较多,我们后面文章再分析。
这里的electionPath是/controller,下面我们看下这个leaderChangeListener。
监听对应的zk节点,如果节点发生了变化,调用handleDataChange方法,主要内容是获取当前的leaderId。如果当前broker之前是leader,而新的leader不是自己,那么就会调用onResigningAsLeader方法,清除之前的leader信息。
如果节点被删除了,就会调用handleDataDeleted方法。如果当前broker是leader,会首先调用onResigningAsLeader方法,然后发起新的leader选举。
这边就是我们的controller即leader选举方法。与3.1.2的内容一致。