天天看點

RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&Push 融合總結

作者:凱易&耘田

稽核校對:白玙

編輯&排版:雯燕

前言:随着 RocketMQ 5.0 preview 的釋出,5.0 的重大特性逐漸與大家見面。POP Consumer 作為 5.0 的一大特性,POP 消費模式展現了一種全新的消費模式。其具備的輕量級,無狀态,無隊列獨占等特點,對于消息積壓場景,Streaming 消費場景等都非常友好。在介紹 POP Consumer 之前,我們先回顧一下目前使用較多的 Push Consumer。

Push Consumer

熟悉 RocketMQ 的同學對 Push Consumer 肯定不會陌生,用戶端消費一般都會使用這種消費模式,使用這種消費模式也比較簡單。我們隻需簡單設定,并在回調方法 ConsumeMessage 中寫好業務邏輯即可,啟動用戶端應用就可以正常消費消息了。

public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("test_topic", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}      

那麼 Push Consumer 是如何消費消息的呢?

RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結
RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結

當然,Consumer 收到消息的前提是 Producer 先發消息發到 Topic 當中。Producer 使用輪詢的方式分别向每個 Queue 中發送消息,一般消費端都不止一個,用戶端啟動的時候會在 Topic,Consumer group 次元發生負載均衡,為每個用戶端配置設定需要處理的 Queue。負載均衡過程中每個用戶端都擷取到全部的的 ConsumerID 和所有 Queue 并進行排序,每個用戶端使用相同負責均衡算法,例如平均配置設定的算法,這樣每個用戶端都會計算出自己需要消費那些 Queue,每當 Consumer 增加或減少就會觸發負載均衡,是以我們可以通過 RocketMQ 負載均衡機制實作動态擴容,提升用戶端收發消息能力。

這裡有個小問題:可以一直增加用戶端的數量提升消費能力嗎?當然不可以,因為 Queue 數量有限,用戶端數量一旦達到 Queue 數量,再擴容新節點無法提升消費能力,因為會有節點配置設定不到 Queue 而無法消費。

用戶端負責均衡為用戶端配置設定好 Queue 後,用戶端會不斷向 Broker 拉取消息,在用戶端進行消費。不是 Push 用戶端嗎?怎麼會是用戶端向 Broker 拉消息,不應該是 Broker 推消息到用戶端嗎?這是一個很有意思的點,因為 RocketMQ 無論是 Push Consumer,還是 Pull Consumer,還是後面要介紹的 POP Consumer,都是用戶端拉的方式消費消息。Push Consumer 隻是通過用戶端 API 層面的封裝讓我們感覺是 Broker 推送的。

經過用戶端負載均衡以及拉消息,用戶端就可以正常消費消息了。

RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結
RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結

完整的的Push Consumer處理邏輯可以看下上面這張圖,我們可以看到Push Consumer完整處理流程。

首先用戶端 Rebalance 确定哪些 Consumer 用戶端處理哪些 Queue,然後通過 PullMessageService 服務拉取消息,拉取到消息以後 ConsumeMessageConcurrentlyService 送出消費請求到消息消費線程池,然後調用回調方法 ConsumeMessage,到這裡就可以拿到消息處理業務了,最後消費成功更新本地 offset 并上報 offset 到 Broker。如果消費失敗(抛異常,逾時等),用戶端會發送 sendBack 告訴 Broker 哪些消息消費失敗了,Broker會将消費失敗的消息發送到延時隊列,延時後再放到retry Topic,用戶端消費retry Topic完成消息重投。這樣做的好處是不會因為部分消費失敗的消息而影響正常消息的消費。想了解細節的同學可以到 github 下載下傳源碼對照這張圖看一下實際的代碼處理流程。

RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結
RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結

通過前面 Push Consumer 的介紹,我們對 Push Consumer 原理有了一定的認識。我們可以發現,RocketMQ 的用戶端做了很多事情,負載均衡,拉消息,消費位點管理,消費失敗後的 sendBack 等等。這對多語言支援無疑是不友好的。參與過多語言開發的同學應該會感同身受,将這麼多的邏輯移植到不同的語言,肯定不是一件簡單的事情。同時用戶端的更新運維也會增加難度。

是以我們思考可不可為用戶端瘦身,把一部分邏輯從用戶端移到 Broker?當然是可以的,前面介紹 Push Consumer 用戶端負責均衡的時候,我們可以發現,負載均衡需要的資訊,所有ConsumerId,原本就是用戶端從 Broker 擷取的,所有 Queue 資訊,Broker 也可以通過 nameServer 拿到,負責均衡算法在用戶端還是 Broker 端調用也沒有什麼大的差異,是以把 Rebalance 移植到 Broker 是一個不錯選擇,Broker 負載均衡可以跟用戶端負責均衡達到基本相同的效果,用戶端邏輯會減少,多語言實作更加簡單,後續更新運維也會更加可控。除此以外因為 Broker 相對用戶端具有全局資訊,還可以做一些更有意思的事情。例如在負責均衡的時候根據 Queue 的積壓情況做負載均衡,将一些壓力比較大的用戶端上的 Queue 配置設定給其它用戶端處理等等。

POP Consumer

通過前面 Push Consumer 的介紹,我們了解到 Push Consumer 的一些特點。

  • 隊列獨占:Broker 上的每個隊列隻能配置設定到相同 Consumer group 的一台 Push Consumer 機器上。 
  • 消費後更新 offset:每次 Pull 請求拉取批量消息到本地隊列緩存,本地消費成功才會 commit offset。

以上特點可能會帶來一些問題,比如用戶端異常機器 hang,導緻配置設定隊列消息堆積,無法消費。

RocketMQ 的 Push Consumer 消費對于機器異常 hang 時并不十分友好。如果遇到用戶端機器 hang 住,處于半死不活的狀态,與 Broker 的心跳沒有斷掉的時候,用戶端 Rebalance 依然會配置設定消費隊列到 hang 機器上,并且 hang 機器消費速度很慢甚至無法消費的時候,會導緻消費堆積。另外類似還有服務端 Broker 釋出時,也會由于用戶端多次 Rebalance 導緻消費延遲影響等無法避免的問題。如下圖所示:

RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結
RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結

當 Push Consumer 2 機器發生 hang 的時候,它所配置設定到的 Broker 上的 Q2 出現嚴重的堆積。我們目前處理這種問題,一般可能是找到這台機器重新開機,或者下線。保證業務不受異常機器影響,但是如果隊列擠壓到一定程度可能機器恢複了也沒辦法快速追趕消費進度,這也是受 Push Consumer 的能力限制。

我們總結下 Push Consumer 存在的一些痛點問題:

  • 富用戶端,用戶端邏輯比較重,多語言支援不友好;
  • 用戶端或者 Broker 更新釋出,重新開機等 Rebalance 可能導緻消費擠壓;
  • 隊列占位,單隊列與單 Consumer 綁定,單個 Queue 消費能力無法橫向擴充;
  • 機器 hang,會導緻擠壓。

基于上述問題,RocketMQ 5.0 實作了全新的消費模型-POP Consumer。

POP Consumer 能夠解決上述穩定性和解除隊列占位的擴充能力。

我們下面來簡單看一下 POP Consumer 是如何消費消息的:

RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結
RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結

POP Client 從 Broker 的隊列中發出 POP 請求消息,Broker 傳回消息 message。在消息的系統屬性裡面有一個比較重要的屬性叫做 POP_CK,POP_CK 為一條消息的 handler,通過一個 handler 就可以定位到一條消息。當消息消費成功之後,POP client 發送 ackMessage 并傳遞 handler 向 broker 确認消息消費成功。

RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結
RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結

對于消息的重試,當 POP 出一條消息之後,這條消息就會進入一個不可見的時間,在這段時間就不會再被 POP 出來。如果沒有在這段不可見時間通過 ackMessage 确認消息消費成功,那麼過了不可見時間之後,這條消息就會再一次的可見。

另外,對于消息的重試,我們的重試政策是一個梯度的延遲時間,重試的間隔時間是一個逐漸遞增的。是以,還有一個 changeInvisibleTime 可以修改消息的不可見時間。

RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結
RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結

從圖上可以看見,本來消息會在中間這個時間點再一次的可見的,但是我們在可見之前提前使用 changeInvisibleTime延長了不可見時間,讓這條消息的可見時間推遲了。當使用者業務代碼傳回 reconsumeLater 或者抛異常的時候,我們就可以通過 changeInvisibleTime 按照重試次數來修改下一次的可見時間了。另外如果消費 RT 超過了 30 秒(預設值,可以修改),則 Broker 也會把消息放到重試隊列。

除此以外,POP 消費的位點是由 Broker 儲存和控制,而且 POP 消費是可以多個 Client 消費同一個隊列,如下圖所示:

RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結
RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結

三個用戶端并不需要 Rebalance 去配置設定 Queue,取而代之的是,它們都會使用 POP 請求所有的 Broker 擷取消息進行消費。即使 POP Consumer 2 出現 hang,其内部消息也會讓 POP Consumer1 和 POP Consumer3 進行消費。這樣就解決了 hang 機器可能造成的消費堆積問題。

從整體流程可見,POP 消費可以避免 Rebalance 帶來的消費延時,同時用戶端可以消費 Broker 的所有隊列,這樣就可以避免機器 hang 而導緻堆積的問題。

同時擴充能力提升,POP Consumer 可以消費同一 Topic 下所有 Queue,相比 Push Consumer 解除了每個 Queue 必須 Rebalance 到一台用戶端消費的限制,Push Consuner 用戶端數量最多隻能等于 Queue 的數量。POP Consumer 可以突破這個限制,多個 POP Consumer 可以消費同一個 Queue。

Broker 實作

POP Consumer 在 Broker 端是如何實作的呢?

RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結

POP Consumer 拉取消息後,會在 Queue 次元上加鎖,保證同一時刻隻有一個用戶端可以拉去到同一個 Queue 的消息。擷取到消息後,會儲存 checkPoint 資訊在 Broker,checkPoint 資訊主要包括消息的 Topic,ConsumerGroup,QueueId,offset,POPTime,msgCout,reviveQueueId 等資訊。checkPoint 資訊會優先儲存到 buffer 當中,等待 ack 消息,在一段時間内收到用戶端回複的 ack 消息,對應的 checkPoint 資訊從 buffer 中移除,并且更新消費進度,辨別消息消費成功。

RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結

當 checkPoint 消息在 buffer 中等待一段時間,一直未等到 ack 消息時,checkPoint 資訊會清理出 buffer 并發送 ck msg 到 store,ck msg 首先被發送到延時隊列 SCHEDULE_Topic_XXXX 中,延時完成以後會進入 REVIVE_LOG Topic,REVIVE_LOG Topic 是儲存在 store 當中待處理的 ck msg 和 ack msg 的 Topic,POPReceiveService 拉取 REVIVE_LOG Topic 的消息放到一個 map 當中,如果 ck 有對應的 ack 則會更新 REVIVE_LOG 的消費位點,辨別消息消費完成,逾時未被确認的 ck msg,會查詢到 ck msg 對應的真實的消息,并把這個消息放到 retry Topic 當中,等待用戶端消費,POP Consumer 正常消費的時候會機率性的消費到 retry Topic 中的消息。我們從這塊設計中可以看到 RocketMQ 的常用設計,通過一些内部的 Topic 實作業務邏輯,事務消息,定時消息都用了這種設計方式。

我們簡單終結一下 POP Consumer 的優勢:

  • 無狀态,offset 資訊 Broker 維護,用戶端與 Queue 無綁定。
  • 輕量級,用戶端隻需要收發消息,确認消息。
  • 無隊列占位,Queue 不再與用戶端綁定。
  • 多語言友好,友善多語言移植。
  • 更新更可控,邏輯都收斂到 Broker,更新更加友善可控。

POP&Push 融合

既然 POP 有這麼多優勢,我們能否使用 POP 解決 Push 的一些問題呢?前面我們提到 Push Consumer 當一個隊列因為 Consumer 問題已經堆積很多的時候,受限于單個 Consumer 的消費能力,也無法快速的追趕消費進度,延遲會很高。核心問題是單隊列單 Consumer 的限制,導緻消費能力無法橫向擴充。

我們希望通過 POPAPI 的形式,當一個隊列堆積太多的情況下,可以切換到 POP 模式,有機會讓多個 Consumer 來一起消費該隊列,追趕進度,我們在 5.0 的實作中也實作了這一點。 

POP/Push 模式切換方式

可以通過兩種方式進行切換。

1、指令行

mqadmin setConsumeMode -c cluster -t topic -g group -m POP -n 8      

2、代碼切換

public static final String CONSUMER_GROUP = "CID_JODIE_1";
    public static final String TOPIC = "TopicTest";

    // Or use AdminTools directly: mqadmin setConsumeMode -c cluster -t topic -g group -m POP -n 8
    private static void switchPop() throws Exception {
        DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
        mqAdminExt.start();

        ClusterInfo clusterInfo = mqAdminExt.examineBrokerClusterInfo();
        Set<String> brokerAddrs = clusterInfo.getBrokerAddrTable().values().stream().map(BrokerData::selectBrokerAddr).collect(Collectors.toSet());

        for (String brokerAddr : brokerAddrs) {
            mqAdminExt.setMessageRequestMode(brokerAddr, TOPIC, CONSUMER_GROUP, MessageRequestMode.POP, 8, 3_000);
        }
    }      

通過下面 POP Consumer Demo,我們看到 POP Consumer 跟 Push API 基本是統一,使用也比較簡單,相比 Push API 隻是多了一步消費模式切換。

RocketMQ 5.0 POP 消費模式探秘Push ConsumerPOP ConsumerPOP&amp;Push 融合總結

Push & POP Retry 隊列差異

在使用 POP 消費模式時我們隻需要在 Push API 的基礎上切換模式即可,對于 Broker 來說還是需要做一些處理的。主要需要處理的地方是 retry 隊列。

Push 和 POP 模式對 retry 隊列處理不一樣

  • Push 的 retry 處理 
    • 服務端有一個 %RETRY%ConsumerGroup 隊列
    • 用戶端會有拉取任務拉取這個隊列的消息。
  • POP 的 retry 處理 
    • 服務端針對每個Topic,都有一個名為 %RETRY%ConsumerGroup_Topic 的 retry 隊列
    • 用戶端沒有專門針對 retry 隊列的拉任務,每次普通 POP 請求都有一定機率消費相應的 retry 隊列

模式切換之後,老模式的 retry 裡的消息還需要繼續處理,否則就丢消息了。

Push & POP 切換

Push 切換到 POP 

  • 正常隊列切換到 POP 模式
  • 正常隊列的 POP 請求會處理對應的 POP retry 隊列
  • 針對 Push retry 隊列,我們保留原來 Push retry 隊列的拉取任務,并且是工作在 Push 模式。

POP 切換到 Push 

  • 正常隊列切換到 Push 模式
  • Push retry 隊列自然有相應的拉取任務
  • 之前 POP 的 retry 隊列,我們在用戶端自動建立拉取任務,以Push 模式去拉取。注意這裡的拉取任務隻拉取 POP 的 retry 隊列。

總結下來就是,對于 retry 隊列,我們會特殊處理不參與模式切換。

總結

最後我們總結下 POP Consumer。POP 作為一種全新的消費模式,解決了 Push 模式的一些痛點,使用戶端無狀态,更加輕量,消費邏輯也基本都收斂到了 Broker,對多語言的支援十分的友好。在 API 層面也與 Push 完成了融合,繼承了 Push API 的簡單易用,同時實作了 Push,POP 之間的自由切換。