- Kafka名詞解釋
- Broker:Kafka節點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka叢集。
- Topic:一類消息,消息存放的目錄即主題,例如page view日志、click日志等都可以以topic的形式存在,Kafka叢集能夠同時負責多個topic的分發。
- Partition:topic實體上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列
- Producer : 生産message發送到topic
- Consumer : 訂閱topic消費message, consumer作為一個線程來消費
- Consumer Group:一個Consumer Group包含多個consumer, 這個是預先在配置檔案中配置好的。各個consumer(consumer 線程)可以組成一個組(Consumer group ),partition中的每個message隻能被組(Consumer group ) 中的一個consumer(consumer 線程 )消費,如果一個message可以被多個consumer(consumer 線程 ) 消費的話,那麼這些consumer必須在不同的組。
- Kafka叢集概覽
- 剖析一個Topic的生産與消費
- 生産者
叢集模式下,ProducerA和ProducerB都是“my topic”的生産者,即2個執行個體
- 消費者
在叢集模式下,ConsumerA、ConsumerB和ConsumerC都是“my topic”的消費者,即3個執行個體,它們都在一個ConsumerGroup中
- My Topic
一個主題,如:使用者點選事件
Partition1、Partition2、Partition3是對My Topic按照特定規則的分解,例如
P1存放: key mod 3 = 0
P2存放: key mod 3 = 1
P3存放: key mod 3 = 2
一個topic,在kafka叢集裡的實體存儲怎樣?-----Partition & Replication
- Partition在broker中的分布
舉例:在一個含有4個broker節點的叢集中,每個主題3個分區,每個分區3個備份
bin/kafka-topics.sh --create --zookeeper 192.168.102.197:2181 --replication-factor 3 --partitions 3 --topic topic1
- 建立過程
Step1:建立一個topic
Step2:将這個topic邏輯分3個區
Step3:将這3個區,分到每個broker上(分區數量可以大于broker數量嗎?)
Step4:為每個分區,建立對應數量的副本(副本數量可以大于broker數量嗎?)
- 消息配置設定規則:
P0存放: key mod 3 = 0(隻有3個分區,每個消息必須落在其中一個分區)
P1存放: key mod 3 = 1
P2存放: key mod 3 = 2
主題名稱 | Key | Payload負載 | 所在分區 | 目标Broker |
MyTopic | 1 | Data1 | Part1 | Broker2 |
MyTopic | 2 | Data2 | Part2 | Broker4 |
MyTopic | 3 | Data3 | Part0 | Broker1 |
MyTopic | 4 | Data4 | Part1 | Broker2 |
MyTopic | 5 | Data5 | Part2 | Broker4 |
MyTopic | 6 | Data6 | Part0 | Broker1 |
MyTopic | 7 | Data7 | Part1 | Broker2 |
MyTopic | 8 | Data8 | Part2 | Broker4 |
- 結論:
同一個Partition可能會有多個Replica,需要保證同一個Partition的多個Replica之間的資料一緻性。而這時需要在這些Replication之間選出一個Leader(紅色背景),Producer和Consumer隻與這個Leader互動,其它Replica作為Follower從Leader中複制資料
- Producer發送消息到Kafka叢集的過程分析
選擇分區
我們從建立一個ProducerRecord 對象開始, Producer Record 對象需要包含目标主題和要發送的内容。我們還可以指定鍵或分區。在發送ProducerRecord 對象時,生産者要先把鍵和值對象序列化成位元組數組,這樣它們才能夠在網絡上傳輸。
接下來,資料被傳給分區器。如果之前在ProducerRecord 對象裡指定了分區,那麼分區器就不會再做任何事情,直接把指定的分區傳回。如果沒有指定分區,那麼分區器會根據ProducerRecord 對象的鍵來選擇一個分區。選好分區以後,生産者就知道該往哪個主題和分區發送這條記錄了。緊接着,這條記錄被添加到一個記錄批次裡,這個批次裡的所有消息會被發送到相同的主題和分區上。有一個獨立的線程負責把這些記錄批次發送到相應的broker 上。
接收寫入結果
伺服器在收到這些消息時會傳回一個響應。如果消息成功寫入Kafka ,就傳回一個RecordMetaData 對象,它包含了主題和分區資訊,以及記錄在分區裡的偏移量。如果寫入失敗, 則會傳回一個錯誤。生産者在收到錯誤之後會嘗試重新發送消息,幾次之後如果還
是失敗,就傳回錯誤資訊。
同步發送消息---通過調用Future.get()方法阻塞,等待結果傳回
異步發送消息---回調函數
生産者如何保證消息寫入的高可用?
acks 參數指定了必須要有多少個分區副本收到消息,生産者才會認為消息寫入是成功的。這個參數對消息丢失的可能性有重要影響。
如果acks=0 ,生産者在成功寫入消息之前不會等待任何來自伺服器的響應。
如果acks=1 ,隻要叢集的首領節點收到消息,生産者就會收到一個來自伺服器的成功
響應。
如果acks=all ,隻有當所有參與複制的節點全部收到消息時,生産者才會收到一個來自
伺服器的成功響應。
- Consumer從Kafka叢集消費消息的過程分析
主題分區和消費者群組的多種配置設定方式
消費者群組和分區再均衡
一個新的悄費者加入群組時,它讀取的是原本由其他消費者讀取的消息。當一個消費者被關閉或發生崩憤時,它就離開群組,原本由它讀取的分區将由群組裡的其他消費者來讀取。在主題發生變化時, 比如管理者添加了新的分區,會發生分區重配置設定。當消費者要加入群組時,它會向群組協調器發送一個Join Gro up 請求。第一個加入群組的消費者将成為“群主”。群主從協調器那裡獲得群組的成員清單(清單中包含了所有最近發送過心跳的消費者,它們被認為是活躍的),并負責給每一個悄費者配置設定分區。它使用一個實作了P a 「ti.ti.onAssi. gn o 「接口的類來決定哪些分區應該被配置設定給哪個消費者。Kafka 内置了兩種配置設定政策,在後面的配置參數小節我們将深入讨論。配置設定完畢之後,群主把配置設定情況清單發送給群組協調器,協調器再把這些資訊發送給所有消費者。每個消費者隻能看到自己的配置設定資訊,隻有群主知道群組裡所有消費者的配置設定資訊。這個過程會在每次再均衡時重複發生。
消費者讀取消息
poll () 方能傳回一個記錄清單。每條記錄都包含了記錄所屬主題的資訊、記錄所在分
區的資訊、記錄在分區裡的偏移量,以及記錄的鍵值對。我們一般會周遊這個清單,逐
條處理這些記錄。
消費者指定偏移量進行消費
我們知道了如何使用poll () 方告從各個分區的最新偏移量處開始處理消息。不過,有時候我們也需要從特定的偏移量處開始讀取悄息。如果你想從分區的起始位置開始讀取消息,或者直接跳到分區的末尾開始讀取消息, 可以使用seekToBeginning(Collection<TopicPartition> tp ) 和seekToEnd (Collection < TopicPartition >
tp ) 這兩個方法。
消費者送出分區消息偏移量
消費者往一個叫作_consumer_offset 的特殊主題發送消息,消息裡包含每個分區的偏移量。如果消費者一直處于運作狀态,那麼偏移量就沒有什麼用處。不過,如果悄費者發生崩憤或者有新的消費者加入群組,就會觸發再均衡,完成再均衡之後,每個消費者可能配置設定到新的分區,而不是之前處理的那個。為了能夠繼續之前的工作,消費者需要讀取每個分區最後一次送出的偏移量,然後從偏移量指定的地方繼續處理。
送出方式:
自動送出consumerProps.put("enable.auto.commit", "true")
手動送出consumer.commitSync();或consumer.commitAsync();
送出特定的偏移量:邊處理邊送出
- 安裝kafka
- Kafka常用指令