天天看點

kafka 權威指南--讀書筆記-(4)從kafka讀取資料

應用程式使用 KafkaConsumer向 Kafka 訂閱主題,并從訂閱的主題上接收消息 。 從 Kafka 讀取資料不同于從其他悄息系統讀取資料,它涉及一些獨特的概念和想法。如果不先了解 這些概念,就難以了解如何使用消費者 API。是以我們接下來先解釋這些重要的概念,然 後再舉幾個例子,橫示如何使用消費者 API 實作不同的應用程式。

消費者和消費者群組

假設我們有一個應用程式需要從-個 Kafka主題讀取消息井驗證這些消息,然後再把它們 儲存起來。應用程式需要建立一個消費者對象,訂閱主題并開始接收消息,然後驗證消息 井儲存結果。過了 一陣子,生産者往主題寫入消息的速度超過了應用程式驗證資料的速 度,這個時候該怎麼辦?如果隻使用單個消費者處理消息,應用程式會遠跟不上消息生成 的速度。顯然,此時很有必要對消費者進行橫向伸縮。就像多個生産者可以向相同的 主題 寫入消息一樣,我們也可以使用多個消費者從同一個主題讀取消息,對消息進行分流。

Kafka 消費者從屬于消費者群組。一個群組裡的消費者訂閱的是同一個主題,每個消費者 接收主題一部分分區的消息。

假設主題 T1 有 4 個分區,我們建立了消費者 C1 ,它是群組 G1 裡唯 一 的消費者,我們用 它訂閱主題 T1。消費者 Cl1将收到主題 T1全部 4個分區的消息,如圖 4-1 所示。

kafka 權威指南--讀書筆記-(4)從kafka讀取資料

如果在群組 G1 裡新增一個消費者 C2,那麼每個消費者将分别從兩個分區接收消息。我 假設消費者 C1接收分區 0 和分區 2 的消息,消費者 C2 接收分區 1 和分區 3 的消息,如圖 4-2 所示。

kafka 權威指南--讀書筆記-(4)從kafka讀取資料

如果群組 G1 有 4 個消費者,那麼每個消費者可以配置設定到 一個分區,如圖 4-3 所示。

kafka 權威指南--讀書筆記-(4)從kafka讀取資料

如果我們往群組裡添加更多的消費者,超過主題的分區數量,那麼多出的消費者就會被閑置,不會接收到任何消息。

往群組裡增加消費者是橫向伸縮消費能力的主要方式。 Kafka 消費者經常會做一些高延遲的操作,比如把資料寫到資料庫或 HDFS,或者使用資料進行比較耗時的計算。在這些情況下,單個消費者無法跟上資料生成的速度,是以可以增加更多的消費者,讓它們分擔負載,每個消費者隻處理部分分區的消息,這就是橫向伸縮的主要手段。我們有必要為主題建立大量的分區,在負載增長時可以加入更多的消費者。不過要性意,不要讓消費者的數量超過主題分區的數量,多餘的消費者隻會被閑置。

除了通過增加消費者來橫向伸縮單個應用程式外,還經常出現多個應用程式從同一個主題讀取資料的情況。實際上, Kafka 設計的主要目标之一 ,就是要讓 Kafka 主題裡的資料能夠滿足企業各種應用場景的需求。在這些場景裡,每個應用程式可以擷取到所有的消息, 而不隻是其中的 一部分。隻要保證每個應用程式有自己的消費者群組,就可以讓它們擷取到主題所有的消息。不同于傳統的消息系統,橫向伸縮 Kafka消費者和消費者群組并不會對性能造成負面影響。

在上面的例子裡,如果新增一個隻包含一個消費者的群組 G2,那麼這個消費者将從主題 T1 上接收所有的消息,與群組 G1 之間互不影響。群組 G2 可以增加更多的消費者,每個消費者可以消費若幹個分區,就像群組 G1 那樣,如圖 4-5 所示。總的來說,群組 G2 還是會接收到所有消息,不管有沒有其他群組存在。

簡而言之,為每一個需要擷取一個或多個主題全部消息的應用程式建立一個消費者群組, 然後往群組裡添加消費者來伸縮讀取能力和處理能力,群組裡的每個消費者隻處理一部分消息。

kafka 權威指南--讀書筆記-(4)從kafka讀取資料

消費者群組和分區再均衡

我們已經從上一個小節了解到,群組裡的消費者共同讀取主題的分區。一個新的消費者加 入群組時,它讀取的是原本由其他消費者讀取的消息。當一個消費者被關閉或發生崩潰時,它就離開群組,原本由它讀取的分區将由群組裡的其他消費者來讀取。在主題發生變化時 , 比如管理者添加了新的分區,會發生分區重配置設定。

分區的所有權從一個消費者轉移到另一個消費者,這樣的行為被稱為再均衡。再均衡非常重要, 它為消費者群組帶來了高可用性和伸縮性(我們可以放心地添加或移除消費者), 不過在正常情況下,我們并不希望發生這樣的行為。在再均衡期間,消費者無法讀取消息,造成整個群組一小段時間的不可用。另外,當分區被重新配置設定給另 一個消費者時,消費者目前的讀取狀态會丢失,它有可能還需要去重新整理緩存 ,在它重新恢複狀态之前會拖慢應用程式。我們将在本章讨論如何進行安全的再均衡,以及如何避免不必要的再均衡。

消費者通過向被指派為 群組協調器的 broker (不同的群組可以有不同的協調器)發送 心跳 來維持它們和群組的從屬關系以及它們對分區的所有權關系。隻要消費者以正常的時間間隔發送心跳,就被認為是活躍的,說明它還在讀取分區裡的消息。消費者會在輪詢消息 (為了擷取消息)或送出偏移量時發送心跳。如果消費者停止發送心跳的時間足夠長,會話就會過期,群組協調器認為它已經死亡,就會觸發一次再均衡。

如果一個消費者發生崩潰,井停止讀取消息,群組協調器(broker)會等待幾秒鐘,确認它死亡了才會觸發再均衡。在這幾秒鐘時間裡,死掉的消費者不會讀取分區裡的消息。在清理消費者時,消費者會通知協調器它将要離開群組,協調器會立即觸發一次再均衡,盡量降低處理停頓。在本章的後續部分,我們将讨論一些用于控制發送心跳頻率和會話過期時間的配置參數,以及如何根據實際需要來配置這些參數 。

配置設定分區是怎樣的一個過程

當消費者要加入群組時,它會向群組協調器發送 一 個 JoinGroup 請求。第 一 個加入群組的消費者将成為“群主”。群主從協調器那裡獲得群組的成員列 表(清單中包含了所有最近發送過心跳的消費者,它們被認為是活躍的), 并負責給每一個消費者配置設定分區。它使用 一個實作了 PartitionAssignor接口的類來決定哪些分 區應該被配置設定給哪個消費者 。

Kafka 内置了兩種配置設定政策,在後面的配置參數小節我們将深入讨論。配置設定完畢之後,群主把配置設定情況清單發送給群組協調器,協調器再把這些資訊發送給所有消費者。每個消費者隻能看到自己的配置設定資訊,隻有群 主知道群組 裡所有消費者的配置設定資訊。這個過程會在每次再均衡時重複發生。

建立 Kafka消費者

在讀取消息之前,需要先建立 一個 KafkaConsumer對象 。 建立 KafkaConsumer 對象與建立 KafkaProducer對象非常相似——把想要傳給消費者的屬性放在 Properties 對象裡。本章 後續部分會深入讨論所有的屬性。在這裡,我們隻需要使用 3個必要的屬性: bootstrap.servers、 key.deserializer、 value.deserializer。

下面代碼示範了如何建立一個KafkaConsumer對象:

  1. Properties props = new Properties();

  2. props.put("bootstrap.servers", "broker1:9092, broker2:9092");

  3. props.put("group.id", "CountryCounter");

  4. props.put("key.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");

  5. props.put("value.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");

  6. KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

deserializer使用指定的類(反序列化器)把位元組數組轉成 Java對象。

group.id指定了KafkaConsumer 屬于哪一個消費者群組。

group.id不是必需的,不過我們現在姑且認為它是必需的。它指定了 KafkaConsumer 屬于哪一個消費者群組。建立不屬于任何一個群組的消費者也是可以的,隻是這樣做不太常見。

訂閱主題

建立好消費者之後,下一步可以開始訂閱主題了。subscribe()方法接受一個主題清單作為參數

consumer.subscribe(Collections.singletonList("customerCountries"));
           

在這裡我們建立了一個包含單個元素的清單,主題的名字叫作“customerCountries”,我們也可以在調用subscribe()方法時傳入一個正規表達式,正規表達式可以比對多個主題如果有人建立了新的主題,并且主題名與正規表達式比對,那麼會立即觸發一次再均衡,消費者就可以讀取新添加的主題。如果應用程式需要讀取多個主題,并且可以處理不同類型的資料,那麼這種訂閱方式就很管用。在Kafka和其他系統之間複制資料時,使用正規表達式的方式訂閱多個主題時很常見的做法。

要訂閱所有test相關的主題,可以這樣做:

consumer.subscribe("test.*");

輪詢

消息輪詢是消費者 API 的核心,通過一個簡單的輪詢向伺服器請求資料。一旦消費者訂閱了主題 ,輪詢就會處理所有的細節,包括群組協調、分區再均衡、發送心跳和擷取資料, 開發者隻需要使用一組簡單的 API 來處理從分區傳回的資料。消費者代碼的主要部分如下所示 :

kafka 權威指南--讀書筆記-(4)從kafka讀取資料

輪詢不隻是擷取資料那麼簡單。在第一次調用新消費者的 poll() 方法時,它會負責查找 GroupCoordinator, 然後加入群組,接受配置設定的分區。 如果發生了再均衡,整個過程也是在輪詢期間進行的。當然 ,心跳也是從輪詢裡發疊出去的。是以,我們要確定在輪詢期間所做的任何處理工作都應該盡快完成。

線程安全

在同一個群組中,我們無法讓一個線程運作多個消費者,也無法讓多個線程安全地共享一個消費者。按照規則,一個消費者使用一個線程。如果要在同一個消費者群組裡運作多個消費者,需要讓每個消費者運作在自己的線程裡。最好是把消費者的邏輯封裝在自己的對象裡,然後使用Java的ExecutorService啟動多個線程,使每個消費者運作在自己的線程上。Confluent的部落格(https://www.confluent.io/blog/)上有一個教程介紹如何處理這種情況。

消費者的配置

到目前為止,我們學習了如何使用消費者 API,不過隻介紹了幾個配置屬’性一一如bootstrap.servers、 key.deserializer、 value.deserializer、group.id。 Kafka的文檔列出了所有與消費者相關的配置說明。大部分參數都有合理的預設值,一般不需要修改它們,不過有一些參數與消費 者的性能和可用性有很大關系。接下來介紹這些重要的屬性。

1. fetch.min.bytes

該屬性指定了消費者從伺服器擷取記錄的最小位元組數。 broker 在收到消費者的資料請求時, 如果可用的資料量小于 fetch.min.bytes指定的大小,那麼它會等到有足夠的可用資料時才把它傳回給消費者。這樣可以降低消費者和 broker 的工作負載,因為它們在主題不是很活躍的時候(或者一天裡的低谷時段)就不需要來來回回地處理消息。如果沒有很多可用資料,但消費者的 CPU 使用率卻很高,那麼就需要把該屬性的值設得比預設值大。如果消費者的數量比較多,把該屬性的值設定得大一點可以降低 broker 的工作負載。

2. fetch.max.wait.ms

我們通過 fetch.min.bytes 告訴 Kafka,等到有足夠的資料時才把它傳回給消費者。而 fetch.max.wait.ms則用于指定 broker的等待時間,預設是 500ms。如果沒有足夠的資料流入 Kafka,消費者擷取最小資料量的要求就得不到滿足,最終導緻500ms的延遲。 如果要降低潛在的延遲(為了滿足 SLA),可以把該參數值設定得小一些。如果 fetch.max.wait.ms被設 為 100ms,并且 fetch.min.bytes 被設為 1MB,那麼 Kafka在收到消費者的請求後,要麼返 回 1MB 資料,要麼在 100ms 後傳回所有可用的資料 , 就看哪個條件先得到滿足。

3. max.parition.fetch.bytes

該屬性指定了伺服器從每個分區裡傳回給消費者的最大位元組數。它的預設值是 1MB,也 就是說, KafkaConsumer.poll() 方法從每個分區裡傳回的記錄最多不超過 max.parition.fetch.bytes 指定的位元組。如果一個主題有 20個分區和 5 個消費者,那麼每個消費者需要至少 4MB 的可用記憶體來接收記錄。在為消費者配置設定記憶體時,可以給它們多配置設定一些,因 為如果群組裡有消費者發生崩潰,剩下的消費者需要處理更多的分區。 max.parition.fetch.bytes 的值必須比 broker能夠接收的最大消息的位元組數(通過 max.message.size屬 性配置 )大, 否則消費者可能無法讀取這些消息,導緻消費者一直挂起重試。在設定該屬性時,另一個需要考慮的因素是消費者處理資料的時間。 消費者需要頻繁調用 poll() 方法來避免會話過期和發生分區再均衡,如果單次調用 poll() 傳回的資料太多,消費者需要更多的時間來處理,可能無法及時進行下一個輪詢來避免會話過期。如果出現這種情況, 可以把 max.parition.fetch.bytes 值改小 ,或者延長會話過期時間。

4. session.timeout.ms

該屬性指定了消費者在被認為死亡之前可以與伺服器斷開連接配接的時間,預設是 3s。如果消費者沒有在 session.timeout.ms 指定的時間内發送心跳給群組協調器,就被認為已經死亡,協調器就會觸發再均衡,把它的分區配置設定給群組裡的其他消費者 。該屬性與 heartbeat.interval.ms緊密相關。heartbeat.interval.ms 指定了poll()方法向協調器 發送心跳的頻 率, session.timeout.ms 則指定了消費者可以多久不發送心跳。是以, 一般需要同時修改這兩個屬性, heartbeat.interval.ms 必須比 session.timeout.ms 小, 一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那麼 heartbeat.interval.ms 應該是 ls。 把 session.timeout.ms 值設 得比預設值小,可以更快地檢測和恢 複崩潰的節點,不過長時間的輪詢或垃圾收集可能導緻非預期的再均衡。把該屬性的值設定得大一些,可以減少意外的再均衡 ,不過檢測節點崩潰需要更長的時間。

5. auto.offset.reset

該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下(因消費者長時間失效,包含偏移量的記錄已經過時井被删除)該作何處理。它的預設值是latest, 意 思是說,在偏移量無效的情況下,消費者将從最新的記錄開始讀取資料(在消費者 啟動之 後生成的記錄)。另一個值是 earliest,意思是說,在偏移量無效的情況下,消費者将從 起始位置讀取分區的記錄。

6. enable.auto.commit

我們稍後将介紹 幾種 不同的送出偏移量的方式。該屬性指定了消費者是否自動送出偏移量,預設值是 true。為了盡量避免出現重複資料和資料丢失,可以把它設為 false,由自己控制何時送出偏移量。如果把它設為 true,還可以通過配置 auto.commit.interval.mls 屬性來控制送出的頻率。

7. partition.assignment.strategy

我們知道,分區會被配置設定給群組裡的消費者。 PartitionAssignor 根據給定的消費者和主題,決定哪些分區應該被配置設定給哪個消費者。 Kafka 有兩個預設的配置設定政策 。

  • Range

該政策會把主題的若幹個連續的分區配置設定給消費者。假設悄費者 C1 和消費者 C2 同時 訂閱了主題 T1 和主題 T2,井且每個主題有 3 個分區。那麼消費者 C1 有可能配置設定到這 兩個主題的分區 0 和 分區 1,而消費者 C2 配置設定到這兩個主題 的分區 2。因為每個主題 擁有奇數個分區,而配置設定是在主題内獨立完成的,第一個消費者最後配置設定到比第二個消費者更多的分區。隻要使用了 Range政策,而且分區數量無法被消費者數量整除,就會出現這種情況。

  • RoundRobin

該政策把主題的所有分區逐個配置設定給消費者。如果使用 RoundRobin 政策來給消費者 C1 和消費者 C2配置設定分區,那麼消費者 C1 将分到主題 T1 的分區 0和分區 2以及主題 T2 的分區 1,消費者 C2 将配置設定到主題 T1 的分區 l 以及主題T2 的分區 0和分區 2。一般 來說,如果所有消費者都訂閱相同的主題(這種情況很常見), RoundRobin政策會給所 有消費者配置設定相同數量 的分區(或最多就差一個分區)。

可以通過設定 partition.assignment.strategy 來選擇分區政策。預設使用的是 org. apache.kafka.clients.consumer.RangeAssignor, 這個類實作了 Range政策,不過也可以 把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor。我們還可以使用自定 義政策,在這種情況下 , partition.assignment.strategy 屬性的值就是自定義類的名字。

8. client.id

該屬性可以是任意字元串 , broker用它來辨別從用戶端發送過來的消息,通常被用在日志、度量名額和配額裡。

9. max.poll.records

該屬性用于控制單次調用 call() 方法能夠傳回的記錄數量,可以幫你控制在輪詢裡需要處理的資料量。

10. receive.buffer.bytes 和 send.buffer.bytes

socket 在讀寫資料時用到的 TCP 緩沖區也可以設定大小。如果它們被設為-1,就使用作業系統的預設值。如果生産者或消費者與 broker處于不同的資料中心内,可以适當增大這些值,因為跨資料中心的網絡一般都有 比較高的延遲和比較低的帶寬 。

送出和偏移量

 每次調用poll方法,它總是傳回由生産者寫入kafka但還沒有被消費者讀取過的記錄,我們是以可以追蹤到哪些記錄是被群組裡的那個消費者讀取的, 

我們把更新分區目前位置的操作叫做送出。

消費者往一個叫做_consumer_offset的特殊主題發送消息,消息包含每個分區的偏移量。隻有新加入消費者 觸發在均衡,消費者需要讀取新的偏移量。

如果送出的偏移量大于用戶端處理的最後一個消息偏移量,那麼處于兩個偏移量之間的消息會丢失。反之則會消息重複

自動送出

這種方式讓消費者來管理位移,應用本身不需要顯式操作。當我們将enable.auto.commit設定為true,那麼消費者會在poll方法調用後每隔5秒(由auto.commit.interval.ms指定)送出一次位移。和很多其他操作一樣,自動送出也是由poll()方法來驅動的;在調用poll()時,消費者判斷是否到達送出時間,如果是則送出上一次poll傳回的最大位移。

需要注意到,這種方式可能會導緻消息重複消費。假如,某個消費者poll消息後,應用正在處理消息,在3秒後Kafka進行了重平衡,那麼由于沒有更新位移導緻重平衡後這部分消息重複消費。

送出目前位移

為了減少消息重複消費或者避免消息丢失,很多應用選擇自己主動送出位移。設定auto.commit.offset為false,那麼應用需要自己通過調用commitSync()來主動送出位移,該方法會送出poll傳回的最後位移。

為了避免消息丢失,我們應當在完成業務邏輯後才送出位移。而如果在處理消息時發生了重平衡,那麼隻有目前poll的消息會重複消費。下面是一個自動送出的代碼樣例:

while (true) {

    ConsumerRecords<String, String> records = consumer.poll(100);

    for (ConsumerRecord<String, String> record : records)

    {

        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());

    }

    try {

        consumer.commitSync();

    } catch (CommitFailedException e) {

        log.error("commit failed", e)

    }

}

上面代碼poll消息,并進行簡單的列印(在實際中有更多的處理),最後完成處理後進行了位移送出。

異步送出

手動送出有一個缺點,那就是當發起送出調用時應用會阻塞。當然我們可以減少手動送出的頻率,但這個會增加消息重複的機率(和自動送出一樣)。另外一個解決辦法是,使用異步送出的API。以下為使用異步送出的方式,應用發了一個送出請求然後立即傳回:

while (true) {

    ConsumerRecords<String, String> records = consumer.poll(100);

    for (ConsumerRecord<String, String> record : records)

    {

        System.out.printf("topic = %s, partition = %s,

        offset = %d, customer = %s, country = %s\n",

        record.topic(), record.partition(), record.offset(),

        record.key(), record.value());

    }

    consumer.commitAsync();

}

但是異步送出也有個缺點,那就是如果伺服器傳回送出失敗,異步送出不會進行重試。相比較起來,同步送出會進行重試直到成功或者最後抛出異常給應用。異步送出沒有實作重試是因為,如果同時存在多個異步送出,進行重試可能會導緻位移覆寫。舉個例子,假如我們發起了一個異步送出commitA,此時的送出位移為2000,随後又發起了一個異步送出commitB且位移為3000;commitA送出失敗但commitB送出成功,此時commitA進行重試并成功的話,會将實際上将已經送出的位移從3000復原到2000,導緻消息重複消費。

是以,基于這種性質,一般情況下對于異步送出,我們可能會通過回調的方式記錄送出結果:

while (true) {

    ConsumerRecords<String, String> records = consumer.poll(100);

    for (ConsumerRecord<String, String> record : records) {

        System.out.printf("topic = %s, partition = %s,

        offset = %d, customer = %s, country = %s\n",

        record.topic(), record.partition(), record.offset(),

        record.key(), record.value());

    }

    consumer.commitAsync(new OffsetCommitCallback() {

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {

            if (e != null)

                log.error("Commit failed for offsets {}", offsets, e);

        } 

    });

}

而如果想進行重試同時又保證送出順序的話,一種簡單的辦法是使用單調遞增的序号。每次發起異步送出時增加此序号,并且将此時的序号作為參數傳給回調方法;當消息送出失敗回調時,檢查參數中的序号值與全局的序号值,如果相等那麼可以進行重試送出,否則放棄(因為已經有更新的位移送出了)。

混合同步送出與異步送出

正常情況下,偶然的送出失敗并不是什麼大問題,因為後續的送出成功就可以了。但是在某些情況下(例如程式退出、重平衡),我們希望最後的送出成功,是以一種非常普遍的方式是混合異步送出和同步送出,如下所示:

try {

    while (true) {

       ConsumerRecords<String, String> records = consumer.poll(100);

       for (ConsumerRecord<String, String> record : records) {

           System.out.printf("topic = %s, partition = %s, offset = %d,

           customer = %s, country = %s\n",

           record.topic(), record.partition(),

           record.offset(), record.key(), record.value());

       }

       consumer.commitAsync();

    }

} catch (Exception e) {

    log.error("Unexpected error", e);

} finally {

    try {

        consumer.commitSync();

    } finally {

        consumer.close();

    }

}

在正常處理流程中,我們使用異步送出來提高性能,但最後使用同步送出來保證位移送出成功。

送出特定位移

commitSync()和commitAsync()會送出上一次poll()的最大位移,但如果poll()傳回了批量消息,而且消息數量非常多,我們可能會希望在處理這些批量消息過程中送出位移,以免重平衡導緻從頭開始消費和處理。幸運的是,commitSync()和commitAsync()允許我們指定特定的位移參數,參數為一個分區與位移的map。由于一個消費者可能會消費多個分區,是以這種方式會增加一定的代碼複雜度,如下所示:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

int count = 0;

....

while (true) {

    ConsumerRecords<String, String> records = consumer.poll(100);

    for (ConsumerRecord<String, String> record : records)

    {

        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());

        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));

        if (count % 1000 == 0)

            consumer.commitAsync(currentOffsets, null);

        count++;

} }

代碼中在處理poll()消息的過程中,不斷儲存分區與位移的關系,每處理1000條消息就會異步送出(也可以使用同步送出)。

重平衡監聽器(Rebalance Listener)

在分區重平衡前,如果消費者知道它即将不再負責某個分區,那麼它可能需要将已經處理過的消息位移進行送出。Kafka的API允許我們在消費者新增分區或者失去分區時進行處理,我們隻需要在調用subscribe()方法時傳入ConsumerRebalanceListener對象,該對象有兩個方法:

public void onPartitionRevoked(Collection partitions):此方法會在消費者停止消費消費後,在重平衡開始前調用。

public void onPartitionAssigned(Collection partitions):此方法在分區配置設定給消費者後,在消費者開始讀取消息前調用。

下面來看一個onPartitionRevoked9)的例子,該例子在消費者失去某個分區時送出位移(以便其他消費者可以接着消費消息并處理):

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener {

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

        System.out.println("Lost partitions in rebalance.

          Committing current

        offsets:" + currentOffsets);

        consumer.commitSync(currentOffsets);

    }

}

try {

    consumer.subscribe(topics, new HandleRebalance());

    while (true) {

        ConsumerRecords<String, String> records = consumer.poll(100);

        for (ConsumerRecord<String, String> record : records)

        {

             System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());

             currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));

        }

        consumer.commitAsync(currentOffsets, null);

    }

} catch (WakeupException e) {

    // ignore, we're closing

} catch (Exception e) {

   log.error("Unexpected error", e);

} finally {

   try {

       consumer.commitSync(currentOffsets);

   } finally {

       consumer.close();

       System.out.println("Closed consumer and we are done");

   }

}

代碼中實作了onPartitionsRevoked()方法,當消費者失去某個分區時,會送出已經處理的消息位移(而不是poll()的最大位移)。上面代碼會送出所有的分區位移,而不僅僅是失去分區的位移,但這種做法沒什麼壞處。

從指定位移開始消費

在此之前,我們使用poll()來從最後的送出位移開始消費,但我們也可以從一個指定的位移開始消費。

如果想從分區開始端重新開始消費,那麼可以使用seekToBeginning(TopicPartition tp);如果想從分區的最末端消費最新的消息,那麼可以使用seekToEnd(TopicPartition tp)。而且,Kafka還支援我們從指定位移開始消費。從指定位移開始消費的應用場景有很多,其中最典型的一個是:位移存在其他系統(例如資料庫)中,并且以其他系統的位移為準。

考慮這麼個場景:我們從Kafka中讀取消費,然後進行處理,最後把結果寫入資料庫;我們既不想丢失消息,也不想資料庫中存在重複的消息資料。對于這樣的場景,我們可能會按如下邏輯處理:

while (true) {

    ConsumerRecords<String, String> records = consumer.poll(100);

    for (ConsumerRecord<String, String> record : records)

    {

        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset());

        processRecord(record);

        storeRecordInDB(record);

        consumer.commitAsync(currentOffsets);

    }

}

這個邏輯似乎沒什麼問題,但是要注意到這麼個事實,在持久化到資料庫成功後,送出位移到Kafka可能會失敗,那麼這可能會導緻消息會重複處理。對于這種情況,我們可以優化方案,将持久化到資料庫與送出位移實作為原子性操作,也就是要麼同時成功,要麼同時失敗。但這個是不可能的,是以我們可以在儲存記錄到資料庫的同時,也儲存位移,然後在消費者開始消費時使用資料庫的位移開始消費。這個方案是可行的,我們隻需要通過seek()來指定分區位移開始消費即可。下面是一個改進的樣例代碼:

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

        //在消費者負責的分區被回收前送出資料庫事務,儲存消費的記錄和位移

        commitDBTransaction();

    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

        //在開始消費前,從資料庫中擷取分區的位移,并使用seek()來指定開始消費的位移

        for(TopicPartition partition: partitions)

            consumer.seek(partition, getOffsetFromDB(partition));

    } 

}

    consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));

    //在subscribe()之後poll一次,并從資料庫中擷取分區的位移,使用seek()來指定開始消費的位移

    consumer.poll(0);

    for (TopicPartition partition: consumer.assignment())

        consumer.seek(partition, getOffsetFromDB(partition));

    while (true) {

        ConsumerRecords<String, String> records = consumer.poll(100);

        for (ConsumerRecord<String, String> record : records)

        {

            processRecord(record);

            //儲存記錄結果

            storeRecordInDB(record);

            //儲存位移

            storeOffsetInDB(record.topic(), record.partition(), record.offset());

        }

        //送出資料庫事務,儲存消費的記錄以及位移

        commitDBTransaction();

    }

具體邏輯見代碼注釋,此處不再贅述。另外注意的是,seek()隻是指定了poll()拉取的開始位移,這并不影響在Kafka中儲存的送出位移(當然我們可以在seek和poll之後送出位移覆寫)。

優雅退出

下面我們來讨論下消費者如何優雅退出。

在一般情況下,我們會在一個主線程中循環poll消息并進行處理。當需要退出poll循環時,我們可以使用另一個線程調用consumer.wakeup(),調用此方法會使得poll()抛出WakeupException。如果調用wakup時,主線程正在處理消息,那麼在下一次主線程調用poll時會抛出異常。主線程在抛出WakeUpException後,需要調用consumer.close(),此方法會送出位移,同時發送一個退出消費組的消息到Kafka的組協調者。組協調者收到消息後會立即進行重平衡(而無需等待此消費者會話過期)。

下面是一個優雅退出的樣例代碼:

//注冊JVM關閉時的回調鈎子,當JVM關閉時調用此鈎子。

Runtime.getRuntime().addShutdownHook(new Thread() {

          public void run() {

              System.out.println("Starting exit...");

              //調用消費者的wakeup方法通知主線程退出

              consumer.wakeup();

              try {

                  //等待主線程退出

                  mainThread.join();

              } catch (InterruptedException e) {

                  e.printStackTrace();

              }

          } 

});

...

try {

    // looping until ctrl-c, the shutdown hook will cleanup on exit

    while (true) {

        ConsumerRecords<String, String> records = consumer.poll(1000);

        System.out.println(System.currentTimeMillis() + "--  waiting for data...");

        for (ConsumerRecord<String, String> record : records) {

            System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());

        }

        for (TopicPartition tp: consumer.assignment())

            System.out.println("Committing offset at position:" + consumer.position(tp));

        consumer.commitSync();

    }

} catch (WakeupException e) {

    // ignore for shutdown

} finally {

    consumer.close();

    System.out.println("Closed consumer and we are done");

}

反序列化

如前所述,Kafka生産者負責将對象序列化成位元組數組并發送到Kafka。消費者則需要将位元組數組轉換成對象,這就是反序列化做的事情。序列化與反序列化需要比對,如果序列化使用IntegerSerializer,但使用StringDeserializer來反序列化,那麼會反序列化失敗。是以作為開發者,我們需要關注寫入到主題使用的是什麼序列化格式,并且保證寫入的資料能夠被消費者反序列化成功。如果使用Avro與模式注冊中心(Schema Registry)來序列化與反序列化,那麼事情會輕松許多,因為AvroSerializer會保證所有寫入的資料都是結構相容的,并且能夠被反序列化出來。

下面先來看下如何自定義反序列化,後面會進一步讨論如何使用Avro。

自定義反序列化

首先,假設序列化的對象為Customer:

public class Customer {

     private int customerID;

     private String customerName;

     public Customer(int ID, String name) {

         this.customerID = ID;

         this.customerName = name;

     }

     public int getID() {

         return customerID;

     }

     public String getName() {

         return customerName;

     } 

}

根據之前的序列化政策,我們的反序列化代碼如下:

import org.apache.kafka.common.errors.SerializationException;

import java.nio.ByteBuffer;

import java.util.Map;

public class CustomerDeserializer implements Deserializer<Customer> {

    @Override

    public void configure(Map configs, boolean isKey) {

     // nothing to configure

    }

    @Override

    public Customer deserialize(String topic, byte[] data) {

        int id;

        int nameSize;

        String name;

        try {

            if (data == null)

                return null;

            if (data.length < 8)

                throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected");

            ByteBuffer buffer = ByteBuffer.wrap(data);

            id = buffer.getInt();

            String nameSize = buffer.getInt();

            byte[] nameBytes = new Array[Byte](nameSize);

            buffer.get(nameBytes);

            name = new String(nameBytes, 'UTF-8');

            return new Customer(id, name);

        } catch (Exception e) {

            throw new SerializationException("Error when serializing Customer to byte[] " + e);

        }

    }

    @Override

    public void close() {

            // nothing to close

    } 

}

消費者使用這個反序列化的代碼如下:

Properties props = new Properties();

props.put("bootstrap.servers", "broker1:9092,broker2:9092");

props.put("group.id", "CountryCounter");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.CustomerDeserializer");

KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props);

consumer.subscribe("customerCountries")

while (true) {

    ConsumerRecords<String, Customer> records = consumer.poll(100);

    for (ConsumerRecord<String, Customer> record : records)

    {

    System.out.println("current customer Id: " + record.value().getId() + " and current customer name: " + record.value().getName());

    } 

}

最後提醒下,我們并不推薦實作自定義的序列化與反序列化,因為往往這些方案并不成熟,難以維護和更新,而且容易出錯。我們可以使用JSON、Thrift、Protobuf或者Avro的成熟的解決方案。

使用Avro反序列化

假設我們使用之前生産者Avro序列化時使用的Customer,那麼使用Avro反序列化的話,我們的樣例代碼如下:

Properties props = new Properties();

props.put("bootstrap.servers", "broker1:9092,broker2:9092");

props.put("group.id", "CountryCounter");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

//使用KafkaAvroDeserializer來反序列化Avro消息

props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");

//這裡增加了schema.registry.url參數,擷取生産者注冊的消息模式

props.put("schema.registry.url", schemaUrl);

String topic = "customerContacts"

KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url));

consumer.subscribe(Collections.singletonList(topic));

System.out.println("Reading topic:" + topic);

while (true) {

    //這裡使用之前生産者使用的Avro生成的Customer類

    ConsumerRecords<String, Customer> records = consumer.poll(1000);

    for (ConsumerRecord<String, Customer> record: records) {

        System.out.println("Current customer name is: " + record.value().getName());

    }

    consumer.commitSync();

}

單個消費者

一般情況下我們都是使用消費組(即便隻有一個消費者)來消費消息的,因為這樣可以在增加或減少消費者時自動進行分區重平衡。這種方式是推薦的方式。在知道主題和分區的情況下,我們也可以使用單個消費者來進行消費。對于這種情況,我們需要自己給消費者配置設定消費分區,而不是讓消費者訂閱(成為消費組)主題。

下面是一個給單個消費者指定分區進行消費的代碼樣例:

List<PartitionInfo> partitionInfos = null;

//擷取主題下所有的分區。如果你知道所指定的分區,可以跳過這一步

partitionInfos = consumer.partitionsFor("topic");

if (partitionInfos != null) {

    for (PartitionInfo partition : partitionInfos)

        partitions.add(new TopicPartition(partition.topic(), partition.partition()));

    //為消費者指定分區

    consumer.assign(partitions);

    while (true) {

        ConsumerRecords<String, String> records = consumer.poll(1000);

        for (ConsumerRecord<String, String> record: records) {

            System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());

        }

        consumer.commitSync();

    }

}

除了需要主動擷取分區以及沒有分區重平衡,其他的處理邏輯都是一樣的。需要注意的是,如果添加了新的分區,這個消費者是感覺不到的,需要通過consumer.partitionsFor()來重新擷取分區。