MQ系列1:消息中間件執行原理
MQ系列2:消息中間件的技術選型
MQ系列3:RocketMQ 架構分析
MQ系列4:NameServer 原了解析
在之前的篇章中,我們學習了RocketMQ的原理,以及RocketMQ中 命名服務 ServiceName 的運作流程,本篇從消息的生産、消費來了解一條消息的生命周期。
1 消息生産
在RocketMQ中,消息生産指的是 消息生産者往消息隊列中寫入資料的過程。因為業務場景的複雜性,RocketMQ架構設計了多種不同的發送政策。下面先讨論幾種常見的場景:
-** 同步發送:** 整個過程業務是阻塞等待的,消息發送之後等待 Broker 響應,得到響應結果之後再傳遞給業務線程。
- 異步發送: 調用RocketMQ 的 Async API,消息生産者隻要把消息發送任務放進線程池就傳回給業務線程。所有的邏輯處理、IO操作、網絡請求 都由線程池處理,處理完成之後,調用業務程式定義好的回調函數來告知業務最終的結果。
- OneWay(單向)發送: 隻負責觸發對消息的發送,發送出即完成任務,不需要對發送的狀态、結果負責。
- 延遲發送: 指定延遲的時間,在延遲時間到達之後再進行消息的發送。
-
批量發送: 對于同類型、同特征的消息,可以聚合進行批量發送,減少MQ的連接配接發送次數,能夠顯著提升性能。
以下是生産者執行個體化啟動的過程:
1.1 消息發送步驟
一般情況下,我們發送消息,會使用預設的DefaultMQProducer類,經過以下幾個步驟實作:
- 建立消息生産者Producer,并設定Producer的GroupName(生産組)。
- 設定InstanceName(執行個體名稱),當你的業務需要啟用多個Producer的時候,使用不同的InstanceName來區分。
- 設定NameServer位址,這樣Producer才能從NameServer中得到路由資訊
- 完成其他的初始化配置,比如配置異常重試次數(降低消息丢失的可能性),通信子產品初始化等。
- 組裝消息對象,指定主題Topic、Tag和消息體Message 等資訊。
- 通過NameServer擷取到的Broker路由位址,将消息發送。
1.2 消息發生傳回狀态
消息發送之後,會相應的拿到回執。傳回對象中的狀态(SendResult.SendStatus)有4種,如下:
-
FLUSH_DISK_TIMEOUT 刷盤逾時
如果将Broker的刷盤政策設定成
,那麼沒有在規定的時間完成刷盤則會報該錯誤。SYNC_FLUSH
-
FLUSH_SLAVE_TIMEOUT 主從同步逾時
主從模式下(也可以叫主備),Broker配置為
模式,如果沒有在設定時間内完成主從同步,則會報該錯誤。SYNC_MASTER
-
SLAVE_NOT_AVAILABLE 未找到Slave Broker
主從模式下,且Broker配置為
,如果未找到Slave的Broker,則會報該錯誤。SYNC_MASTER
-
SEND_OK
表示發送成功。
1.3 發送同步消息
實時同步消息是一種對可靠性、實時性要求比較高的場景,使用的也比較廣泛,比如:
- 重要的消息通知,比如驗證碼,不能超過太長時間推送,那樣可能失效
- 消費記錄确認
- 資料實時處理和推送 等等
public class SyncProducerApplication {
public static void main(String[] args) throws Exception {
// 1、建立生産者producer,并指定生産者組名為 testSyncGroup
DefaultMQProducer producer = new DefaultMQProducer("testSyncGroup");
// 2、指定NameServer的位址,以擷取Broker路由位址
producer.setNamesrvAddr("192.168.139.1:9876");
// 3、啟動producer
producer.start();
// 4、建立消息,并指定Topic,Tag和消息體
Message msg = new Message("testTopic","sync", "測試同步消息".getBytes("UTF-8"));
// 5、發送消息到一個Broker
SendResult sendResult = producer.send(msg);
// 6、通過sendResult傳回消息是否成功送達
System.out.printf("%s%n", sendResult);
// 7、如果不再發送消息,關閉生産者Producer
producer.shutdown();
}
}
1.4 發送異步消息
我們知道,異步主要用于那些對實時響應不敏感的業務,可以容忍一定時間的等待,隻要能達到最終一緻性即可。
有時候為了在流量高峰期進行削峰和分流,緩解壓力,我們經常采用異步消息的發送模式。這種業務場景也很常見,比如:
- 消費資訊的推送,可能在你買單之後的幾分鐘才送達
- 資料統計、檔案打包下載下傳等需要長耗時的任務
public class AsyncProducerApplication {
public static void main(String[] args) throws Exception {
// 1、建立生産者producer,并指定生産者組名為 testAsyncGroup
DefaultMQProducer producer = new DefaultMQProducer("testAsyncGroup");
// 2、指定NameServer的位址,以擷取Broker路由位址
producer.setNamesrvAddr("192.168.139.1:9876");
// 3、啟動producer
producer.start();
// 4、建立消息,并指定Topic,Tag和消息體
Message msg = new Message("testTopic","async", "測試異步消息".getBytes("UTF-8"));
// 5、發送消息到一個Broker
SendResult sendResult = producer.send(msg);
// 6. 發送異步消息,SendCallback是處理異步回調的方法
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) { // 成功回調
System.out.println("success: " + sendResult);
}
@Override
public void onException(Throwable throwable) { // 失敗回調
System.out.println("fail: " + throwable);
}
});
// 7、如果不再發送消息,關閉生産者Producer
producer.shutdown();
}
}
1.5 單向發送消息
OneWay的模式主要用在Care發送結果的場景,隻要消息發送出去即完成任務,不需要對發送的狀态、結果負責。常見的使用場景如
- 普通日志記錄
- 非核心的埋點上報等
public class OneWayProducerApplication {
public static void main(String[] args) throws Exception {
// 1、建立生産者producer,并指定生産者組名為 testOneWayGroup
DefaultMQProducer producer = new DefaultMQProducer("testOneWayGroup");
// 2、指定NameServer的位址,以擷取Broker路由位址
producer.setNamesrvAddr("192.168.139.1:9876");
// 3、啟動producer
producer.start();
// 4、建立消息,并指定Topic,Tag和消息體
Message msg = new Message("testTopic","oneway", "測試單向發送消息".getBytes("UTF-8"));
// 5、發送消息到一個Broker
producer.sendOneway(msg);
// 6、如果不再發送消息,關閉生産者Producer
producer.shutdown();
}
}
1.6 發送延時消息
指定延遲的時間,在延遲時間到達之後再進行消息的發送。這種的使用場景也很多:
- 比如火車票訂購,送出了一個訂單就把車票給占位了,這時候可以發送一個延時确認的消息,15m 未付款,就要把該車票釋放,讓其他人去購買。
- 還比如購買了電影票,可以發送一個核銷資訊,在電影開場前15分鐘就無法退票了。
1.6.1 延時時間的使用限制
延時時間并不是随意指定的,Rocket源碼中指定了18種等級,分别代表不同的時間時長,如下:
// org/apache/rocketmq/store/config/MessageStoreConfig.java 的第198行
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
- RocketMq不支援任意時間延時,需設定固定的延時等級,從1s到2h分别對應着等級1到18
- 可以使用setDelayTimeLevel(int level) 方法設定延時等級,level 從 0 開始
1.6.2 發送延時消息具體實作
通過下面的代碼,可以得到的結果是消費的時間點比資訊記錄的時間點延遲了1分鐘,這是因為我們在send的時候做了delay。
public class DelayProducerApplication {
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException , UnsupportedEncodingException {
// 1、建立生産者producer,并指定生産者組名為 testDelayGroup
DefaultMQProducer producer = new DefaultMQProducer("testDelayGroup");
// 2、指定NameServer的位址,以擷取Broker路由位址
producer.setNamesrvAddr("192.168.139.1:9876");
// 3、啟動producer
producer.start();
// 4、建立消息,并指定Topic,Tag和消息體
Message msg = new Message("testTopic","delay", "測試延遲發送消息".getBytes("UTF-8"));
// 5、設定延時等級4,對應1m,是以這個消息在一分鐘之後發送
msg.setDelayTimeLevel(4);
// 6、發送消息到一個Broker
SendResult sendResult = producer.send(msg);
// 7、通過sendResult傳回消息是否成功送達
System.out.printf("%s%n", sendResult);
// 8、如果不再發送消息,關閉生産者Producer
producer.shutdown();
}
}
1.7 發送批量消息
- 對于同類型、同特征的消息,可以聚合進行批量發送,減少MQ的連接配接發送次數,能夠顯著提升性能。
- 批量發送消息須有相同的topic,相同的waitStoreMsgOK,且不能是延時消息。
waitStoreMsgOK: 消息發送時是否等消息存儲完成後再傳回。
- 一批次的消息總大小不應超過4MB。
public class BatchProducerApplication {
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
// 1、建立生産者producer,并指定生産者組名為 testBatchGroup
DefaultMQProducer producer = new DefaultMQProducer("testBatchGroup");
// 2、指定NameServer的位址,以擷取Broker路由位址
producer.setNamesrvAddr("192.168.139.1:9876");
// 3、啟動producer
producer.start();
// 4、建立消息清單,并指定Topic,Tag和消息體
List<Message> messages = new ArrayList<>();
String topic = "testTopic";
messages.add(new Message(topic, "batch", "測試批量發送消息 0".getBytes("UTF-8")));
messages.add(new Message(topic, "batch", "測試批量發送消息 1".getBytes("UTF-8")));
messages.add(new Message(topic, "batch", "測試批量發送消息 2".getBytes("UTF-8")));
// 5、發送消息到一個Broker
SendResult sendResult = producer.send(messages);
// 6、通過sendResult傳回消息是否成功送達
System.out.printf("%s%n", sendResult);
// 7、如果不再發送消息,關閉生産者Producer
producer.shutdown();
}
}
1.8 如何提升消息生産的性能
消息的發送一般是經過 client發送、Broker伺服器接收并處理、Broker伺服器傳回應答 三個步驟。
如果我們想要提高消息生産的效率,一般有如下方法:
-
Oneway方式發送
Oneway方式發送用在一些性能要求高,可靠性要求低的場景下,比如日志采集,非核心的埋點上報等。Oneway方式發送請求無需應答,即将資料寫入用戶端的Socket緩沖區就傳回,不等待結果的傳回。
是以這種模式是極快的,可以把發送消息時長縮短至微秒級。
-
增加Producer的并發量,使用多個Producer同時發送
RocketMQ引入了一個并發視窗,在視窗内消息可以并發地寫入DirectMem中,然後異步地将連續資料寫入檔案系統。
順序執行CommitLog讓RocketMQ可以保持較高的寫入性能。
-
恰當的批量發送
對于同類型、同特征的消息,可以聚合進行批量發送,減少MQ的連接配接發送次數,能夠顯著提升性能。批量發送消息須有相同的topic,相同的waitStoreMsgOK,且不能是延時消息。
對于消息體的大小也要注意不能超過4MB。
根據阿裡内部調優後的性能測試報告,消息的寫入性能達到90萬+的TPS,我們可以朝着這個名額進行優化。
2 總結
本篇介紹了RocketMQ 消息生産與發送的幾種模式:
- 同步發送:整個過程業務是阻塞等待的,消息發送之後等待 Broker 響應,得到響應結果之後再傳遞給業務線程。
- 異步發送:調用RocketMQ 的 Async API,消息生産者隻要把消息發送任務放進線程池就傳回給業務線程。所有的邏輯處理、IO操作、網絡請求 都由線程池處理,處理完成之後,調用業務程式定義好的回調函數來告知業務最終的結果。
- OneWay(單向)發送:隻負責觸發對消息的發送,發送出即完成任務,不需要對發送的狀态、結果負責。
- 延遲發送:指定延遲的時間,在延遲時間到達之後再進行消息的發送。
-
批量發送:對于同類型、同特征的消息,可以聚合進行批量發送,減少MQ的連接配接發送次數,能夠顯著提升性能。
可以根據實際的業務場景選擇适當的發送模式。
架構與思維·公衆号:撰稿者為bat、位元組的幾位高階研發/架構。不做廣告、不賣課、不要打賞,隻分享優質技術
★ 加公衆号擷取學習資料和面試集錦
碼字不易,歡迎關注,歡迎轉載
作者:翁智華
出處:https://www.cnblogs.com/wzh2010/
本文采用「CC BY 4.0」知識共享協定進行許可,轉載請注明作者及出處。