名言警句
任何先進的技術均與魔法無異
追本溯源
經曆了6個月的失蹤,我将帶着幹貨終究歸來!【RocketMQ入門到精通】
訂閱與釋出
消息的釋出是指某個生産者向某個topic發送消息,消息的訂閱是指某個消費者關注了某個topic中帶有某些tag的消息,進而從該topic消費資料。
對于一個指定的Topic,消息嚴格按照先進先出(FIFO)的原則進行消息釋出和消費,即先釋出的消息先消費,後釋出的消息後消費。在 Apache RocketMQ 中支援分區順序消息,如下圖所示。我們可以按照某一個标準對消息進行分區(比如圖中的ShardingKey),同一個ShardingKey的消息會被配置設定到同一個隊列中,并按照順序被消費。
消息順序
消息有序指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單産生了三條消息分别是訂單建立、訂單付款、訂單完成。消費時要按照這個順序消費才能有意義,但是同時訂單之間是可以并行消費的。RocketMQ可以嚴格的保證消息有序。
順序消息分為全局順序消息與分區順序消息,全局順序是指某個Topic下的所有消息都要保證順序;部分順序消息隻要保證每一組消息被順序消費即可。
- 全局順序 對于指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行釋出和消費。
- 适用場景:性能要求不高,所有的消息嚴格按照 FIFO 原則進行消息釋出和消費的場景
- 分區順序 對于指定的一個 Topic,所有消息根據 sharding key 進行區塊分區。 同一個分區内的消息按照嚴格的 FIFO 順序進行釋出和消費。 Sharding key 是順序消息中用來區分不同分區的關鍵字段,和普通消息的 Key 是完全不同的概念。
- 适用場景:性能要求高,以 sharding key 作為分區字段,在同一個區塊中嚴格的按照 FIFO 原則進行消息釋出和消費的場景。
順序消息發送
順序消息是一種對消息發送和消費順序有嚴格要求的消息。
RocketMQ 消息的順序性分為兩部分,生産順序性和消費順序性。隻有同時滿足了生産順序性和消費順序性才能達到上述的FIFO效果。
順序消息分類
生産順序性: RocketMQ 通過生産者和服務端的協定保障單個生産者串行地發送消息,并按序存儲和持久化。如需保證消息生産的順序性,則必須滿足以下條件:
- 單一生産者: 消息生産的順序性僅支援單一生産者,不同生産者分布在不同的系統,即使設定相同的分區鍵,不同生産者之間産生的消息也無法判定其先後順序。
- 串行發送:生産者用戶端支援多線程安全通路,但如果生産者使用多線程并行發送,則不同線程間産生的消息将無法判定其先後順序。
滿足以上條件的生産者,将順序消息發送至服務端後,會保證設定了同一分區鍵的消息,按照發送順序存儲在同一隊列中。服務端順序存儲邏輯如下:
順序消息的應用場景也非常廣泛,在有序事件處理、撮合交易、資料實時增量同步等場景下,異構系統間需要維持強一緻的狀态同步,上遊的事件變更需要按照順序傳遞到下遊進行處理。
例如,建立訂單的場景,需要保證同一個訂單的生成、付款和發貨,這三個操作被順序執行。如果是普通消息,訂單A的消息可能會被輪詢發送到不同的隊列中,不同隊列的消息将無法保持順序,而順序消息發送時将ShardingKey相同(同一訂單号)的消息序路由到一個邏輯隊列中。
順序消息的一緻性
如果一個Broker掉線,那麼此時隊列總數是否會發化?
如果發生變化,那麼同一個ShardingKey的消息就會發送到不同的隊列上,造成亂序。如果不發生變化,那消息将會發送到掉線Broker的隊列上,必然是失敗的。是以 Apache RocketMQ 提供了兩種模式,如果要保證嚴格順序而不是可用性,建立 Topic 是要指定 -o 參數(--order)為true,表示順序消息。
$ sh bin/mqadmin updateTopic -c DefaultCluster -t TopicTest -o true -n 127.0.0.1:9876
create topic to 127.0.0.1:10911 success.
TopicConfig [topicName=TopicTest, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=true, attributes=null]
MessageQueueSelector的接口:
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
其中mqs是可以發送的隊列,msg是消息,arg是上述send接口中傳入的Object對象,傳回的是該消息需要發送到的隊列。上述例子裡,是以orderId作為分區分類标準,對所有隊列個數取餘,來對将相同orderId的消息發送到同一個隊列中。
生産環境中建議選擇最細粒度的分區鍵進行拆分,例如,将訂單ID、使用者ID作為分區鍵關鍵字,可實作同一終端使用者的消息按照順序處理,不同使用者的消息無需保證順序。
保證NameServer中的配置
orderMessageEnable
和
returnOrderTopicConfigToBroker
必須是 true。如果上述任意一個條件不滿足,則是保證可用性而不是嚴格順序。
順序消息示例代碼
public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
}