天天看点

【Kafka源码】broker被选为controller之后的连锁反应一、controller epoch二、注册监听器三、分区和副本状态机

[TOC]

今天我们主要分析下broker被选为controller之后,主要干了什么。门面代码先列出来:

一个门面,涉及到的监听器和其他内容比较多,我们一一分析。

首先从zk的节点/controller_epoch下获取之前的epoch,然后将其+1后持久化到zk中。

这块就是订阅zk的节点信息,如果节点信息有变化,会做出一些操作。

这块订阅的路径是:/admin/reassign_partitions,表示的是分区的重新分配。如果有变化,会有下面的操作:

下面我们具体看下重新分配的过程,也就是initiateReassignReplicasForTopicPartition里面做了什么。

我们分析到代码watchIsrChangesForReassignedPartition时,发现里面定义的数据监听之后,其实也是调用了onPartitionReassignment,所以我们之间看下onPartitionReassignment,这是重新分配的重点。

这个方法由重新分区监听器触发,当admin触发时,它首先创建/admin/reassign_partitions路径,以触发zk监听器。分区重新分配会经历下面几步:

RAR = Reassigned replicas 重新分配的副本 OAR = Original list of replicas for partition,分区最初的副本列表 AR = current assigned replicas:当前分配的副本

1、通过OAR + RAR更新zk中的AR

2、发送LeaderAndIsr请求给AR中的每个副本,我们这样做的目的是强制更新zk中的controller epoch。

3、将RAR-OAR中副本状态变为新副本状态NewReplica,启动新副本

4、等待RAR中所有副本与leader同步

5、将RAR中所有的副本设置为OnlineReplica状态

6、设置AR到RAR的内存中

7、如果leader不在RAR中,从RAR中选举一个leader。如果需要选举,需要发送LeaderAndIsr请求。如果不是,那么controller epoch会自增,然后发送LeaderAndIsr请求。在任何情况下,都要保证AR=RAR。防止出现leader把RAR-OAR中的副本加到isr中。

8、把OAR-RAR中的副本设为OfflineReplica状态。当OfflineReplica状态变化时,我们会移除zk中ISR的OAR-RAR部分,然后发送LeaderAndIsr给leader,通知他ISR的缩减。然后,我们把OAR-RAR的副本状态改为StopReplica。

9、将OAR-RAR中所有的副本状态改为StopReplica。这会物理删除这些副本。

10、使用RAR更新ZK中的AR

11、更新zk节点/admin/reassign_partitions,删除对应的分区

12、选举完成后,副本和isr信息变化了。重新发送更新源数据的请求给每个broker。

整个过程比较绕,需要仔细理解下,下面是一个简单的过程,可以参考。

注册路径/isr_change_notification监听器。

主要是更新下leader和isr的缓存,主要是controller的epoch,然后发送更新源数据的请求。

监听/admin/preferred_replica_election路径的数据,preferred replica在leader挂掉的情况下,会直接被选为leader,也就是就是assigned replicas列表中的第一个replica。

首先是分区状态机,分区的状态有以下几个:

NonExistentPartition,分区不存在,他的前一个状态只能是OfflinePartition

NewPartition:新分区,还没有选出leader,前一个状态为NonExistentPartition

OnlinePartition:分区上线,leader已经选举出来了,前一个状态为NewPartition/OfflinePartition

OfflinePartition:分区下线,前一个状态为NewPartition/OnlinePartition

监听/brokers/topics路径数据变化,如果允许删除topic的话,监听/admin/delete_topics路径数据变化。

下面我们看下两个监听背后的动作。

这块主要处理了/brokers/topics路径下一些topic的变化,包括新增和删除的后续操作。

监听zk节点,把需要删除的topic放到待删除队列中,然后由kafka执行删除,主要删除的是zk下面相关的节点,和日志文件。

副本状态机,有以下几种状态:

NewReplica:controller在重新分区时会创建新副本,这个状态下,只能收到成为follower的请求,前一个状态是NonExistentReplica。

OnlineReplica:副本启动后的状态,这个状态下,他可以收到成为leader或follower的请求。前一个状态可以是NewReplica, OnlineReplica or OfflineReplica。

OfflineReplica:分区挂掉后的状态,前一个状态为NewReplica, OnlineReplica

ReplicaDeletionStarted:副本删除开始时的状态,前一个状态为OfflineReplica

ReplicaDeletionSuccessful:副本响应删除请求时没有错误码,这时候的状态,前一个状态为ReplicaDeletionStarted

ReplicaDeletionIneligible:副本删除失败的状态,前一个状态为ReplicaDeletionStarted

NonExistentReplica:副本删除成功后的状态,前一个状态为ReplicaDeletionSuccessful。

监听/brokers/ids路径下的节点变化。主要是broker是否有新增或者删除,然后做对应的操作。

这块主要获取了一些原始数据,包括topic、分区等等,然后启动了一些管理器。

前面几行已经有了注释,也比较清楚,下面我们从startChannelManager开始。这个ChannelManager是什么?其实就是用于leader与各个broker通信的通道。这个manager也就是管理这些请求的管理器。

这里主要处理几种请求:

LEADER_AND_ISR

STOP_REPLICA

UPDATE_METADATA_KEY

这个通道启动完成后,就是初始化三个动作:

initializePreferredReplicaElection

initializePartitionReassignment

initializeTopicDeletion

也就是replicaStateMachine.startup()。这个方法通过读取zk中的分区信息,把所有的副本状态改为OnlineReplica。

类似于副本状态机监听器,这个也是初始化了分区的状态,然后把分区的状态变为OnlineState。

如果开启了auto.leader.rebalance.enable参数,那么就会启动分区负载定时器。配置中可以设置leader.imbalance.check.interval.seconds参数,表示定时检查的时间间隔,单位为秒。

我们可以着重看下checkAndTriggerPartitionRebalance方法。

如果允许程序自动删除topic的话(delete.topic.enable=true),那么就会启动这个进程。