[TOC]
今天我们主要分析下broker被选为controller之后,主要干了什么。门面代码先列出来:
一个门面,涉及到的监听器和其他内容比较多,我们一一分析。
首先从zk的节点/controller_epoch下获取之前的epoch,然后将其+1后持久化到zk中。
这块就是订阅zk的节点信息,如果节点信息有变化,会做出一些操作。
这块订阅的路径是:/admin/reassign_partitions,表示的是分区的重新分配。如果有变化,会有下面的操作:
下面我们具体看下重新分配的过程,也就是initiateReassignReplicasForTopicPartition里面做了什么。
我们分析到代码watchIsrChangesForReassignedPartition时,发现里面定义的数据监听之后,其实也是调用了onPartitionReassignment,所以我们之间看下onPartitionReassignment,这是重新分配的重点。
这个方法由重新分区监听器触发,当admin触发时,它首先创建/admin/reassign_partitions路径,以触发zk监听器。分区重新分配会经历下面几步:
RAR = Reassigned replicas 重新分配的副本 OAR = Original list of replicas for partition,分区最初的副本列表 AR = current assigned replicas:当前分配的副本
1、通过OAR + RAR更新zk中的AR
2、发送LeaderAndIsr请求给AR中的每个副本,我们这样做的目的是强制更新zk中的controller epoch。
3、将RAR-OAR中副本状态变为新副本状态NewReplica,启动新副本
4、等待RAR中所有副本与leader同步
5、将RAR中所有的副本设置为OnlineReplica状态
6、设置AR到RAR的内存中
7、如果leader不在RAR中,从RAR中选举一个leader。如果需要选举,需要发送LeaderAndIsr请求。如果不是,那么controller epoch会自增,然后发送LeaderAndIsr请求。在任何情况下,都要保证AR=RAR。防止出现leader把RAR-OAR中的副本加到isr中。
8、把OAR-RAR中的副本设为OfflineReplica状态。当OfflineReplica状态变化时,我们会移除zk中ISR的OAR-RAR部分,然后发送LeaderAndIsr给leader,通知他ISR的缩减。然后,我们把OAR-RAR的副本状态改为StopReplica。
9、将OAR-RAR中所有的副本状态改为StopReplica。这会物理删除这些副本。
10、使用RAR更新ZK中的AR
11、更新zk节点/admin/reassign_partitions,删除对应的分区
12、选举完成后,副本和isr信息变化了。重新发送更新源数据的请求给每个broker。
整个过程比较绕,需要仔细理解下,下面是一个简单的过程,可以参考。
注册路径/isr_change_notification监听器。
主要是更新下leader和isr的缓存,主要是controller的epoch,然后发送更新源数据的请求。
监听/admin/preferred_replica_election路径的数据,preferred replica在leader挂掉的情况下,会直接被选为leader,也就是就是assigned replicas列表中的第一个replica。
首先是分区状态机,分区的状态有以下几个:
NonExistentPartition,分区不存在,他的前一个状态只能是OfflinePartition
NewPartition:新分区,还没有选出leader,前一个状态为NonExistentPartition
OnlinePartition:分区上线,leader已经选举出来了,前一个状态为NewPartition/OfflinePartition
OfflinePartition:分区下线,前一个状态为NewPartition/OnlinePartition
监听/brokers/topics路径数据变化,如果允许删除topic的话,监听/admin/delete_topics路径数据变化。
下面我们看下两个监听背后的动作。
这块主要处理了/brokers/topics路径下一些topic的变化,包括新增和删除的后续操作。
监听zk节点,把需要删除的topic放到待删除队列中,然后由kafka执行删除,主要删除的是zk下面相关的节点,和日志文件。
副本状态机,有以下几种状态:
NewReplica:controller在重新分区时会创建新副本,这个状态下,只能收到成为follower的请求,前一个状态是NonExistentReplica。
OnlineReplica:副本启动后的状态,这个状态下,他可以收到成为leader或follower的请求。前一个状态可以是NewReplica, OnlineReplica or OfflineReplica。
OfflineReplica:分区挂掉后的状态,前一个状态为NewReplica, OnlineReplica
ReplicaDeletionStarted:副本删除开始时的状态,前一个状态为OfflineReplica
ReplicaDeletionSuccessful:副本响应删除请求时没有错误码,这时候的状态,前一个状态为ReplicaDeletionStarted
ReplicaDeletionIneligible:副本删除失败的状态,前一个状态为ReplicaDeletionStarted
NonExistentReplica:副本删除成功后的状态,前一个状态为ReplicaDeletionSuccessful。
监听/brokers/ids路径下的节点变化。主要是broker是否有新增或者删除,然后做对应的操作。
这块主要获取了一些原始数据,包括topic、分区等等,然后启动了一些管理器。
前面几行已经有了注释,也比较清楚,下面我们从startChannelManager开始。这个ChannelManager是什么?其实就是用于leader与各个broker通信的通道。这个manager也就是管理这些请求的管理器。
这里主要处理几种请求:
LEADER_AND_ISR
STOP_REPLICA
UPDATE_METADATA_KEY
这个通道启动完成后,就是初始化三个动作:
initializePreferredReplicaElection
initializePartitionReassignment
initializeTopicDeletion
也就是replicaStateMachine.startup()。这个方法通过读取zk中的分区信息,把所有的副本状态改为OnlineReplica。
类似于副本状态机监听器,这个也是初始化了分区的状态,然后把分区的状态变为OnlineState。
如果开启了auto.leader.rebalance.enable参数,那么就会启动分区负载定时器。配置中可以设置leader.imbalance.check.interval.seconds参数,表示定时检查的时间间隔,单位为秒。
我们可以着重看下checkAndTriggerPartitionRebalance方法。
如果允许程序自动删除topic的话(delete.topic.enable=true),那么就会启动这个进程。