天天看點

MQ系列5:RocketMQ消息的發送模式

MQ系列1:消息中間件執行原理

MQ系列2:消息中間件的技術選型

MQ系列3:RocketMQ 架構分析

MQ系列4:NameServer 原了解析

在之前的篇章中,我們學習了RocketMQ的原理,以及RocketMQ中 命名服務 ServiceName 的運作流程,本篇從消息的生産、消費來了解一條消息的生命周期。

1 消息生産

在RocketMQ中,消息生産指的是 消息生産者往消息隊列中寫入資料的過程。因為業務場景的複雜性,RocketMQ架構設計了多種不同的發送政策。下面先讨論幾種常見的場景:

-** 同步發送:** 整個過程業務是阻塞等待的,消息發送之後等待 Broker 響應,得到響應結果之後再傳遞給業務線程。

  • 異步發送: 調用RocketMQ 的 Async API,消息生産者隻要把消息發送任務放進線程池就傳回給業務線程。所有的邏輯處理、IO操作、網絡請求 都由線程池處理,處理完成之後,調用業務程式定義好的回調函數來告知業務最終的結果。
  • OneWay(單向)發送: 隻負責觸發對消息的發送,發送出即完成任務,不需要對發送的狀态、結果負責。
  • 延遲發送: 指定延遲的時間,在延遲時間到達之後再進行消息的發送。
  • 批量發送: 對于同類型、同特征的消息,可以聚合進行批量發送,減少MQ的連接配接發送次數,能夠顯著提升性能。

    以下是生産者執行個體化啟動的過程:

    MQ系列5:RocketMQ消息的發送模式

1.1 消息發送步驟

一般情況下,我們發送消息,會使用預設的DefaultMQProducer類,經過以下幾個步驟實作:

  • 建立消息生産者Producer,并設定Producer的GroupName(生産組)。
  • 設定InstanceName(執行個體名稱),當你的業務需要啟用多個Producer的時候,使用不同的InstanceName來區分。
  • 設定NameServer位址,這樣Producer才能從NameServer中得到路由資訊
  • 完成其他的初始化配置,比如配置異常重試次數(降低消息丢失的可能性),通信子產品初始化等。
  • 組裝消息對象,指定主題Topic、Tag和消息體Message 等資訊。
  • 通過NameServer擷取到的Broker路由位址,将消息發送。

1.2 消息發生傳回狀态

消息發送之後,會相應的拿到回執。傳回對象中的狀态(SendResult.SendStatus)有4種,如下:

  • FLUSH_DISK_TIMEOUT 刷盤逾時

    如果将Broker的刷盤政策設定成

    SYNC_FLUSH

    ,那麼沒有在規定的時間完成刷盤則會報該錯誤。
  • FLUSH_SLAVE_TIMEOUT 主從同步逾時

    主從模式下(也可以叫主備),Broker配置為

    SYNC_MASTER

    模式,如果沒有在設定時間内完成主從同步,則會報該錯誤。
  • SLAVE_NOT_AVAILABLE 未找到Slave Broker

    主從模式下,且Broker配置為

    SYNC_MASTER

    ,如果未找到Slave的Broker,則會報該錯誤。
  • SEND_OK

    表示發送成功。

1.3 發送同步消息

實時同步消息是一種對可靠性、實時性要求比較高的場景,使用的也比較廣泛,比如:

  • 重要的消息通知,比如驗證碼,不能超過太長時間推送,那樣可能失效
  • 消費記錄确認
  • 資料實時處理和推送 等等
public class SyncProducerApplication {
    public static void main(String[] args) throws Exception {
        // 1、建立生産者producer,并指定生産者組名為 testSyncGroup
        DefaultMQProducer producer = new DefaultMQProducer("testSyncGroup");
        // 2、指定NameServer的位址,以擷取Broker路由位址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、啟動producer
        producer.start();
        // 4、建立消息,并指定Topic,Tag和消息體
        Message msg = new Message("testTopic","sync", "測試同步消息".getBytes("UTF-8"));
        // 5、發送消息到一個Broker
        SendResult sendResult = producer.send(msg);
        // 6、通過sendResult傳回消息是否成功送達
        System.out.printf("%s%n", sendResult);
        // 7、如果不再發送消息,關閉生産者Producer
        producer.shutdown();
    }
}
           
MQ系列5:RocketMQ消息的發送模式

1.4 發送異步消息

我們知道,異步主要用于那些對實時響應不敏感的業務,可以容忍一定時間的等待,隻要能達到最終一緻性即可。

有時候為了在流量高峰期進行削峰和分流,緩解壓力,我們經常采用異步消息的發送模式。這種業務場景也很常見,比如:

  • 消費資訊的推送,可能在你買單之後的幾分鐘才送達
  • 資料統計、檔案打包下載下傳等需要長耗時的任務
public class AsyncProducerApplication {
    public static void main(String[] args) throws Exception {
        // 1、建立生産者producer,并指定生産者組名為 testAsyncGroup
        DefaultMQProducer producer = new DefaultMQProducer("testAsyncGroup");
        // 2、指定NameServer的位址,以擷取Broker路由位址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、啟動producer
        producer.start();
        // 4、建立消息,并指定Topic,Tag和消息體
        Message msg = new Message("testTopic","async", "測試異步消息".getBytes("UTF-8"));
        // 5、發送消息到一個Broker
        SendResult sendResult = producer.send(msg);
        // 6. 發送異步消息,SendCallback是處理異步回調的方法
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {  // 成功回調
                System.out.println("success: " + sendResult);
            }
            @Override
            public void onException(Throwable throwable) {  // 失敗回調
                System.out.println("fail: " + throwable);
            }
        });
        // 7、如果不再發送消息,關閉生産者Producer
        producer.shutdown();
    }
}
           
MQ系列5:RocketMQ消息的發送模式

1.5 單向發送消息

OneWay的模式主要用在Care發送結果的場景,隻要消息發送出去即完成任務,不需要對發送的狀态、結果負責。常見的使用場景如

  • 普通日志記錄
  • 非核心的埋點上報等
public class OneWayProducerApplication {
    public static void main(String[] args) throws Exception {
        // 1、建立生産者producer,并指定生産者組名為 testOneWayGroup
        DefaultMQProducer producer = new DefaultMQProducer("testOneWayGroup");
        // 2、指定NameServer的位址,以擷取Broker路由位址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、啟動producer
        producer.start();
        // 4、建立消息,并指定Topic,Tag和消息體
        Message msg = new Message("testTopic","oneway", "測試單向發送消息".getBytes("UTF-8"));
        // 5、發送消息到一個Broker
        producer.sendOneway(msg);
        // 6、如果不再發送消息,關閉生産者Producer
        producer.shutdown();
    }
}
           
MQ系列5:RocketMQ消息的發送模式

1.6 發送延時消息

指定延遲的時間,在延遲時間到達之後再進行消息的發送。這種的使用場景也很多:

  • 比如火車票訂購,送出了一個訂單就把車票給占位了,這時候可以發送一個延時确認的消息,15m 未付款,就要把該車票釋放,讓其他人去購買。
  • 還比如購買了電影票,可以發送一個核銷資訊,在電影開場前15分鐘就無法退票了。

1.6.1 延時時間的使用限制

延時時間并不是随意指定的,Rocket源碼中指定了18種等級,分别代表不同的時間時長,如下:

// org/apache/rocketmq/store/config/MessageStoreConfig.java 的第198行
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
           
  • RocketMq不支援任意時間延時,需設定固定的延時等級,從1s到2h分别對應着等級1到18
  • 可以使用setDelayTimeLevel(int level) 方法設定延時等級,level 從 0 開始

1.6.2 發送延時消息具體實作

通過下面的代碼,可以得到的結果是消費的時間點比資訊記錄的時間點延遲了1分鐘,這是因為我們在send的時候做了delay。

public class DelayProducerApplication {
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException , UnsupportedEncodingException {
        // 1、建立生産者producer,并指定生産者組名為 testDelayGroup
        DefaultMQProducer producer = new DefaultMQProducer("testDelayGroup");
        // 2、指定NameServer的位址,以擷取Broker路由位址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、啟動producer
        producer.start();
        // 4、建立消息,并指定Topic,Tag和消息體
        Message msg = new Message("testTopic","delay", "測試延遲發送消息".getBytes("UTF-8"));
        // 5、設定延時等級4,對應1m,是以這個消息在一分鐘之後發送
        msg.setDelayTimeLevel(4);
        // 6、發送消息到一個Broker
        SendResult sendResult = producer.send(msg);
        // 7、通過sendResult傳回消息是否成功送達
        System.out.printf("%s%n", sendResult);
        // 8、如果不再發送消息,關閉生産者Producer
        producer.shutdown();
    }
}
           
MQ系列5:RocketMQ消息的發送模式

1.7 發送批量消息

  • 對于同類型、同特征的消息,可以聚合進行批量發送,減少MQ的連接配接發送次數,能夠顯著提升性能。
  • 批量發送消息須有相同的topic,相同的waitStoreMsgOK,且不能是延時消息。
waitStoreMsgOK: 消息發送時是否等消息存儲完成後再傳回。
  • 一批次的消息總大小不應超過4MB。
public class BatchProducerApplication {
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
        // 1、建立生産者producer,并指定生産者組名為 testBatchGroup
        DefaultMQProducer producer = new DefaultMQProducer("testBatchGroup");
        // 2、指定NameServer的位址,以擷取Broker路由位址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、啟動producer
        producer.start();
        // 4、建立消息清單,并指定Topic,Tag和消息體
        List<Message> messages = new ArrayList<>();
        String topic = "testTopic";
        messages.add(new Message(topic, "batch", "測試批量發送消息 0".getBytes("UTF-8")));
        messages.add(new Message(topic, "batch", "測試批量發送消息 1".getBytes("UTF-8")));
        messages.add(new Message(topic, "batch", "測試批量發送消息 2".getBytes("UTF-8")));

        // 5、發送消息到一個Broker
        SendResult sendResult = producer.send(messages);
        // 6、通過sendResult傳回消息是否成功送達
        System.out.printf("%s%n", sendResult);
        // 7、如果不再發送消息,關閉生産者Producer
        producer.shutdown();
    }
}
           

1.8 如何提升消息生産的性能

消息的發送一般是經過 client發送、Broker伺服器接收并處理、Broker伺服器傳回應答 三個步驟。

如果我們想要提高消息生産的效率,一般有如下方法:

  • Oneway方式發送

    Oneway方式發送用在一些性能要求高,可靠性要求低的場景下,比如日志采集,非核心的埋點上報等。Oneway方式發送請求無需應答,即将資料寫入用戶端的Socket緩沖區就傳回,不等待結果的傳回。

    是以這種模式是極快的,可以把發送消息時長縮短至微秒級。

  • 增加Producer的并發量,使用多個Producer同時發送

    RocketMQ引入了一個并發視窗,在視窗内消息可以并發地寫入DirectMem中,然後異步地将連續資料寫入檔案系統。

    順序執行CommitLog讓RocketMQ可以保持較高的寫入性能。

  • 恰當的批量發送

    對于同類型、同特征的消息,可以聚合進行批量發送,減少MQ的連接配接發送次數,能夠顯著提升性能。批量發送消息須有相同的topic,相同的waitStoreMsgOK,且不能是延時消息。

    對于消息體的大小也要注意不能超過4MB。

根據阿裡内部調優後的性能測試報告,消息的寫入性能達到90萬+的TPS,我們可以朝着這個名額進行優化。

2 總結

本篇介紹了RocketMQ 消息生産與發送的幾種模式:

  • 同步發送:整個過程業務是阻塞等待的,消息發送之後等待 Broker 響應,得到響應結果之後再傳遞給業務線程。
  • 異步發送:調用RocketMQ 的 Async API,消息生産者隻要把消息發送任務放進線程池就傳回給業務線程。所有的邏輯處理、IO操作、網絡請求 都由線程池處理,處理完成之後,調用業務程式定義好的回調函數來告知業務最終的結果。
  • OneWay(單向)發送:隻負責觸發對消息的發送,發送出即完成任務,不需要對發送的狀态、結果負責。
  • 延遲發送:指定延遲的時間,在延遲時間到達之後再進行消息的發送。
  • 批量發送:對于同類型、同特征的消息,可以聚合進行批量發送,減少MQ的連接配接發送次數,能夠顯著提升性能。

    可以根據實際的業務場景選擇适當的發送模式。

架構與思維·公衆号:撰稿者為bat、位元組的幾位高階研發/架構。不做廣告、不賣課、不要打賞,隻分享優質技術

★ 加公衆号擷取學習資料和面試集錦

碼字不易,歡迎關注,歡迎轉載

作者:翁智華

出處:https://www.cnblogs.com/wzh2010/

本文采用「CC BY 4.0」知識共享協定進行許可,轉載請注明作者及出處。