雖然目前Kafka0.10版本已經重寫了其API,但底層原理是類似的,是以我們可以先了解kafka0.8.x裡面的提供的Consumer的實作原理與互動流程
Kafka提供了兩套API給Consumer
- The SimpleConsumer API
- The high-level Consumer API
1. 低階API
本質上是提供了一種與broker互動資訊的API
剩下的處理全靠使用者自己的程式,功能比較簡單,但使用者擴充性比較強
1) API結構
低階API的consumer,指定查找topic某個partition的指定offset去消費
首先與broker通信,尋找到leader(不與zookeeper通信,不存在groupid),然後直接和leader通信,指定offset去消費。消費多少,從哪裡開始消費,都可控(我們的例子是從0開始消費)
findLeader方法中會去調用findPartitionMetadata方法
程式運作結果:
運作過程中一直卡住沒有成功消費,加入如下錯誤資訊判斷,發現error code為1
說明我們從offset 0消費offsetoutofrange了
(我們發送請求topic1 partition0的offset 0 broker回複我們offset out of range,因為kafka中已經沒有offset 0 的資料了,已經過期清理掉了)
是以我們添加getLastOffset,getEarliestOffset的方法,擷取該topic該partition在kafka叢集中有的的最小和最大的offset
調整offset之後,可能最新的資料也過期了,于是擷取到的message的size為0
檢視SimpleConsumer的源碼:
1) 互動過程
使用SimpleConsumer的步驟
1) 從所有活躍的broker中找出哪個是指定Topic Partition中的leader broker
2) 擷取kafka中已存在的offset通路(或人工指定)
3) 構造請求
4) 發送請求查詢資料
5) 擷取查詢結果,處理(判斷擷取的結果,進行相應的處理)
處理就包括:
l 處理offset不存在的情況
l 處理offset的增長
l 處理leader broker變更
(當連接配接的這個brokerdown掉,我們要寫程式捕獲異常并且寫程式去切換broker,重新連接配接)
注意:該API是不阻塞的,SimpleConsumer傳一個請求過去,不論是資料過期、新的資料還沒來等,都會有一個response回來的
使用SimpleConsumer有哪些弊端呢?
l 必須在程式中跟蹤offset值
l 必須找出指定Topic Partition中的lead broker
l 必須處理broker的變動
(當連接配接的這個brokerdown掉,我們要寫程式捕獲異常并且寫程式去切換broker,重新連接配接)
l 如果多個SimpleConsumer共享消費某個topic,想要實作彼此的負載均衡,需要添加很多額外代碼
(多個用戶端共享某個topic,就要保證他們的消費是互斥的,不能消費到同一條資料,比如A,B,C共享topicX共4個partition,那麼A就消費partition0,B消費partition1,partition2,C就消費partition3,保證其消費互相獨立,并且A,B,C的消費總和是整個topicX的所有消息)
2. 高階API
本質上是提供了一個完整的程式,内置各種功能(比如和其他consumer的負載均衡,比如處理broker變動)
使用者隻需要調用API即可,功能非常強大,但使用者擴充性比較差
1) API
高階消費者API必須要指定group.id,否則會報錯
2) 負載均衡原理與算法
該程式,運作多個程序,他們之間是可以實作負載均衡的。前提是他們同屬于一個group,擁有相同的groupid。
每個consumer都會監聽zk上topic partition資訊和consumer的資訊
添加一個節點,他們就監聽到變化,監聽到變化就會調用rebanlance去重新算自己需要消費哪些partition。然後開始消費。
每個consumer都自己獨立去調整自己的消費
會觸發consumer rebalance的場景有如下場景:
l 條件1:有新的consumer加入
l 條件2:舊的consumer挂了
l 條件3:coordinator挂了,叢集選舉出新的coordinator(0.10 特有的)
l 條件4:topic的partition新加
l 條件5:consumer調用unsubscrible(),取消topic的訂閱
這種負載均衡方案存在的問題
- Herd effect(羊群效應)
任何Broker或者Consumer的增減都會觸發所有的Consumer的Rebalance,造成叢集内大量的調整
- Split Brain
每個Consumer分别單獨通過Zookeeper判斷哪些Broker和Consumer 當機了,那麼不同Consumer在同一時刻從Zookeeper“看”到的View就可能不一樣,這是由Zookeeper的特性決定的,這就會造成不正确的Reblance嘗試。
- 調整結果不可控
所有的Consumer都并不知道其它Consumer的Rebalance是否成功,這可能會導緻Kafka工作在一個不正确的狀态。
為了解決這些問題,Kafka作者在0.9.x版本中開始使用中心協調器(Coordinator)。由它統一來監聽zookeeper,生成rebalance指令,并且判斷是否成功,不成功進行重試(後面講解)
3) blockingQueue
consumerMap
Topic1 | 4 |
Topic2 | 2 |
Topic3 | 1 |
核心建立方案就是:
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
這個topicCountMap主要是指定消費線程數,該API底層的實作如下圖:
1.會為每個topic生成對應的消費線程,我們可以叫它消費線程。它會從一個blockingQueue裡面取資料。這個取的過程是阻塞的,如果queue中沒有資料,就會阻塞。
2.會為每個kafka的broker生成一個fetch線程,我們可以叫它取資料線程。每個fetch thread會于kafka broker建立一個連接配接。fetch thread線程去拉取消息資料,最終放到對應的blockingQueue中,等待消費線程來消費。
用戶端使用時:
根據topic,指定取某一個消費線程,拿出流資料,然後可以周遊該資料了,如上所述,該方法會是阻塞的,如果沒有資料了,它就會阻塞在這裡。
4) 關于offset
該api除了使用zk做負載均衡 還會用它記錄offset。
/consumers/groupid/offsets/topic/partition/xxxx
記錄消費到的offset值
上圖左邊為在zookeeper中沒有此groupid節點的流程 右邊為有的流程
groupid節點如果沒有,會建立,然後offset的建立初始值會在kafka中擷取,預設是擷取最新的offset,也可以指定擷取kafka中目前存在的最小offset,如下參數可以人工指定
//偏移量,初始化從哪個位置讀
//props.put("auto.offset.reset", "smallest");
注意:此設定隻有在運作初始化的時候有效,如果zookeeper中已經有值,那麼這個參數是無效的,會直接去讀zookeeper中的offset值。如果還想擷取之前的資料,方法1手動修改zookeeper中該offset的值,方法2換一個groupid去消費,指定smallest。
在擷取到offset值之後,就是去kafka中消費資料,然後在zookeeper中更新此offset的值。這些都是API底層幫我們實作了,我們上層API無感覺。
5) 負載均衡小實驗
建立一個partition為2的topic
建立兩個groupid相同的consumer程序來消費這個topic的消息
一個producer不斷的打入消息
結果:
附API使用代碼
高階API
package com.wangke.consumer;
import com.wangke.kafkaProducerConsumer.KafkaProperties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* Created by dell on 2017/8/11.
*/
public class HighConsumerTest {
private static ConsumerConfig createConsumerConfig()
{
String zkConnect = "ip:2181";
String groupId = "group2";
Properties props = new Properties();
props.put("zookeeper.connect", zkConnect);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "40000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
//偏移量,從哪個位置讀
props.put("auto.offset.reset", "smallest");
return new ConsumerConfig(props);
}
public static void main(String[] args) throws InterruptedException {
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig());
String topic = "test7";
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
int i=0;
while (it.hasNext()) {
System.out.println("receive:" + new String(it.next().message()));
i++;
if(i==10)
break;
}
consumer.shutdown();
}
}
低階API
package com.wangke.consumer;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
*
* @author wangke
* @date 2017/8/9
*/
public class SimpleComsumerTest {
public static void main(String[] args) throws InterruptedException {
String BROKER_CONNECT = "ip:9092";
String TOPIC = "topic";
int partitionNum = 0;
// 找到leader
Broker leaderBroker = findLeader(BROKER_CONNECT, TOPIC, partitionNum);
if(leaderBroker==null){
System.out.println("未找到leader資訊");
return;
}
// 從leader消費 soTimeout bufferSize clientId
SimpleConsumer simpleConsumer = new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), 20000, 10000, "mySimpleConsumer");
long startOffet = 0;
int fetchSize = 500;
long offset = startOffet;
while (true ) {
System.out.println("offset:"+offset);
// 添加fetch指定目标topic,分區,起始offset及fetchSize(位元組),可以添加多個fetch
FetchRequest req = new FetchRequestBuilder().addFetch(TOPIC, partitionNum, offset, fetchSize).build();
// 拉取消息
FetchResponse fetchResponse = simpleConsumer.fetch(req);
if (fetchResponse.hasError()) {
// Something went wrong!
short code = fetchResponse.errorCode(TOPIC, partitionNum);
System.out.println("Error fetching data from the Broker:" + leaderBroker + " Reason: " + code);
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
// We asked for an invalid offset. For simple case ask for
// the last element to reset
//offset = getEarliestOffset(simpleConsumer, TOPIC, partitionNum, "mySimpleConsumer");
offset = getLastOffset(simpleConsumer, TOPIC, partitionNum, "mySimpleConsumer");
continue;
}
System.out.println("Error fetching data Offset Data the Broker. Reason: " + fetchResponse.errorCode(TOPIC, partitionNum));
continue;
}
ByteBufferMessageSet messageSet = fetchResponse.messageSet(TOPIC, partitionNum);
if(messageSet.sizeInBytes() ==0){
Thread.sleep(5000);
System.out.println("資料為空");
continue;
}
for (MessageAndOffset messageAndOffset : messageSet) {
Message mess = messageAndOffset.message();
ByteBuffer payload = mess.payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
String msg = new String(bytes);
offset = messageAndOffset.offset();
System.out.println("partition : " + partitionNum + ", offset : " + offset + " mess : " + msg);
}
// 繼續消費下一批
offset = offset + 1;
Thread.sleep(5000);
}
}
/**
* 找到制定分區的leader broker
*
* @param brokerHosts broker位址,格式為:“host1:port1,host2:port2,host3:port3”
* @param topic topic
* @param partition 分區
* @return
*/
private static Broker findLeader(String brokerHosts, String topic, int partition) {
PartitionMetadata partitionMetadata = findPartitionMetadata(brokerHosts, topic, partition);
if(partitionMetadata==null){
System.out.println("未找到leader資訊");
return null;
}
Broker leader = partitionMetadata.leader();
System.out.println(String.format("Leader tor topic %s, partition %d is %s:%d", topic, partition, leader.host(), leader.port()));
return leader;
}
/**
* 找到指定分區的中繼資料
*
* @param brokerHosts broker位址,格式為:“host1:port1,host2:port2,host3:port3”
* @param topic topic
* @param partition 分區
* @return 中繼資料
*/
private static PartitionMetadata findPartitionMetadata(String brokerHosts, String topic, int partition) {
PartitionMetadata returnMetaData = null;
for (String brokerHost : brokerHosts.split(",")) {
SimpleConsumer consumer = null;
String[] splits = brokerHost.split(":");
consumer = new SimpleConsumer(splits[0], Integer.valueOf(splits[1]), 100000, 64 * 1024, "leaderLookup");
List<String> topics = Collections.singletonList(topic);
TopicMetadataRequest request = new TopicMetadataRequest(topics);
TopicMetadataResponse response = consumer.send(request);
List<TopicMetadata> topicMetadatas = response.topicsMetadata();
for (TopicMetadata topicMetadata : topicMetadatas) {
for (PartitionMetadata PartitionMetadata : topicMetadata.partitionsMetadata()) {
if (PartitionMetadata.partitionId() == partition) {
returnMetaData = PartitionMetadata;
break;//找到中繼資料,程式可以退出了
}
}
}
if (consumer != null)
consumer.close();
}
return returnMetaData;
}
/*消費者消費一個topic的指定partition時,從哪裡開始讀資料
*kafka.api.OffsetRequest.EarliestTime()找到日志中資料的最開始頭位置,從那裡開始消費(hadoop-consumer中使用的應該就是這種方式)
*kafka.api.OffsetRequest.LatestTime()隻消費最新的資料
*注意,不要假設0是offset的初始值
*參數:long whichTime的取值即兩種:
* kafka.api.OffsetRequest.LatestTime()
* kafka.api.OffsetRequest.LatestTime()
*傳回值:一個long類型的offset*/
private static long getLastOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
private static long getEarliestOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
}