天天看點

kafka使用_Kafka介紹與使用

最近在研究kafka,覺得需要輸出點東西才能更好的吸收,遂總結與大家分享,話不多說。

一、先上思維導圖:

kafka使用_Kafka介紹與使用

二、再上kafka整體架構圖:

kafka使用_Kafka介紹與使用

2.1、Producer:消息生産者,就是向kafka broker發消息的用戶端。

2.2、Consumer :消息消費者,向kafka broker取消息的用戶端

2.3、Topic :每條釋出到kafka叢集的消息都有一個類别,這個類别被稱為主題Topic。(實體上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然儲存于一個或多個broker上但使用者隻需指定消息的Topic即可生産或消費資料而不必關心資料存于何處)。

2.4、Consumer Group (CG):這是kafka用來實作一個topic消息的廣播(發給所有的consumer)和單點傳播(發給任意一個consumer)的手段。一個topic可以有多個CG。topic的消息會複制(不是真的複制,是概念上的)到所有的CG,但每個partition隻會把消息發給該CG中的一個consumer。如果需要實作廣播,隻要每個consumer有一個獨立的CG就可以了。要實作單點傳播隻要所有的consumer在同一個CG。用CG還可以将consumer進行自由的分組而不需要多次發送消息到不同的topic。

2.5、Broker :一台kafka伺服器就是一個broker。一個叢集由多個broker組成。一個broker可以容納多個topic。

2.6、Partition:為了實作擴充性,一個非常大的topic可以分布到多個broker(即伺服器)上,一個topic可以分為多個partition,每個partition是一個有序的隊列。partition中的每條消息都會被配置設定一個有序的id(offset)。kafka隻保證按一個partition中的順序将消息發給consumer,不保證一個topic的整體(多個partition間)的順序。

2.7、Offset:kafka的存儲檔案都是按照offset.kafka來命名,用offset做名字的好處是友善查找。例如你想找位于2049的位置,隻要找到2048.kafka的檔案即可。當然the first offset就是00000000000.kafka。

三、部分小點請看導圖

四、Kafka叢集部署 (提前備好ZK叢集環境)

4.1、下載下傳安裝包

http://kafka.apache.org/downloads

或者在linux中使用wget指令下載下傳安裝包

wget http://mirrors.hust.edu.cn/apache/kafka/2.5.0/kafka_2.13-2.5.0.tgz

4.2、解壓安裝包

tar -zxvf /root/mysoftpackage/kafka_2.13-2.5.0.tgz -C /root/apps/

4.3、建立軟連結

如後續配置環境變量後,更新版本啥的不用再重新配置環境變量。

cd /root/apps/

ln -s kafka_2.13-2.5.0 kafka

4.4、修改配置檔案

cp /root/apps/kafka/config/server.properties 

/root/apps/kafka/config/server.properties.bak

vi /root/apps/kafka/config/server.properties

修改以下配置:

# Broker的全局唯一編号,叢集内不重複即可

broker.id=0

#kafka運作日志存放的路徑

log.dirs=/root/kafkadata/logs

#kafka依賴的ZK叢集

zookeeper.connect=hdp-node-01:2181,hdp-node-02:2181,hdp-node-03:2181

vi /root/apps/kafka/config/producer.properties

修改以下配置:

bootstrap.servers=hdp-node-01:9092,hdp-node-02:9092,hdp-node-03:9092

vi /root/apps/kafka/config/consumer.properties

修改以下配置:

bootstrap.servers=hdp-node-01:9092,hdp-node-02:9092,hdp-node-03:9092

4.5、分發安裝包

scp -r /root/apps/kafka_2.13-2.5.0 hdp-node-02:/root/apps

scp -r /root/apps/kafka_2.13-2.5.0 hdp-node-03:/root/apps

然後分别在各機器上建立軟連

cd /root/apps/

ln -s kafka_2.13-2.5.0 kafka

4.6、再次修改配置檔案(重要)

依次修改各伺服器上配置檔案的的broker.id,分别是0,1,2不得重複。

4.7、環境變量配置

vi /etc/profile

export KAFKA_HOME=/root/apps/kafka

export PATH=$PATH:$KAFKA_HOME/bin

重新整理下系統環境變量

source /etc/profile

4.8、守護程序啟動叢集

依次在各節點上啟動kafka

kafka-server-start.sh -daemon /root/apps/kafka/config/server.properties

4.9、編寫腳本批量啟動叢集kafka服務(kafkaBatchStart.sh)

#!/bin/bash

for i in 1 2 3

do

ssh hdp-node-0$i "source /etc/profile;/root/apps/kafka/bin/kafka-server-start.sh -daemon /root/apps/kafka/config/server.properties"

done

五、基本管理操作Shell指令

5.1、檢視目前伺服器中的所有topic

kafka-topics.sh --list --zookeeper hdp-node-01:2181
kafka使用_Kafka介紹與使用

5.2、建立topic

replication-facto:副本數、partition:分區數

kafka-topics.sh --create --zookeeper hdp-node-01:2181 --replication-factor 3 --partitions 3 --topic goodsMq

5.3、删除topic

kafka-topics.sh --delete --zookeeper hdp-node-01:2181 --topic goodsMq

5.4、通過shell指令發送消息

kafka-console-producer.sh --broker-list hdp-node-01:9092 --topic goodsMq

kafka-console-producer.sh --bootstrap-server hdp-node-01:9092,hdp-node-02:9092 --topic goodsMq

kafka使用_Kafka介紹與使用

5.5、通過shell消費消息

--from-beginning:指定偏移量從頭開始消費

kafka-console-consumer.sh --bootstrap-server hdp-node-01:9092,hdp-node-02:9092 --topic goodsMq --from-beginning

kafka使用_Kafka介紹與使用

5.6、檢視某個Topic的詳情

kafka-topics.sh --topic goodsMq --describe --zookeeper hdp-node-01:2181,hdp-node-02:2181
kafka使用_Kafka介紹與使用

六、Java簡單代碼示例

6.1、引入pom依賴

<dependency><groupId>org.apache.kafkagroupId><artifactId>kafka-clientsartifactId><version>2.5.0version>dependency
           

6.2、消息生産者

public static void main(String[] args) {   //指定目前kafka producer生産的資料的目的地   String topicName = "orderMq";   // 讀取配置檔案   Properties props = new Properties();   //指定kafka伺服器位址 如果是叢集可以指定多個 但是就算隻指定一個他也會去叢集環境下尋找其他的節點位址   props.setProperty("bootstrap.servers","hdp-node-01:9092,hdp-node-02:9092,hdp-node-03:9092");   //key序列化器   props.setProperty("key.serializer", StringSerializer.class.getName());   //value序列化器   props.setProperty("value.serializer",StringSerializer.class.getName());   //通過配置檔案,建立生産者   KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);   //生産資料   for (int messageNo = 1; messageNo < 100; messageNo++) {      //調用producer的send方法發送資料      ProducerRecord record = new ProducerRecord<String, String>(topicName, messageNo + "", "appid-" + UUID.randomUUID() + "-測試");      //發送記錄      producer.send(record);   }   producer.close();   System.out.println("done!!!");}
           

6.3、消息消費者

public static void main(String[] args) throws Exception{    Properties properties = new Properties();    properties.setProperty("bootstrap.servers","hdp-node-01:9092,hdp-node-02:9092,hdp-node-03:9092");    properties.setProperty("key.deserializer", StringDeserializer.class.getName());    properties.setProperty("value.deserializer",StringDeserializer.class.getName());    properties.setProperty("group.id","test-consumer-group");    KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);    consumer.subscribe(Collections.singletonList("orderMq"));    while (true){        ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(500));        for (ConsumerRecord<String, String> record : poll) {            System.out.println(record.key() + "=" + record.value());        }    }}
           

七、思考

7.1、Kafka為什麼效率高吞吐量大

1)、硬碟的索引功能,二分查找法。

分區:找到相應的leader分區負責讀寫操作;

分段:根據檔案segment的命名可以确認要查找的offset或timestamp在哪個檔案中;

稀疏索引:快速确定要找的offset在哪個記憶體位址的附近。

2)、通過Partition實作并行處理

3)、I/O優化:

3.1)、磁盤的順序寫入(600MB/S)

kafka使用_Kafka介紹與使用

3.2)、充分利用作業系統檔案讀取緩存(PageCache)

       Kafka 的資料并不是實時的寫入硬碟,它充分利用了現代作業系統分頁存儲來利用記憶體提高 I/O 效率。再通過mmap(Memory Mapped Files)記憶體映射檔案零拷貝的方式,它的工作原理是直接利用作業系統的 Page 來實作檔案到實體記憶體的直接映射,完成映射之後你對實體記憶體的操作會被同步到硬碟上(作業系統在适當的時候)。通過 mmap,程序像讀寫硬碟一樣讀寫記憶體(當然是虛拟機記憶體),也不必關心記憶體的大小,有虛拟記憶體為我們兜底。

       使用這種方式可以擷取很大的 I/O 提升,省去了使用者空間到核心空間複制的開銷。但也有一個很明顯的缺陷——不可靠,寫到 mmap 中的資料并沒有被真正的寫到硬碟,作業系統會在程式主動調用 Flush 的時候才把資料真正的寫到硬碟。

       Kafka 提供了一個參數 producer.type 來控制是不是主動 Flush:

如果Kafka 寫入到 mmap 之後就立即 Flush,然後再傳回 Producer 叫同步 (Sync)。如果 Kafka 寫入 mmap 之後立即傳回 Producer 不調用 Flush 叫異步 (Async)。

3.3.)、基于 Sendfile 實作零拷貝(Zero Copy)方式讀取磁盤資料

傳統模式下,當需要對一個檔案進行傳輸的時候,其具體流程細節如下:

kafka使用_Kafka介紹與使用

a、調用 Read 函數,檔案資料被 Copy 到核心緩沖區。

b、Read 函數傳回,檔案資料從核心緩沖區 Copy 到使用者緩沖區

c、Write 函數調用,将檔案資料從使用者緩沖區 Copy 到核心與 Socket 相關的緩沖區。

d、資料從 Socket 緩沖區 Copy 到相關協定引擎。

以上細節是傳統 Read/Write 方式進行網絡檔案傳輸的方式,我們可以看到,在這個過程當中,檔案資料實際上是經過了四次 Copy 操作:

硬碟—>核心 buf—>使用者 buf—>Socket 相關緩沖區—>協定引擎

Sendfile 的引入以減少資料複制,同時減少上下文切換

kafka使用_Kafka介紹與使用

3.4)、批量壓縮減少網絡IO損耗

      在很多情況下,系統的瓶頸不是 CPU 或磁盤,而是網絡 IO,對于需要在廣域網上的資料中心之間發送消息的資料流水線尤其如此。進行資料壓縮會消耗少量的 CPU 資源,不過對于 Kafka 而言,網絡 IO 更應該考慮:因為每個消息都壓縮,但是壓縮率相對很低,是以 Kafka 使用了批量壓縮,即将多個消息一起壓縮而不是單個消息壓縮。

       Kafka 允許使用遞歸的消息集合,批量的消息可以通過壓縮的形式傳輸并且在日志中也可以保持壓縮格式,直到被消費者解壓縮。

       kafka在壓縮資料時使用的壓縮算法,可選參數有:none、gzip、snappy。none即不壓縮,gzip,和snappy壓縮算法之間取舍的話gzip壓縮率比較高,系統cpu占用比較大,但是帶來的好處是,網絡帶寬占用少,snappy壓縮比沒有gzip高,cpu占用率不是很高,性能也還行,如果網絡帶寬比較緊張的話。可以選擇gzip,一般推薦snappy。

7.2、資料生産時的分發政策是什麼

Producer用戶端負責消息的分發。

      kafka叢集中的任何一個broker都可以向producer提供metadata資訊,這些metadata中包含"叢集中存活的servers清單、partitions、leader清單"等資訊;

     當producer擷取到metadata資訊之後, producer将會和Topic下所有partition leader保持socket連接配接;

     消息由producer直接通過socket發送到broker,中間不會經過任何"路由層",事實上,消息被路由到哪個partition上由producer用戶端決定;

     比如可以采用"random"、"key-hash""輪詢"等,如果一個topic中有多個partitions,那麼在producer端實作"消息均衡分發"是必要的。

     在producer端的配置檔案中,開發者可以指定partition路由的方式。

7.3、如何保證資料不丢失完全生産

Producer消息發送的應答機制。

設定發送資料是否需要服務端的回報,有三個值0,1,all

0: producer不會等待broker發送ack 

1: 當leader接收到消息之後發送ack 

all: 當所有的follower都同步消息成功後發送ack

request.required.acks=0

7.4、Partition如何分布在不同的Broker上

//第i個partition

int i = 0;

//broker清單

list{ broker01, broker02, broker03}

for(int i=0;i<5;i++){

brIndex = I % list.size;

       //第i個partition分布在hostName上

hostName = list.get(brIndex)

}

7.5、Broker如何儲存資料其檔案存儲機制是什麼

1)、Kafka檔案存儲基本結構

       在Kafka檔案存儲中,同一個topic下有多個不同partition,每個partition為一個目錄,partiton命名規則為topic名稱+有序序号,第一個partiton序号從0開始,序号最大值為partitions數量減1。

       每個partion(目錄)相當于一個巨型檔案被平均配置設定到多個大小相等segment(段)資料檔案中。但每個段segment file消息數量不一定相等,這種特性友善old segment file快速被删除。

       預設保留7天的資料。

kafka使用_Kafka介紹與使用
      每個partiton隻需要支援順序讀寫就行了,segment檔案生命周期由服務端配置參數決定。(什麼時候建立,什麼時候删除)
kafka使用_Kafka介紹與使用

2)、Kafka Partition Segment

     Segment file組成:由2大部分組成,分别為index file和data file,這兩個檔案一一對應,成對出現,字尾".index"和“.log”分别表示為segment索引檔案、資料檔案。

kafka使用_Kafka介紹與使用

       Segment檔案命名規則:partion全局的第一個segment從0開始,後續每個segment檔案名為上一個segment檔案最後一條消息的offset值。數值最大為64位long大小,19位數字字元長度,沒有數字用0填充。

      索引檔案存儲大量中繼資料,資料檔案存儲大量消息,索引檔案中中繼資料指向對應資料檔案中message的實體偏移位址。

kafka使用_Kafka介紹與使用

       其中以索引檔案中中繼資料3,497為例,依次在資料檔案中表示第3個message(在全局partiton表示第368772個message)、以及該消息的實體偏移位址為497。

3)、Kafka 查找message

讀取offset=368776的message,需要通過下面2個步驟查找。

kafka使用_Kafka介紹與使用

第一步:查找segment file

00000000000000000000.index表示最開始的檔案,起始偏移量(offset)為0。

00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1。

00000000000000737337.index的起始偏移量為737338=737337 + 1

其他後續檔案依次類推。

以起始偏移量命名并排序這些檔案,隻要根據offset **二分查找**檔案清單,就可以快速定位到具體檔案。當offset=368776時定位到00000000000000368769.index和對應log檔案。

第二步:通過segment file查找message

當offset=368776時,依次定位到00000000000000368769.index的中繼資料實體位置和00000000000000368769.log的實體偏移位址

然後再通過00000000000000368769.log順序查找直到offset=368776為止。

7.6、消費者如何标記消費狀态

通過偏移量來辨別。

擴充偏移量與偏移量送出:

      偏移量是一個自增長的ID,用來辨別目前分區的哪些消息被消費過了,這個ID會儲存在kafka的broker當中,而且消費者本地也會存儲一份。

       因為每次消費每一條消息都要更新一下偏移量的話,難免會影響整個broker的吞吐量,是以一般這個偏移量在每次發生改動時,先由消費者本地改動,預設情況下,消費者每5秒鐘會送出一次改動的偏移量,這樣做雖然說吞吐量上來了,但是可能會出現重複消費的問題: 

       因為可能在下一次送出偏移量之前,消費者本地消費了一些消息,然後發生了分區再均衡(分區再均衡在下面有講) 那麼就會出現一個問題。

       假設上次送出的偏移量是 2000 在下一次送出之前,其實消費者又消費了500條資料,也就是說目前的偏移量應該是2500 但是這個2500隻在消費者本地,也就是說,假設其他消費者去消費這個分區的時候拿到的偏移量是2000,那麼又會從2000開始消費消息,那麼2000到2500之間的消息又會被消費一遍,這就是重複消費的問題。

      kafka對于這種問題也提供了解決方案:手動送出偏移量

可以關閉預設的自動送出(enable.auto.commit= false) 然後使用kafka提供的API來進行偏移量送出,kafka提供了兩種方式送出偏移量 :同步和異步

//同步送出偏移量kafkaConsumer.commitSync();//異步送出偏移量kafkaConsumer.commitAsync();
           

       他們之間的差別在于,同步送出偏移量會等待伺服器應答,并且遇到錯誤會嘗試重試,但是會一定程度上影響性能不過能確定偏移量到底送出成功與否;而異步送出的對于性能肯定是有提示的,但是弊端也就像我們剛剛所提到遇到錯誤沒辦法重試,因為可能在收到你這個結果的時候又送出過偏移量了,如果這時候重試,又會導緻消息重複的問題了。

        其實,我們可以采用同步+異步的方式來保證送出的正确性以及伺服器的性能。因為異步送出的話,如果出現問題但不是緻命問題的話,可能下一次送出就不會出現這個問題了,是以有些異常是不需要解決的(可能單純的就是網絡抽風了呢? ) 是以,我們平時可以采用異步送出的方式,等到消費者中斷了(遇到了緻命問題,或是強制中斷消費者) 的時候再使用同步送出(因為這次如果失敗了就沒有下次了,是以要讓他重試) 。

具體代碼:

try {        while (true) {            ConsumerRecords<String, String> poll = kafkaConsumer.poll(500);            for (ConsumerRecord<String, String> context : poll) {                System.out.println("消息所在分區:" + context.partition() + "-消息的偏移量:" + context.offset() + "key:" + context.key() + "value:" + context.value());            }            //正常情況異步送出            kafkaConsumer.commitAsync();        }    } catch (Exception e) {        e.printStackTrace();    } finally {        try {            //當程式中斷時同步送出            kafkaConsumer.commitSync();        } catch (Exception e) {            e.printStackTrace();        } finally {            //關閉目前消費者  具體在下面有講            kafkaConsumer.close();        }    }
           
      值得一提的是,在手動送出時kafka提供了你可以傳入具體的偏移量來完成送出,也就是指定偏移量送出,但是非常不建議手動指定,因為如果指定的偏移量小于分區所存儲的偏移量大小的話,那麼會導緻消息重複消費,如果指定的偏移量大于分區所存儲的偏移量的話,那麼會導緻消息丢失

7.7.消費者的分區再均衡及負載均衡政策是什麼

分區再均衡也是kafka裡面非常重要的一個概念。

首先操作在以下情況下會觸發分區再均衡(Rebalance)操作:

a、組成員發生變更(新consumer加入組、已有consumer主動離開組或已有consumer崩潰了);

b、訂閱主題數發生變更,如果你使用了正規表達式的方式進行訂閱,那麼建立比對正規表達式的topic就會觸發rebalance;

c、訂閱主題的分區數發生變更。

當觸發Rebalance,kafka重新配置設定分區所有權

     何為分區所有權?我們之前有提到過,消費者有一個消費者組的概念, 而且一個消費者組在消費一個主題時有以下規則,一個消費者可以消費多個分區,但是一個分區隻能被一個消費者消費。如果我有分區 0、1、2 現在有消費者 A,B 那麼 kafka可能會讓消費者A 消費 0,1 這兩個分區,那麼 這時候我們就會說,消費者A 擁有分區 0、1的所有權。

      當觸發 Rebalance 的時候kafka會重新配置設定這個所有權,還是基于剛剛的比方,消費者A 擁有 0 和1 的所有權,消費者B 會有2的所有權。當消費者B離開kafka的時候 這時候 kafka會重新配置設定一下所有權,此時整個消費者組隻有一個A 那麼 0、1、2 三個分區的所有權都會屬于A ,同理如果這時候有消費者C進入這個消費者組,那麼這時候kafka會確定每一個消費者都能消費一個分區。

       當觸發Rebalance時,由于kafka正在配置設定所有權,會導緻消費者不能消費,而且還會引發一個重複消費的問題, 當消費者還沒來得及送出偏移量時,分區所有權遭到了重新配置設定,那麼這時候就會導緻一個消息被多個消費者重複消費。

       那麼解決方案就是在消費者訂閱時,添加一個再均衡監聽器,也就是當kafka在做Rebalance操作前後,均會調用再均衡監聽器,那麼這時候 我們可以在kafka Rebalance之前送出我們消費者最後處理的消息來解決這個問題。

拓展、Close():

      當我們不需要某個消費者繼續消費kafka當中的資料時,我們可以選擇調用Close方法來關閉它,在關閉之前 close方法會發送一個通知告訴kafka我這個消費者要退出了,那麼 kafka就會準備Rebalance 而且如果是采用的自動送出偏移量,消費者自身也會在關閉自己之前送出最後所消費的偏移量 。當然即使沒有調用close方法,而是直接強制中斷了消費者的程序 kafka也會根據我們後面會說到的系統參數捕捉到消費者退出了。

7.8.如何保證消費者消費的資料有序

      隻能保證同一個分區下的資料是有序的,可以讓同一類的資料進入到同一個分區裡。

      若想保證同一個主題的資料被消費時的順序和生産時的順序一緻,那麼隻能設定一個分區。