天天看點

Kafka Consumer

Consumer

High Level Consumer

簡要:很多應用場景下,客戶程式知識希望從Kafka順序讀取并處理資料,并不太關心具體的offset。

同時也希望提供一些語義,例如同一條消息隻被一個Consumer消費(單點傳播)或被所有Consumer消費(廣播)。

Kafka High Level API提供了一個從Kafka消費資料的高層抽象,進而屏蔽掉其中的細節,并提供豐富的語義。

Consumer Group

High Level Consumer将從某個Patition讀取的最後一條消息的offset存在Zookeeper中(從0.8.2開始同時支援将offset存在Zookeeper中和專用的Kafka topic中)。

這裡就有一個問題:如果消費一條消息就将其offset存在Topic中,那麼存在Topic的offset資料量将和被消費的資料量是一樣的。如果我們想要知道哪條消息被哪個Consumer消費,那麼我們就得再起一個Consumer去pull這個存有offset的Topic。而且消息被删除後,可能會影響到offset。這樣Kafka不是通過時間等去回收offset,而是通過Compaction的方式,将key想同的value隻保留最後一條offset。

這個offset基于用戶端程式提供給Kafka的名字來儲存,這個名字被稱為Consumer Group。

Consumer Group是整個Kafka叢集全局唯一的,而非針對某個Topic。

每個High level Consumer執行個體都屬于一個Consumer Group,若不指定則屬于預設的Group。

消息被消費後,并不會被删除,隻是相應的offset加一。

對于每條消息,在同一個Consumer Group裡隻會被一個Consumer消費。

不同Consumer Group可消費同一條消息。

(假設一個Broker裡就一個Patition)

Kafka Consumer

Kafka的設計理念之一就是同時提供對離線批處理和線上流處理的支援

可同時使用Hadoop系統進行離線批處理,Storm或其它流處理系統進行流處理

可使用Kafka的Mirror Maker将消息從一個資料中心鏡像到另一個資料中心

驗證  消息的單點傳播和廣播方式

1.重新部署叢集,目的是将資料清空,啟動Kafka和Zookeeper叢集

Kafka Consumer

2.建立Topic,設定Topic中Patiton的數目為3個,replication為1個

Kafka Consumer
Kafka Consumer

3.我們設定2組Consumer Group,一組2個Consumer,另外一組1個Consumer

Kafka Consumer
Kafka Consumer

4.Producer發送3條消息test0,test1,test2,發現

Kafka Consumer
Kafka Consumer
Kafka Consumer

High Level Consumer Rebalance

Consumer啟動及Rebalance流程

High Level Consumer啟動時将其ID注冊到其Consumer Group下,在Zookeeper上的路徑為/consumers/[consumer group]/ids/[consumer id]

在/consumers/[consumer group]/ids上注冊Watch,為了監控Consumer挂了。

在/brokers/ids上注冊Watch,為了監控Broker是否挂了

如果Consumer通過Topic Filter建立消息流,則它會同時在/brokers/topics上建立Watch

強制自己在其Consumer Group内啟動Rebalace

Consumer Rebalance算法

将目标Topic下所有Partition排序,存于集合P中

對某個Consumer Group下所有Consumer排序,存于集合C中,第i個Consumer記為Ci

N=size(P)/size(C),向上取整

解除Ci對原來配置設定的Partition的消費權(i從0開始)

将第i*N到(I+1)*N-1個Partitoon配置設定給Ci

Consumer rebalance算法缺陷及改進

任何Broker或者Consumer的增減都會促發所有的Consumer的Rebalance

Split Brain(腦裂)每個Consumer分别單獨通過Zookeeper判斷哪些Broker和Consumer當機,同時Consumer在同一時刻從Zookeeper”看”到的view可能不一樣,這是由Zookeeper特性決定的

調整結果不可控,所有Consumer分别進行Rebalance,彼此不知道對應的Rebalance是否成功。

Low Level Consumer

使用Low Level Consumer(Simple Consumer)的主要原因是,使用者希望比Consumer Group更好的控制資料的消費,如

同一條消息讀多次,友善Replay

隻消費某個Topic的部分Partiton

管理事務,進而確定每條消息被處理一次(Exactly once)

與High Level Consumer相對,Low Level Consumer要求使用者做大量的額外工作

在應用程式中跟蹤處理offset,并決定下一條消費哪條消息

獲知每個Partition的Leader

處理Leader的變化

處理多Consumer的協作

Kafka Consumer