天天看点

Kafka Consumer接口

对于kafka的consumer接口,提供两种版本,

一种high-level版本,比较简单不用关心offset, 会自动的读zookeeper中该consumer group的last offset 

不过要注意一些注意事项,对于多个partition和多个consumer 

1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数 

2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 

    最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目 

3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同 

4. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化 

5. high-level接口中获取不到数据的时候是会block的

简单版,

简单的坑,如果测试流程是,先produce一些数据,然后再用consumer读的话,记得加上第一句设置 

因为初始的offset默认是非法的,然后这个设置的意思是,当offset非法时,如何修正offset,默认是largest,即最新,所以不加这个配置,你是读不到你之前produce的数据的,而且这个时候你再加上smallest配置也没用了,因为此时offset是合法的,不会再被修正了,需要手工或用工具改重置offset

Kafka Consumer接口
Kafka Consumer接口

在用high-level的consumer时,两个给力的工具,

1. bin/kafka-run-class.sh kafka.tools.consumeroffsetchecker --group pv

可以看到当前group offset的状况,比如这里看pv的状况,3个partition

group           topic                          pid offset          logsize         lag             owner 

pv              page_visits                    0   21              21              0               none 

pv              page_visits                    1   19              19              0               none 

pv              page_visits                    2   20              20              0               none

关键就是offset,logsize和lag 

这里以前读完了,所以offset=logsize,并且lag=0

2. bin/kafka-run-class.sh kafka.tools.updateoffsetsinzk earliest config/consumer.properties  page_visits

3个参数, 

[earliest | latest],表示将offset置到哪里 

consumer.properties ,这里是配置文件的路径 

topic,topic名,这里是page_visits

我们对上面的pv group执行完这个操作后,再去check group offset状况,结果如下,

pv              page_visits                    0   0               21              21              none 

pv              page_visits                    1   0               19              19              none 

pv              page_visits                    2   0               20              20              none

可以看到offset已经被清0,lag=logsize

底下给出原文中多线程consumer的完整代码

Kafka Consumer接口
Kafka Consumer接口

另一种是simpleconsumer,名字起的,以为是简单的接口,其实是low-level consumer,更复杂的接口

什么时候用这个接口?

read a message multiple times

consume only a subset of the partitions in a topic in a process

manage transactions to make sure a message is processed once and only once

当然用这个接口是有代价的,即partition,broker,offset对你不再透明,需要自己去管理这些,并且还要handle broker leader的切换,很麻烦 

所以不是一定要用,最好别用

you must keep track of the offsets in your application to know where you left off consuming.

you must figure out which broker is the lead broker for a topic and partition

you must handle broker leader changes

使用simpleconsumer的步骤:

find an active broker and find out which broker is the leader for your topic and partition

determine who the replica brokers are for your topic and partition

build the request defining what data you are interested in

fetch the data

identify and recover from leader changes

首先,你必须知道读哪个topic的哪个partition 

然后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker 

再者,自己去写request并fetch数据 

最终,还要注意需要识别和处理broker leader的改变

逐步来看,

finding the lead broker for a topic and partition

思路就是,遍历每个broker,取出该topic的metadata,然后再遍历其中的每个partition metadata,如果找到我们要找的partition就返回 

根据返回的partitionmetadata.leader().host()找到leader broker

Kafka Consumer接口
Kafka Consumer接口

request主要的信息就是map<topicandpartition, partitionoffsetrequestinfo>

topicandpartition就是对topic和partition信息的封装 

partitionoffsetrequestinfo的定义 

case class partitionoffsetrequestinfo(time: long, maxnumoffsets: int) 

其中参数time,表示where to start reading data,两个取值 

kafka.api.offsetrequest.earliesttime(),the beginning of the data in the logs 

kafka.api.offsetrequest.latesttime(),will only stream new messages

不要认为起始的offset一定是0,因为messages会过期,被删除

另外一个参数不清楚什么含义,代码中取的是1

Kafka Consumer接口
Kafka Consumer接口

首先在fetchrequest上加上fetch,指明topic,partition,开始的offset,读取的大小 

如果producer在写入很大的message时,也许这里指定的1000000是不够的,会返回an empty message set,这时需要增加这个值,直到得到一个非空的message set。

Kafka Consumer接口
Kafka Consumer接口
Kafka Consumer接口
Kafka Consumer接口

没有特别的逻辑,只是重新调用findleader获取leader broker 

并且防止在切换过程中,取不到leader信息,加上sleep逻辑

Kafka Consumer接口
Kafka Consumer接口
Kafka Consumer接口