RocketMQ源于阿裡,原名MetaQ,後捐獻給apache,支援Java、C/C++、Python、Go;是一款分布式、隊列模型的開源消息中間件,經曆了幾年淘寶雙十一的考驗。
架構設計核心特性
NameServer
路由中心/注冊中心,管理Broker,類似于kafka中的zookeeper,負責Broker節點注冊,Producer和Consumer發送和消費消息會先去NameServer中查找可用的Broker節點;Broker節點在啟動的時候就會去查找配置檔案中有哪些NameServer,然後在所有的NameServer中注冊自己建立長連接配接,之後每隔30秒發送一個心跳告訴NameServer目前節點還存活可用;而NameServer也會定時每隔10秒檢視清單中哪些Broker長時間沒有發送心跳,如果120秒沒有發送心跳就會将Broker從清單中移除。
那麼RocketMQ為什麼不用zookeeper而選擇自己造一個NameServer呢?
(1)因為基于RocketMQ的架構設計,他們僅需要一個輕量級的中繼資料伺服器,隻需要保證最終一緻性,而不需要像zookeeper一樣保持強一緻。
(2)使用zookeeper擔心服務故障,需要考慮高可用問題,需要維護
為什麼NameServer之間不需要同步?如何保持一緻?
(1)服務注冊(Broker新增):每個Broker啟動時都會檢視配置檔案有多少NameServer,然後向所有的NameServer注冊自己保證NameServer彼此一緻
(2)服務剔除(Broker關閉或當機):Broker關閉會将自己從所有的NameServer中移除;如果Broker當機,NameServer會10秒監測一次清單中的Broker是否存活,如果Broker挂了,也會将其移除
(3)路由發現(用戶端擷取最新的Broker list,初始連接配接、後續連接配接):Producer和Consumer會定期每隔30秒重新整理一次本地路由清單,通過重試機制可以解決30秒内連接配接了當機Broker的問題
生産者
支援同步發送、異步發送、順序發送、單向發送,同步發送和異步發送都需要broker給一個ack應答,單向發送不需要
生産者消息發送規則
通過MessageQueueSelector實作類來定義發送規則,有以下三個實作類:
-
SelectMessageQueueByHash(預設):發送消息時,API底層預設使用此規則,根據隊列id取絕對值和隊列數取餘數,然後id自增達到輪詢效果
public class SelectMessageQueueByHash implements MessageQueueSelector { public SelectMessageQueueByHash() {} public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = arg.hashCode(); if (value < 0) { value = Math.abs(value); } value %= mqs.size(); return (MessageQueue)mqs.get(value); } }
- SelectMessageQueueByRandom:随機選擇一個隊列
public class SelectMessageQueueByRandom implements MessageQueueSelector { private Random random = new Random(System.currentTimeMillis()); public SelectMessageQueueByRandom() {} public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = this.random.nextInt(mqs.size()); return (MessageQueue)mqs.get(value); } }
- SelectMessageQueueByMachineRoom:傳回空,沒有實作
也可以自定義發送規則(實作MessageQueueSelector接口):
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, i);
順序消息
- 生産者發送消息時,到達broker是有序的,是以生産者隻能用單線程順序發送
- 寫入broker時,要順序寫入,相同主題的消息集中寫入,選擇同一個message queue,傳入相同的hashkey
- 消費者消費的時候隻能單線程消費
事務消息(兩階段送出)
如何保證資料庫操作與MQ消息發送操作要麼都成功,要麼都復原?
- 生産者發送消息到broker,把消息狀态标記為“未确認”,這個狀态的消息無法被消費
- 如果broker可以接收成功,broker通知生産者消息接收成功
- 生産者執行資料庫事務操作,并傳回執行結果
- 如果事務執行成功,生成者修改消息狀态為“确認”,否則丢棄消息
- 如果生産者資料庫事務操作沒有傳回結果,broker主動查詢事務執行結果
延遲消息
訂單逾時未支付自動關閉,怎麼實作?
消息設為延遲消息,30分鐘之後才發送,然後消費端查詢訂單是否支付,如果沒有支付則關閉訂單。
String content = "This is one test message";
Message msg = new Message("delay-topic", "tagA", "orderId666", content.getBytes("UTF-8"));
msg.setDelayTimeLevel(2); // 不同delay time level不同延遲時間
Broker
做一個中轉角色,負責存儲消息,轉發消息,并且存儲中繼資料(消費者offset)
Topic
表示一類消息的集合,每個主題可以存儲很多消息,每個消息都屬于一個topic,是rocketmq訂閱消息的基本機關。
Tag:消息标簽,用于在同一個主題下區分不同類型的消息
Message Queue
類似于Kafka中的partition分區,建立topic時需要指定寫隊列數量和讀隊列數量,每一個topic下都可以設定一定數量的消息隊列用來資料讀取;寫隊列數量決定了有幾個message queue,用來劃分存儲位置,如果沒有配置,服務端預設寫隊列數量是8,而生産端預設是4個;讀隊列數量決定了消費者有幾個線程來消費這些消息,用來做負載。
MessageQueue有三個屬性:
topic:指定目前message queue是屬于topic
broker名字:指定存在哪個broker上
queue編号:一個topic下有多個queue,是以需要根據編号來尋找
消息如何存儲
從下面rocketmq官網圖 可以看出,所有發送的消息都存放在commitlog中,集中存儲保證順序性,那麼consumer消費時就可以實作順序消費提升速度;然後由consumequeue來存放每個consumer與其消費的對應offset的對應關系。
實體存儲檔案
commitlog:存放所有消息,檔案預設最大1G,超過大小後會以最後的offset作為檔案名重新建立新的檔案
consumequeue:下面有多個檔案目錄,類似索引,和建立topic時設定的寫入隊列數量有關,裡面記錄每個consumer和其消費的Offset的對應關系,每個檔案可存30W條記錄
checkpoint:檔案檢查點,記錄最後一次刷盤的時間戳
config:運作時配置資訊
index:索引存儲,可存2000W條記錄,大概400M大小
page cache
使用者調用資料,需要先從磁盤擷取資料copy到核心空間,然後從核心空間copy到使用者空間
mmap(memory map)
為了減少資料copy的次數,使用了mmap記憶體映射來實作核心緩存區與使用者緩沖區映射同一塊記憶體位址,避免核心空間與使用者空間之間資料的copy
檔案清理政策
-
哪些檔案需要清理?
commitlog存放資料,consumequeue存放offset,這兩個檔案需要定期清理
-
檔案怎樣算是過期檔案?
MessageStoreConfig類中有定義fileReservedTime屬性,預設儲存72小時,超過72小時即為過期
-
過期檔案什麼時候删除?
MessageStoreConfig類中有定義deleteWhen屬性,預設4:00am删除
或者磁盤使用率超過了85%也會批量删除檔案,如果磁盤使用率達到90%就會拒絕消息寫入,在DefaultMessageStore類中有定義diskSpaceCleanForciblyRatio和diskSpaceWarningLevelRatio兩個屬性值
消費者
消費模式
- 叢集模式:消費者組裡的消費者會負載消費不同的消息
- 廣播模式:消費者組裡的消費者會将消息全部消費一遍,所有消費者消費一樣的消息
消費模型
RabbitMQ既支援Pull也支援Push,而Kafka隻支援Pull;從接口而言RocketMQ也同時支援Pull和Push,但是實際上RocketMQ的Push不是真正的Push,而是通過Pull來實作的,最終啟動了一個pull服務
消費端負載均衡與rebalance
- AllocateMessageQueueAveragely:連續配置設定(預設),類似于kafka的範圍配置設定RangeAssignor政策
- AllocateMessageQueueAveragelyByCircle:每人輪流一個,類似kafka的輪詢配置設定RoundRobinAssignor政策
- AllocateMessageQueueByConfig:通過配置
- AllocateMessageQueueConsistentHash:一緻性哈希
- AllocateMessageQueueByMachineRoom:指定一個broker中的topic中的queue消費
- AllocateMachineRoomNearby:按broker的機房就近配置設定
消費端重試
如果消費失敗,通過捕獲異常,然後傳回ConsumeConcurrentlyStatus.RECONSUME_LATER,那麼服務端就會建立一個%RETRY%my_test_consumer_group的隊列來儲存需要重試的消息實作重試
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for(MessageExt msg : msgs){
String topic = msg.getTopic();
String messageBody="";
try {
messageBody = new String(msg.getBody(),"utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 重新消費
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
String tags = msg.getTags();
SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
System.out.println("接收時間:" +sf.format(new Date()) + ",topic:"+topic+",tags:"+tags+",msg:"+messageBody);
}
// 消費成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
死信隊列
如果經過了所有的delayTimeLevel,消息仍然消費失敗,則會建立一個死信隊列%DLQ%topic_name,将重試失敗的消息轉發到死信隊列中
高可用架構
二主二從高可用架構圖
在RocketMQ中,Slave節點既可以實作備份,其中有一個Slave還可以實作讀寫功能,進而分擔Master節點的壓力
主從怎麼關聯在一起?
- 叢集的名字相同,比如brokerClusterName=lucifer-cluster
- 連接配接到相同的nameserver
- 在配置檔案中,brokerId=0代表是master,brokerId=1代表是slave
主從同步brokerRole
SYNC_MASTER:主從同步雙寫(推薦),當master和slave都寫入成功後才給用戶端傳回成功
ASYNC_MASTER:主從異步複制,隻要master寫入成功,就會給用戶端傳回成功
flushDiskType刷盤類型
ASYNC_FLUSH:異步刷盤(預設),先把消息緩存,直接給producer傳回ACK,資料可能丢失
SYNC_FLUSH:同步刷盤,每條消息都儲存到磁盤後才傳回ACK
主從同步流程
- slave連接配接到master後,每隔5s向主伺服器發送commitLog檔案最大偏移量拉取還未同步的消息;
- master收到伺服器發送的偏移量進行解析,傳回查找出來的未同步的消息給slave;
- slave收到消息後,将消息寫入commitLog中,然後更新commmitLog拉去偏移量,接着繼續向master拉取未同步的消息。
故障轉移(Dledger)
Dledger基于raft協定共識算法, 實作commitLog的存儲,實作自動選舉,将commitLog都交給Dledger管理,Dledger在rocketmq-4.5.0之後的版本才有,需要開啟這個功能才能實作自動選舉
Dledger配置
# 是否啟動Dledger
enableDLegerCommitLog=true
# Dledger Raft Group名字
dLegerGroup=broker-a
# DLedger Group内各節點的位址和端口,至少需要3個節點
dLegerPeers=n0-192.168.2.101:10911;n1-192.168.2.102:10911;n2-192.168.2.103:10911;
# 本節點ID
dLegerSelfId=n0