天天看點

使用說明-Kafka

  • Kafka名詞解釋
  1. Broker:Kafka節點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka叢集。
  2. Topic:一類消息,消息存放的目錄即主題,例如page view日志、click日志等都可以以topic的形式存在,Kafka叢集能夠同時負責多個topic的分發。
  3. Partition:topic實體上的分組,一個topic可以分為多個partition,每個partition是一個有序的隊列
  4. Producer : 生産message發送到topic
  5. Consumer : 訂閱topic消費message, consumer作為一個線程來消費
  6. Consumer Group:一個Consumer Group包含多個consumer, 這個是預先在配置檔案中配置好的。各個consumer(consumer 線程)可以組成一個組(Consumer group ),partition中的每個message隻能被組(Consumer group ) 中的一個consumer(consumer 線程 )消費,如果一個message可以被多個consumer(consumer 線程 ) 消費的話,那麼這些consumer必須在不同的組。
  • Kafka叢集概覽
使用說明-Kafka
  • 剖析一個Topic的生産與消費
使用說明-Kafka
  1. 生産者

叢集模式下,ProducerA和ProducerB都是“my topic”的生産者,即2個執行個體

  1. 消費者

在叢集模式下,ConsumerA、ConsumerB和ConsumerC都是“my topic”的消費者,即3個執行個體,它們都在一個ConsumerGroup中

  1. My Topic

一個主題,如:使用者點選事件

Partition1、Partition2、Partition3是對My Topic按照特定規則的分解,例如

P1存放: key mod 3 = 0

P2存放: key mod 3 = 1

P3存放: key mod 3 = 2

一個topic,在kafka叢集裡的實體存儲怎樣?-----Partition & Replication

  1. Partition在broker中的分布

舉例:在一個含有4個broker節點的叢集中,每個主題3個分區,每個分區3個備份

bin/kafka-topics.sh --create --zookeeper 192.168.102.197:2181 --replication-factor 3 --partitions 3 --topic topic1

使用說明-Kafka
  1. 建立過程

Step1:建立一個topic

Step2:将這個topic邏輯分3個區

Step3:将這3個區,分到每個broker上(分區數量可以大于broker數量嗎?)

Step4:為每個分區,建立對應數量的副本(副本數量可以大于broker數量嗎?)

  1. 消息配置設定規則:

P0存放: key mod 3 = 0(隻有3個分區,每個消息必須落在其中一個分區)

P1存放: key mod 3 = 1

P2存放: key mod 3 = 2

主題名稱 Key Payload負載 所在分區 目标Broker
MyTopic 1 Data1 Part1 Broker2
MyTopic 2 Data2 Part2 Broker4
MyTopic 3 Data3 Part0 Broker1
MyTopic 4 Data4 Part1 Broker2
MyTopic 5 Data5 Part2 Broker4
MyTopic 6 Data6 Part0 Broker1
MyTopic 7 Data7 Part1 Broker2
MyTopic 8 Data8 Part2 Broker4
  1. 結論:

同一個Partition可能會有多個Replica,需要保證同一個Partition的多個Replica之間的資料一緻性。而這時需要在這些Replication之間選出一個Leader(紅色背景),Producer和Consumer隻與這個Leader互動,其它Replica作為Follower從Leader中複制資料

  1. Producer發送消息到Kafka叢集的過程分析
使用說明-Kafka

選擇分區

我們從建立一個ProducerRecord 對象開始, Producer Record 對象需要包含目标主題和要發送的内容。我們還可以指定鍵或分區。在發送ProducerRecord 對象時,生産者要先把鍵和值對象序列化成位元組數組,這樣它們才能夠在網絡上傳輸。

接下來,資料被傳給分區器。如果之前在ProducerRecord 對象裡指定了分區,那麼分區器就不會再做任何事情,直接把指定的分區傳回。如果沒有指定分區,那麼分區器會根據ProducerRecord 對象的鍵來選擇一個分區。選好分區以後,生産者就知道該往哪個主題和分區發送這條記錄了。緊接着,這條記錄被添加到一個記錄批次裡,這個批次裡的所有消息會被發送到相同的主題和分區上。有一個獨立的線程負責把這些記錄批次發送到相應的broker 上。

接收寫入結果

伺服器在收到這些消息時會傳回一個響應。如果消息成功寫入Kafka ,就傳回一個RecordMetaData 對象,它包含了主題和分區資訊,以及記錄在分區裡的偏移量。如果寫入失敗, 則會傳回一個錯誤。生産者在收到錯誤之後會嘗試重新發送消息,幾次之後如果還

是失敗,就傳回錯誤資訊。

同步發送消息---通過調用Future.get()方法阻塞,等待結果傳回

異步發送消息---回調函數

生産者如何保證消息寫入的高可用?

acks 參數指定了必須要有多少個分區副本收到消息,生産者才會認為消息寫入是成功的。這個參數對消息丢失的可能性有重要影響。

如果acks=0 ,生産者在成功寫入消息之前不會等待任何來自伺服器的響應。

如果acks=1 ,隻要叢集的首領節點收到消息,生産者就會收到一個來自伺服器的成功

響應。

如果acks=all ,隻有當所有參與複制的節點全部收到消息時,生産者才會收到一個來自

伺服器的成功響應。

  1. Consumer從Kafka叢集消費消息的過程分析

主題分區和消費者群組的多種配置設定方式

使用說明-Kafka
使用說明-Kafka
使用說明-Kafka
使用說明-Kafka
使用說明-Kafka

消費者群組和分區再均衡

一個新的悄費者加入群組時,它讀取的是原本由其他消費者讀取的消息。當一個消費者被關閉或發生崩憤時,它就離開群組,原本由它讀取的分區将由群組裡的其他消費者來讀取。在主題發生變化時, 比如管理者添加了新的分區,會發生分區重配置設定。當消費者要加入群組時,它會向群組協調器發送一個Join Gro up 請求。第一個加入群組的消費者将成為“群主”。群主從協調器那裡獲得群組的成員清單(清單中包含了所有最近發送過心跳的消費者,它們被認為是活躍的),并負責給每一個悄費者配置設定分區。它使用一個實作了P a 「ti.ti.onAssi. gn o 「接口的類來決定哪些分區應該被配置設定給哪個消費者。Kafka 内置了兩種配置設定政策,在後面的配置參數小節我們将深入讨論。配置設定完畢之後,群主把配置設定情況清單發送給群組協調器,協調器再把這些資訊發送給所有消費者。每個消費者隻能看到自己的配置設定資訊,隻有群主知道群組裡所有消費者的配置設定資訊。這個過程會在每次再均衡時重複發生。

消費者讀取消息

使用說明-Kafka

poll () 方能傳回一個記錄清單。每條記錄都包含了記錄所屬主題的資訊、記錄所在分

區的資訊、記錄在分區裡的偏移量,以及記錄的鍵值對。我們一般會周遊這個清單,逐

條處理這些記錄。

消費者指定偏移量進行消費

我們知道了如何使用poll () 方告從各個分區的最新偏移量處開始處理消息。不過,有時候我們也需要從特定的偏移量處開始讀取悄息。如果你想從分區的起始位置開始讀取消息,或者直接跳到分區的末尾開始讀取消息, 可以使用seekToBeginning(Collection<TopicPartition> tp ) 和seekToEnd (Collection < TopicPartition >

tp ) 這兩個方法。

消費者送出分區消息偏移量

消費者往一個叫作_consumer_offset 的特殊主題發送消息,消息裡包含每個分區的偏移量。如果消費者一直處于運作狀态,那麼偏移量就沒有什麼用處。不過,如果悄費者發生崩憤或者有新的消費者加入群組,就會觸發再均衡,完成再均衡之後,每個消費者可能配置設定到新的分區,而不是之前處理的那個。為了能夠繼續之前的工作,消費者需要讀取每個分區最後一次送出的偏移量,然後從偏移量指定的地方繼續處理。

送出方式:

自動送出consumerProps.put("enable.auto.commit", "true")

手動送出consumer.commitSync();或consumer.commitAsync();

送出特定的偏移量:邊處理邊送出

  • 安裝kafka
  • Kafka常用指令

繼續閱讀