Kafka provides two versions of the consumer interface.
One is the high-level version. It is fairly simple: you don't have to manage offsets, as it automatically reads the consumer group's last offset from ZooKeeper.
But there are some caveats when there are multiple partitions and multiple consumers:
1. If there are more consumers than partitions, the extra consumers are wasted: Kafka's design does not allow concurrent consumers on a single partition, so the consumer count should not exceed the partition count.
2. If there are fewer consumers than partitions, one consumer handles several partitions. The point here is to configure the consumer and partition counts sensibly, or data will be pulled unevenly across partitions. Ideally the partition count is an integer multiple of the consumer count, so the partition count matters: with 24 partitions, for example, it is easy to pick a matching consumer count.
3. If a consumer reads from several partitions, ordering between partitions is not guaranteed. Kafka only guarantees order within a single partition; across partitions the result depends on the order you read them in.
4. Adding or removing consumers, brokers, or partitions triggers a rebalance, after which the partitions assigned to a consumer may change.
5. The high-level API blocks when no data is available.
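To make point 2 concrete, here is a small arithmetic sketch (a hypothetical helper, not the actual Kafka rebalance code) of how a range-style assignment spreads partitions over the consumers in one group:

```java
import java.util.*;

// Hypothetical sketch of how partitions spread over consumers in one group;
// not Kafka's real rebalance algorithm, just the arithmetic behind point 2.
public class AssignmentSketch {
    // Returns consumerId -> list of partition ids, assigned contiguously:
    // partitions / consumers each, and the first (partitions % consumers)
    // consumers get one extra partition.
    static Map<Integer, List<Integer>> assign(int partitions, int consumers) {
        Map<Integer, List<Integer>> result = new HashMap<>();
        int per = partitions / consumers, extra = partitions % consumers;
        int p = 0;
        for (int c = 0; c < consumers; c++) {
            int take = per + (c < extra ? 1 : 0);
            List<Integer> list = new ArrayList<>();
            for (int i = 0; i < take; i++) list.add(p++);
            result.put(c, list);
        }
        return result;
    }
}
```

With 24 partitions, 6 consumers get exactly 4 partitions each; 5 consumers get 5, 5, 5, 5, 4, so one consumer pulls less than the others.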
The simple version:
A simple gotcha: if your test flow is to produce some data first and then read it back with a consumer, remember the first configuration line, the `auto.offset.reset` setting.
The initial offset is invalid by default, and this setting says how to correct an invalid offset. The default is largest, i.e. the newest data, so without this setting you cannot read the data you produced earlier. And by then adding the smallest setting no longer helps: the offset is now valid and will not be corrected again, so you have to reset it by hand or with a tool.
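As a reference, a minimal sketch of the configuration for the old high-level consumer, assuming a local ZooKeeper; the first property is the setting discussed above:

```java
import java.util.Properties;

// Sketch of the configuration for the old (0.8-era) high-level consumer.
// With an invalid initial offset, "smallest" makes the consumer start from
// the beginning of the log; the default, "largest", skips to the newest data.
public class ConsumerProps {
    static Properties props() {
        Properties p = new Properties();
        p.put("auto.offset.reset", "smallest");      // the gotcha above
        p.put("zookeeper.connect", "localhost:2181"); // assumed address
        p.put("group.id", "pv");
        p.put("zookeeper.session.timeout.ms", "400");
        p.put("auto.commit.interval.ms", "1000");
        return p;
    }
}
```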
Two handy tools when working with the high-level consumer:
1. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv
This shows the current offset status of a group; here, for example, the pv group, which has 3 partitions:
```
group  topic        pid  offset  logsize  lag  owner
pv     page_visits  0    21      21       0    none
pv     page_visits  1    19      19       0    none
pv     page_visits  2    20      20       0    none
```
The key columns are offset, logsize, and lag.
Here everything has already been read, so offset = logsize and lag = 0.
2. bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits
It takes 3 arguments:
[earliest | latest], where to reset the offset to
consumer.properties, the path to the config file
topic, the topic name, here page_visits
After running this against the pv group above, checking the group offsets again gives:
```
pv  page_visits  0  0  21  21  none
pv  page_visits  1  0  19  19  none
pv  page_visits  2  0  20  20  none
```
The offsets have been reset to 0, and lag = logsize.
The original article also gives complete code for a multithreaded consumer.
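Since the original code screenshots are not reproduced here, below is a structural sketch of that multithreaded pattern: the real code hands each KafkaStream returned by createMessageStreams to a worker thread in an ExecutorService. A BlockingQueue stands in for KafkaStream so the sketch runs without a broker:

```java
import java.util.*;
import java.util.concurrent.*;

// Structural sketch of the multithreaded high-level consumer. The real code
// calls consumer.createMessageStreams(topicCountMap) and hands each
// KafkaStream to a worker; here a BlockingQueue of strings stands in for a
// stream so the sketch is self-contained.
public class MultiThreadSketch {
    static final String SHUTDOWN = "__shutdown__"; // poison pill ending a stream

    static int consume(List<BlockingQueue<String>> streams) {
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());
        List<Future<Integer>> futures = new ArrayList<>();
        for (BlockingQueue<String> stream : streams) {
            // one worker per stream, mirroring one thread per KafkaStream
            futures.add(executor.submit(() -> {
                int count = 0;
                // the real loop: ConsumerIterator it = stream.iterator();
                // while (it.hasNext()) process(it.next().message());
                for (String m = stream.take(); !m.equals(SHUTDOWN); m = stream.take())
                    count++;
                return count;
            }));
        }
        int total = 0;
        try {
            for (Future<Integer> f : futures) total += f.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        executor.shutdown();
        return total;
    }
}
```

Note the pool size equals the stream count: per caveat 1 above, more threads than partitions would just idle.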
The other version is SimpleConsumer. Despite the name, which suggests a simple interface, it is actually the low-level consumer, a more complex interface.
When should you use it?
- read a message multiple times
- consume only a subset of the partitions in a topic in a process
- manage transactions to make sure a message is processed once and only once
Of course this interface has a cost: partitions, brokers, and offsets are no longer transparent to you. You have to manage them yourself, and you also have to handle broker leader changes, which is a lot of trouble. So don't use it unless you must; it's best avoided.
- you must keep track of the offsets in your application to know where you left off consuming.
- you must figure out which broker is the lead broker for a topic and partition
- you must handle broker leader changes
Steps for using SimpleConsumer:
- find an active broker and find out which broker is the leader for your topic and partition
- determine who the replica brokers are for your topic and partition
- build the request defining what data you are interested in
- fetch the data
- identify and recover from leader changes
First, you must know which partition of which topic you want to read.
Then, find the leader broker in charge of that partition, as well as the brokers holding replicas of that partition.
Next, build the request yourself and fetch the data.
Finally, remember to detect and handle broker leader changes.
Step by step:
finding the lead broker for a topic and partition
The idea: iterate over every broker, fetch the topic's metadata from it, then iterate over each PartitionMetadata inside; when the partition we want is found, return it.
The leader broker is then located via PartitionMetadata.leader().host().
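The findLeader loop just described might be sketched like this, with a simplified stand-in type for kafka.javaapi.PartitionMetadata:

```java
import java.util.*;

// Sketch of findLeader: loop over seed brokers, take the topic metadata
// each one returns, and scan its partition metadata for the wanted
// partition. PartitionMeta is a simplified stand-in for
// kafka.javaapi.PartitionMetadata.
public class FindLeaderSketch {
    record PartitionMeta(int partitionId, String leaderHost) {}

    // metadataByBroker: seed broker host -> partition metadata it reports
    static String findLeader(Map<String, List<PartitionMeta>> metadataByBroker,
                             int wantedPartition) {
        for (List<PartitionMeta> partitions : metadataByBroker.values()) {
            for (PartitionMeta pm : partitions) {
                if (pm.partitionId() == wantedPartition)
                    return pm.leaderHost(); // real code: metadata.leader().host()
            }
        }
        return null; // no seed broker knew about this partition
    }
}
```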
The main content of the request is a Map<TopicAndPartition, PartitionOffsetRequestInfo>.
TopicAndPartition wraps the topic and partition information.
PartitionOffsetRequestInfo is defined as:
case class PartitionOffsetRequestInfo(time: Long, maxNumOffsets: Int)
The time parameter says where to start reading data; it takes two values:
kafka.api.OffsetRequest.EarliestTime(), the beginning of the data in the logs
kafka.api.OffsetRequest.LatestTime(), will only stream new messages
Don't assume the starting offset is always 0: messages expire and get deleted.
The other parameter, maxNumOffsets, caps how many offsets are returned; the example code asks for just 1, the single offset at the requested time.
First, add a fetch to the FetchRequest, specifying the topic, partition, starting offset, and fetch size.
If the producer writes very large messages, the 1000000 bytes specified here may not be enough and an empty message set will be returned; in that case keep increasing the value until you get a non-empty message set.
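The retry-on-empty idea can be sketched as follows; fetchOnce is a hypothetical stand-in for building the FetchRequest with a given size and reading the response:

```java
import java.util.List;
import java.util.function.IntFunction;

// Sketch of growing the fetch size until a non-empty message set comes
// back: if a message is larger than the requested fetch size, the old
// FetchRequest API returns an empty set, so double the size and retry.
// fetchOnce stands in for issuing one FetchRequest of the given size.
public class FetchSizeSketch {
    static List<String> fetchWithGrowingSize(IntFunction<List<String>> fetchOnce,
                                             int initialSize, int maxSize) {
        for (int size = initialSize; size <= maxSize; size *= 2) {
            List<String> messages = fetchOnce.apply(size);
            if (!messages.isEmpty()) return messages; // a whole message fit
        }
        throw new IllegalStateException("message larger than maxSize bytes");
    }
}
```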
There is no special logic here: just call findLeader again to get the new leader broker.
A sleep is added in case leader information is temporarily unavailable mid-switchover.
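That findNewLeader logic might be sketched like this; findLeader is a hypothetical stand-in supplier returning the current leader host, or null if none is known yet:

```java
import java.util.function.Supplier;

// Sketch of findNewLeader: after a leader switch, retry findLeader a few
// times with a sleep in between, since metadata can be stale while the new
// leader is still being elected.
public class NewLeaderSketch {
    static String findNewLeader(Supplier<String> findLeader, String oldLeader) {
        for (int attempt = 0; attempt < 3; attempt++) {
            String leader = findLeader.get();
            // a first-try answer naming the old leader is likely stale metadata
            if (leader != null && !(leader.equals(oldLeader) && attempt == 0))
                return leader;
            try { Thread.sleep(100); } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        throw new IllegalStateException("unable to find new leader");
    }
}
```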