天天看点

Apache Kafka源码分析 - kafka controller

前面已经分析过kafka server的启动过程,以及server所能处理的所有的request,即kafkaapis 

剩下的,其实关键就是controller,以及partition和replica的状态机 

这里先看看controller在broker server的基础上,多做了哪些初始化和failover的工作

最关键的一句,

kafka.server.zookeeperleaderelector

做两件事, 

1. 建立watcher,去listen “election path” in zk, "/controller"; 

当controller发生变化是,可以做相应的处理

leaderchangelistener

2. elect, 试图去创建ephemeralpath,从而成为controller

用createephemeralpathexpectconflicthandlezkbug,试图去创建ephemeralnode, 

如果不成功说明已经被别人抢占 

成功就说明我成为了controller,调用onbecomingleader(),即oncontrollerfailover

kafkacontroller.oncontrollerfailover

这个callback就是controller初始化的关键,

Apache Kafka源码分析 - kafka controller
Apache Kafka源码分析 - kafka controller

这里最后再讨论一下createephemeralpathexpectconflicthandlezkbug 

这个函数看着比较诡异,handlezkbug,到底是zk的什么bug? 

这个问题在于“the current behavior of zookeeper for ephemeral nodes is that session expiration and ephemeral node deletion is not an atomic operation.” 

即zk的session过期和ephemeral node删除并不是一个原子操作,所以带来的问题的过程如下,

1. client session超时,它会假设在zk,这个ephemeral node已经被删除,但是zk当前由于某种原因hang住了,比如very long fsync operations,所以其实这个ephemeral node并没有被删除

2. 但client是认为ephemeral node已经被删除,所以它会尝试重新创建这个ephemeral node,但得到的结果是nodeexists,因为这个node并没有被删除,那么client想既然有就算了

3. 这个时候,zk从very long fsync operations的hang中恢复,它会继续之前没有完成的操作,把ephemeral node删掉

4. 这样client就会发现ephemeral node不存在了,虽然session并没有超时

所以这个函数就是为了规避这个问题, 

方法就是,当我发现nodeexists时,说明zk当前hang住了,这个时候我需要等待,并反复尝试,直到zk把这个node删除后,我再重新创建

Apache Kafka源码分析 - kafka controller
Apache Kafka源码分析 - kafka controller

这个方式有个问题就是,如果zk hang住,这里的逻辑是while true,这个函数也会一直hang住

本文章摘自博客园,原文发布日期:2015-11-05