天天看點

Kafka Consumer接口

對于kafka的consumer接口,提供兩種版本,

一種high-level版本,比較簡單不用關心offset, 會自動的讀zookeeper中該consumer group的last offset 

不過要注意一些注意事項,對于多個partition和多個consumer 

1. 如果consumer比partition多,是浪費,因為kafka的設計是在一個partition上是不允許并發的,是以consumer數不要大于partition數 

2. 如果consumer比partition少,一個consumer會對應于多個partitions,這裡主要合理配置設定consumer數和partition數,否則會導緻partition裡面的資料被取的不均勻 

    最好partiton數目是consumer數目的整數倍,是以partition數目很重要,比如取24,就很容易設定consumer數目 

3. 如果consumer從多個partition讀到資料,不保證資料間的順序性,kafka隻保證在一個partition上資料是有序的,但多個partition,根據你讀的順序會有不同 

4. 增減consumer,broker,partition會導緻rebalance,是以rebalance後consumer對應的partition會發生變化 

5. high-level接口中擷取不到資料的時候是會block的

簡單版,

簡單的坑,如果測試流程是,先produce一些資料,然後再用consumer讀的話,記得加上第一句設定 

因為初始的offset預設是非法的,然後這個設定的意思是,當offset非法時,如何修正offset,預設是largest,即最新,是以不加這個配置,你是讀不到你之前produce的資料的,而且這個時候你再加上smallest配置也沒用了,因為此時offset是合法的,不會再被修正了,需要手工或用工具改重置offset

Kafka Consumer接口
Kafka Consumer接口

在用high-level的consumer時,兩個給力的工具,

1. bin/kafka-run-class.sh kafka.tools.consumeroffsetchecker --group pv

可以看到目前group offset的狀況,比如這裡看pv的狀況,3個partition

group           topic                          pid offset          logsize         lag             owner 

pv              page_visits                    0   21              21              0               none 

pv              page_visits                    1   19              19              0               none 

pv              page_visits                    2   20              20              0               none

關鍵就是offset,logsize和lag 

這裡以前讀完了,是以offset=logsize,并且lag=0

2. bin/kafka-run-class.sh kafka.tools.updateoffsetsinzk earliest config/consumer.properties  page_visits

3個參數, 

[earliest | latest],表示将offset置到哪裡 

consumer.properties ,這裡是配置檔案的路徑 

topic,topic名,這裡是page_visits

我們對上面的pv group執行完這個操作後,再去check group offset狀況,結果如下,

pv              page_visits                    0   0               21              21              none 

pv              page_visits                    1   0               19              19              none 

pv              page_visits                    2   0               20              20              none

可以看到offset已經被清0,lag=logsize

底下給出原文中多線程consumer的完整代碼

Kafka Consumer接口
Kafka Consumer接口

另一種是simpleconsumer,名字起的,以為是簡單的接口,其實是low-level consumer,更複雜的接口

什麼時候用這個接口?

read a message multiple times

consume only a subset of the partitions in a topic in a process

manage transactions to make sure a message is processed once and only once

當然用這個接口是有代價的,即partition,broker,offset對你不再透明,需要自己去管理這些,并且還要handle broker leader的切換,很麻煩 

是以不是一定要用,最好别用

you must keep track of the offsets in your application to know where you left off consuming.

you must figure out which broker is the lead broker for a topic and partition

you must handle broker leader changes

使用simpleconsumer的步驟:

find an active broker and find out which broker is the leader for your topic and partition

determine who the replica brokers are for your topic and partition

build the request defining what data you are interested in

fetch the data

identify and recover from leader changes

首先,你必須知道讀哪個topic的哪個partition 

然後,找到負責該partition的broker leader,進而找到存有該partition副本的那個broker 

再者,自己去寫request并fetch資料 

最終,還要注意需要識别和處理broker leader的改變

逐漸來看,

finding the lead broker for a topic and partition

思路就是,周遊每個broker,取出該topic的metadata,然後再周遊其中的每個partition metadata,如果找到我們要找的partition就傳回 

根據傳回的partitionmetadata.leader().host()找到leader broker

Kafka Consumer接口
Kafka Consumer接口

request主要的資訊就是map<topicandpartition, partitionoffsetrequestinfo>

topicandpartition就是對topic和partition資訊的封裝 

partitionoffsetrequestinfo的定義 

case class partitionoffsetrequestinfo(time: long, maxnumoffsets: int) 

其中參數time,表示where to start reading data,兩個取值 

kafka.api.offsetrequest.earliesttime(),the beginning of the data in the logs 

kafka.api.offsetrequest.latesttime(),will only stream new messages

不要認為起始的offset一定是0,因為messages會過期,被删除

另外一個參數不清楚什麼含義,代碼中取的是1

Kafka Consumer接口
Kafka Consumer接口

首先在fetchrequest上加上fetch,指明topic,partition,開始的offset,讀取的大小 

如果producer在寫入很大的message時,也許這裡指定的1000000是不夠的,會傳回an empty message set,這時需要增加這個值,直到得到一個非空的message set。

Kafka Consumer接口
Kafka Consumer接口
Kafka Consumer接口
Kafka Consumer接口

沒有特别的邏輯,隻是重新調用findleader擷取leader broker 

并且防止在切換過程中,取不到leader資訊,加上sleep邏輯

Kafka Consumer接口
Kafka Consumer接口
Kafka Consumer接口