对于kafka的consumer接口,提供两种版本,
一种high-level版本,比较简单不用关心offset, 会自动的读zookeeper中该consumer group的last offset
不过要注意一些注意事项,对于多个partition和多个consumer
1. 如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数
2. 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀
最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目
3. 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同
4. 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化
5. high-level接口中获取不到数据的时候是会block的
简单版,
简单的坑,如果测试流程是,先produce一些数据,然后再用consumer读的话,记得加上第一句设置
因为初始的offset默认是非法的,然后这个设置的意思是,当offset非法时,如何修正offset,默认是largest,即最新,所以不加这个配置,你是读不到你之前produce的数据的,而且这个时候你再加上smallest配置也没用了,因为此时offset是合法的,不会再被修正了,需要手工或用工具改重置offset
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
在用high-level的consumer时,两个给力的工具,
1. bin/kafka-run-class.sh kafka.tools.consumeroffsetchecker --group pv
可以看到当前group offset的状况,比如这里看pv的状况,3个partition
group topic pid offset logsize lag owner
pv page_visits 0 21 21 0 none
pv page_visits 1 19 19 0 none
pv page_visits 2 20 20 0 none
关键就是offset,logsize和lag
这里以前读完了,所以offset=logsize,并且lag=0
2. bin/kafka-run-class.sh kafka.tools.updateoffsetsinzk earliest config/consumer.properties page_visits
3个参数,
[earliest | latest],表示将offset置到哪里
consumer.properties ,这里是配置文件的路径
topic,topic名,这里是page_visits
我们对上面的pv group执行完这个操作后,再去check group offset状况,结果如下,
pv page_visits 0 0 21 21 none
pv page_visits 1 0 19 19 none
pv page_visits 2 0 20 20 none
可以看到offset已经被清0,lag=logsize
底下给出原文中多线程consumer的完整代码
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
另一种是simpleconsumer,名字起的,以为是简单的接口,其实是low-level consumer,更复杂的接口
什么时候用这个接口?
read a message multiple times
consume only a subset of the partitions in a topic in a process
manage transactions to make sure a message is processed once and only once
当然用这个接口是有代价的,即partition,broker,offset对你不再透明,需要自己去管理这些,并且还要handle broker leader的切换,很麻烦
所以不是一定要用,最好别用
you must keep track of the offsets in your application to know where you left off consuming.
you must figure out which broker is the lead broker for a topic and partition
you must handle broker leader changes
使用simpleconsumer的步骤:
find an active broker and find out which broker is the leader for your topic and partition
determine who the replica brokers are for your topic and partition
build the request defining what data you are interested in
fetch the data
identify and recover from leader changes
首先,你必须知道读哪个topic的哪个partition
然后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker
再者,自己去写request并fetch数据
最终,还要注意需要识别和处理broker leader的改变
逐步来看,
finding the lead broker for a topic and partition
思路就是,遍历每个broker,取出该topic的metadata,然后再遍历其中的每个partition metadata,如果找到我们要找的partition就返回
根据返回的partitionmetadata.leader().host()找到leader broker
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
request主要的信息就是map<topicandpartition, partitionoffsetrequestinfo>
topicandpartition就是对topic和partition信息的封装
partitionoffsetrequestinfo的定义
case class partitionoffsetrequestinfo(time: long, maxnumoffsets: int)
其中参数time,表示where to start reading data,两个取值
kafka.api.offsetrequest.earliesttime(),the beginning of the data in the logs
kafka.api.offsetrequest.latesttime(),will only stream new messages
不要认为起始的offset一定是0,因为messages会过期,被删除
另外一个参数不清楚什么含义,代码中取的是1
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
首先在fetchrequest上加上fetch,指明topic,partition,开始的offset,读取的大小
如果producer在写入很大的message时,也许这里指定的1000000是不够的,会返回an empty message set,这时需要增加这个值,直到得到一个非空的message set。
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
没有特别的逻辑,只是重新调用findleader获取leader broker
并且防止在切换过程中,取不到leader信息,加上sleep逻辑
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiZpdmLlR2bjlHcvN2LcNXZnFWbp9CXt92YuM3ZvxmYuNmLu9Wbt92Yvw1LcpDc0RHaiojIsJye.gif)