高可用,叢集,存儲架構,順序消費,延遲隊列,事務消息...
上篇文章消息隊列那麼多,為什麼建議深入了解下RabbitMQ?我們講到了消息隊列的發展史:并且詳細介紹了RabbitMQ,其功能也是挺強大的,那麼,為啥又要搞一個RocketMQ出來呢?是重複造輪子嗎?本文我們就帶大家來詳細探讨RocketMQ究竟好在哪裡。
RocketMQ是一個分布式消息中間件,具有低延遲、高性能和可靠性、萬億級别的容量和靈活的可擴充性。它是阿裡巴巴于2012年開源的第三代分布式消息中間件。
随着阿裡巴巴的電商業務不斷發展,需要一款更高性能的消息中間件,RocketMQ就是這個業務背景的産物。RocketMQ是一個分布式消息中間件,具有低延遲、高性能和可靠性、萬億級别的容量和靈活的可擴充性,它是阿裡巴巴于2012年開源的第三代分布式消息中間件。RocketMQ經曆了多年雙十一的洗禮,在可用性、可靠性以及穩定性等方面都有出色的表現。值得一提的是,RocketMQ最初就是借鑒了Kafka進行改造開發而來的,是以熟悉Kafka的朋友,會發現RocketMQ的原理和Kafka有很多相似之處。
RocketMQ前身叫做MetaQ,在MeataQ釋出3.0版本的時候改名為RocketMQ,其本質上的設計思路和Kafka類似,因為最初就是基于Kafka改造而來,經過不斷的疊代與版本更新,2016年11月21日,阿裡巴巴向Apache軟體基金會捐贈了RocketMQ 。近年來被越來越多的國内企業使用。
本文帶大家從以下幾個方面詳細了解RocketMQ:
- RocketMQ如何保證消息存儲的可靠性?
- RocketMQ如何保證消息隊列服務的高可用?
- 如何建構一個高可用的RocketMQ雙主雙從最小叢集?
- RocketMQ消息是如何存儲的?
- RocketMQ是如何保證存取消息的效率的?
- 如何實作基于Message Key的高效查詢?
- 如何實作基于Message Id的高效查詢?
- RocketMQ的Topic在叢集中是如何存儲的?
- Broker自動建立Topic會有什麼問題?
- RocketMQ如何保證消息投遞的順序性?
- RocketMQ如何保證消息消費的順序性?
- 實作分布式事務的手段有哪些?
- RocketMQ如何實作事務消息?
- RocketMQ事務消息是如何存儲的?
1. RocketMQ技術架構
RocketMQ的架構主要分為四部分,如下圖所示:
-
:消息生産者,支援叢集方式部署;Producer
-
:消息消費者,支援叢集方式部署,支援pull,push模式擷取消息進行消費,支援叢集和廣播方式消費;Consumer
-
:Topic路由注冊中心,類似于Dubbo中的zookeeper,支援Broker的動态注冊與發現;NameServer
- 提供心跳檢測機制,檢查Broker是否存活;
- 接收Broker叢集的注冊資訊,作為路由資訊的基本資料;
- NameServier各個執行個體不互相進行通信,每個NameServer都儲存了一份完整的路由資訊,這與zookeeper有所差別,不用作複雜的節點資料同步與選主過程;
-
:主要負責消息的存儲、投遞和查詢,以及服務高可用保證。BrokerServer包含以下幾個重要的子子產品:BrokerServer
- Remoting Module:整個Broker的實體,負責處理來自clients端的請求;
- Client Manager:負責管理用戶端(Producer/Consumer)和維護Consumer的Topic訂閱資訊;
- StoreService:提供友善簡單的API接口處理消息存儲到實體硬碟和查詢功能;
- HA Service:高可用服務,提供Master Broker 和 Slave Broker之間的資料同步功能;
- Index Service:根據特定的Message key對投遞到Broker的消息進行索引服務,以提供消息的快速查詢。
2. RocketMQ執行原理
RocketMQ執行原理如下圖所示:
- 首先,啟動每個NameServer節點,共同構成一個NameServer Cluster。NameServer啟動後,監聽端口,等待Broker、Producer、Consumer的連接配接;
- 然後啟動Broker的主從節點,這個時候Broker會與所有的NameServer建立并保持長連接配接,定時發送心跳包,把自己的資訊(IP+端口号)以及存儲的所有Topic資訊注冊到每個NameServer中。這樣NameServer叢集中就有Topic和Broker的映射關系了;
- 收發消息前,先建立Topic,建立Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發送消息時自動建立Topic,每個Topic預設會配置設定4個Queue;
- 啟動生産者,這個時候生産者會把資訊注冊到NameServer中,并且從NameServer擷取Broker伺服器,Queue等資訊;
- 啟動消費者,這個時候消費者會把資訊注冊到NameServer中,并且從NameServer擷取Broker伺服器,Queue等資訊;
- 生産者發送消息到Broker叢集中的時候,會從所有的Master節點的對應Topic中選擇一個Queue,然後與Queue所在的Broker建立長連接配接進而向Broker投遞消息。消息實際上是存儲在了CommitLog檔案中,而Queue檔案裡面存儲的實際是消息在CommitLog中的存儲位置資訊;
- 消費者從Broker叢集中消費消息的時候,會通過特定的負載均衡算法,綁定一個消息隊列進行消費;
- 消費者會定時(或者kill階段)把Queue的消費進度offset送出到Broker的consumerOffset.json檔案中記錄起來;
- 主節點和從節點之間可以是同步或者異步的進行資料複制,相關配置參數:
-
,可選值:brokerRole
-
:異步複制方式(異步雙寫),生産者寫入消息到Master之後,無需等到消息複制到Slave即可傳回,消息的複制由旁路線程進行異步複制;ASYNC_MASTER
-
:同步複制方式(同步雙寫),生産者寫入消息到Master之後,需要等到Slave複制成功才可以傳回。如果有多個Slave,隻需要有一個Slave複制成功,并成功應答,就算複制成功了。這裡是否持久化到磁盤依賴于另一個參數:SYNC_MASTER
;flushDiskType
-
:從節點SLAVE
-
-
3. RocketMQ叢集
本節我們來看看一個雙主雙從的RocketMQ是如何搭建的。
叢集配置參數說明:
在讨論叢集前,我們需要了解兩個關鍵的叢集配置參數:brokerRole,flushDiskType。brokerRole在前一節已經介紹了,而flushDiskType則是刷盤方式的配置,主要有:
- ASYNC_FLUSH: 異步刷盤
- SYNC_FLUSH: 同步刷盤
3.1 如何保證消息存儲的可靠性?
brokerRole确定了主從同步是異步的還是同步的,flushDiskType确定了資料刷盤的方式是同步的還是異步的。
如果業務場景對消息丢失容忍度很低,可以采用SYNC_MASTER + ASYNC_FLUSH的方式,這樣隻有master和slave在刷盤前同時挂掉,消息才會丢失,也就是說即使有一台機器出故障,仍然能保證資料不丢;
如果業務場景對消息丢失容忍度比較高,則可以采用ASYNC_MASTER + ASYNC_FLUSH的方式,這樣可以盡可能的提高消息的吞吐量。
3.2 如何保證消息隊列服務的高可用?
消費端的高可用
Master Broker支援讀和寫,Slave Broker隻支援讀。
當Master不可用的時候,Consumer會自動切換到Slave進行讀,也就是說,當Master節點的機器出現故障後,Consumer仍然可以從Slave節點讀取消息,不影響消費端的消費程式。
生産端的高可用
在RocketMQ中,機器的主從節點關系是提前配置好的,沒有類似Kafka的Master動态選主功能。
- brokerName: broker的名稱,需要把Master和Slave節點配置成相同的名稱,表示他們的主從關系,相同的brokerName的一組broker,組成一個broker組;
- brokerId: broker的id,0表示Master節點的id,大于0表示Slave節點的id。
如果一個Master當機了,要讓生産端程式繼續可以生産消息,您需要部署多個Master節點,組成多個broker組。這樣在建立Topic的時候,就可以把Topic的不同消息隊列分布在多個broker組中,即使某一個broker組的Master節點不可用了,其他組的Master節點仍然可用,保證了Producer可以繼續發送消息。
3.3 如何建構一個高可用的RocketMQ雙主雙從最小叢集?
為了盡可能的保證
消息不丢失
,并且保證生産者和消費者的
可用性
,我們可以建構一個雙主雙從的叢集,搭建的架構圖如下所示:
部署架構說明:
- 兩個Broker組,保證了其中一個Broker組的Master節點挂掉之後,另一個Master節點仍然可以接受某一個Topic的消息投遞;
- 主從同步采用SYNC_MASTER,保證了生産者寫入消息到Master之後,需要等到Slave也複制成功,才傳回消息投遞成功。這樣即使主節點或者從節點挂掉了,也不會導緻丢資料;
- 由于主節點有了從節點做備份,是以,落盤政策可以使用ASYNC_FLUSH,進而盡可能的提高消息的吞吐量;
- 如果隻提供兩台伺服器,要部署這個叢集的情況下,可以把Broker Master1和Broker Slave2部署在一台機器,Broker Master2和Broker Slave1部署在一台機器。
關鍵配置參數
以下是關鍵的配置參數:
Broker Master1
# NameServer位址
namesrvAddr=192.168.1.100:9876;192.168.1.101:9876
# 叢集名稱
brokerClusterName=itzhai-com-cluster
# brokerIP位址
brokerIP1=192.168.1.100
# broker通信端口
listenPort=10911
# broker名稱
brokerName=broker‐1
# 0表示主節點
brokerId=0
# 2點進行消息删除
deleteWhen=02
# 消息在磁盤上保留48小時
fileReservedTime=48
# 主從同步複制
brokerRole=SYNC_MASTER
# 異步刷盤
flushDiskType=ASYNC_FLUSH
# 自動建立Topic
autoCreateTopicEnable=true
# 消息存儲根目錄
storePathRootDir=/data/rocketmq/store‐m
Broker Slave1
# NameServer位址
namesrvAddr=192.168.1.100:9876;192.168.1.101:9876
# 叢集名稱
brokerClusterName=itzhai-com-cluster
# brokerIP位址
brokerIP1=192.168.1.101
# broker通信端口
listenPort=10911
# broker名稱
brokerName=broker‐1
# 非0表示從節點
brokerId=1
# 2點進行消息删除
deleteWhen=02
# 消息在磁盤上保留48小時
fileReservedTime=48
# 從節點
brokerRole=SLAVE
# 異步刷盤
flushDiskType=ASYNC_FLUSH
# 自動建立Topic
autoCreateTopicEnable=true
# 消息存儲根目錄
storePathRootDir=/data/rocketmq/store‐s
Broker Master2
# NameServer位址
namesrvAddr=192.168.1.100:9876;192.168.1.101:9876
# 叢集名稱
brokerClusterName=itzhai-com-cluster
# brokerIP位址
brokerIP1=192.168.1.102
# broker通信端口
listenPort=10911
# broker名稱
brokerName=broker‐2
# 0表示主節點
brokerId=0
# 2點進行消息删除
deleteWhen=02
# 消息在磁盤上保留48小時
fileReservedTime=48
# 主從同步複制
brokerRole=SYNC_MASTER
# 異步刷盤
flushDiskType=ASYNC_FLUSH
# 自動建立Topic
autoCreateTopicEnable=true
# 消息存儲根目錄
storePathRootDir=/data/rocketmq/store‐m
Broker Slave2
# NameServer位址
namesrvAddr=192.168.1.100:9876;192.168.1.101:9876
# 叢集名稱
brokerClusterName=itzhai-com-cluster
# brokerIP位址
brokerIP1=192.168.1.103
# broker通信端口
listenPort=10911
# broker名稱
brokerName=broker‐2
# 非0表示從節點
brokerId=1
# 2點進行消息删除
deleteWhen=02
# 消息在磁盤上保留48小時
fileReservedTime=48
# 從節點
brokerRole=SLAVE
# 異步刷盤
flushDiskType=ASYNC_FLUSH
# 自動建立Topic
autoCreateTopicEnable=true
# 消息存儲根目錄
storePathRootDir=/data/rocketmq/store‐s
寫了那麼多頂層架構圖,不寫寫底層内幕,就不是IT宅(itzhai.com)的文章風格,接下來,我們就來看看底層存儲架構。
4. RocketMQ存儲架構
我們在
broker.conf
檔案中配置了消息存儲的根目錄:
# 消息存儲根目錄
storePathRootDir=/data/rocketmq/store‐m
進入這個目錄,我們可以發現如下的目錄結構:
其中:
- abort:該檔案在broker啟動時建立,關閉時删除,如果broker異常退出,則檔案會存在,在下次啟動時會走修複流程;
- checkpoint:檢查點,主要存放以下内容:
- physicMsgTimestamp:commitlog檔案最後一次落盤時間;
- logicsMsgTimestamp:consumequeue最後一次落盤時間;
- indexMsgTimestamp:索引檔案最後一次落盤時間;
- commitlog:存放消息的完整内容,所有的topic消息都會通過檔案追加的形式寫入到該檔案中;
- config:消息隊列的配置檔案,包括了topic配置,消費的偏移量等資訊。其中consumerOffset.json檔案存放消息隊列消費的進度;
- consumequeue:topic的邏輯隊列,在消息存放到commitlog之後,會把消息的存放位置記錄到這裡,隻有記錄到這裡的消息,才能被消費者消費;
- index:消息索引檔案,通過Message Key查詢消息時,是通過該檔案進行檢索查詢的。
4.1 RocketMQ消息是如何存儲的
下面我們來看看關鍵的commitlog以及consumequeue:
消息投遞到Broker之後,是先把實際的消息内容存放到CommitLog中的,然後再把消息寫入到對應主題的ConsumeQueue中。其中:
CommitLog:消息的實體存儲檔案,存儲實際的消息内容。每個Broker上面的CommitLog被該Broker上所有的ConsumeQueue共享。
單個檔案大小預設為1G,檔案名長度為20位,左邊補零,剩餘為起始偏移量。預配置設定好空間,消息順序寫入日志檔案。當檔案滿了,則寫入下一個檔案,下一個檔案的檔案名基于檔案第一條消息的偏移量進行命名;
ConsumeQueue:消息的邏輯隊列,相當于CommitLog的索引檔案。RocketMQ是基于Topic主題訂閱模式實作的,每個Topic下會建立若幹個邏輯上的消息隊列ConsumeQueue,在消息寫入到CommitLog之後,通過Broker的背景服務線程(ReputMessageService)不停地分發請求并異步建構ConsumeQueue和IndexFile(索引檔案,後面介紹),然後把每個ConsumeQueue需要的消息記錄到各個ConsumeQueue中。
ConsumeQueue主要記錄8個位元組的commitLogOffset(消息在CommitLog中的實體偏移量), 4個位元組的msgSize(消息大小), 8個位元組的TagHashcode,每個元素固定20個位元組。
ConsumeQueue相當于CommitLog檔案的索引,可以通過ConsumeQueue快速從很大的CommitLog檔案中快速定位到需要的消息。
ConsumeQueue的存儲結構
主題消息隊列:在consumequeue目錄下,按照topic的次元存儲消息隊列。
重試消息隊列:如果topic中的消息消費失敗,則會把消息發到重試隊列,重新隊列按照消費端的GroupName來分組,命名規則:
%RETRY%ConsumerGroupName
死信消息隊列:如果topic中的消息消費失敗,并且超過了指定重試次數之後,則會把消息發到死信隊列,死信隊列按照消費端的GroupName來分組,命名規則:
%DLQ%ConsumerGroupName
假設我們現在有一個topic:
itzhai-test
,消費分組:
itzhai_consumer_group
,當消息消費失敗之後,我們檢視consumequeue目錄,會發現多處了一個重試隊列:
我們可以在RocketMQ的控制台看到這個重試消息隊列的主題和消息:
如果一直重試失敗,達到一定次數之後(預設是16次,重試時間:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h),就會把消息投遞到死信隊列:
4.2 RocketMQ是如何保證存取消息的效率的
4.2.1 如何保證高效寫
每條消息的長度是不固定的,為了提高寫入的效率,RocketMQ預先配置設定好1G空間的CommitLog檔案,采用順序寫的方式寫入消息,大大的提高寫入的速度。
RocketMQ中消息刷盤主要可以分為同步刷盤和異步刷盤兩種,通過flushDiskType參數進行配置。如果需要提高寫消息的效率,降低延遲,提高MQ的性能和吞吐量,并且不要求消息資料存儲的高可靠性,可以把刷盤政策設定為異步刷盤。
4.2.2 如何保證高效讀
為了提高讀取的效率,RocketMQ使用ConsumeQueue作為消費消息的索引,使用IndexFile作為基于消息key的查詢的索引。下面來詳細介紹下。
4.2.2.1 ConsumeQueue
讀取消息是随機讀的,為此,RocketMQ專門建立了ConsumeQueue索引檔案,每次先從ConsumeQueue中擷取需要的消息的位址,消息大小,然後從CommitLog檔案中根據位址直接讀取消息内容。在讀取消息内容的過程中,也盡量利用到了作業系統的頁緩存機制,進一步加速讀取速度。
ConsumeQueue由于每個元素大小是固定的,是以可以像通路數組一樣通路每個消息元素。并且占用空間很小,大部分的ConsumeQueue能夠被全部載入記憶體,是以這個索引查找的速度很快。每個ConsumeQueue檔案由30w個元素組成,占用空間在6M以内。每個檔案預設大小為600萬個位元組,當一個ConsumeQueue類型的檔案寫滿之後,則寫入下一個檔案。
4.2.2.2 IndexFile為什麼按照Message Key查詢效率高?
我們在RocketMQ的store目錄中可以發現有一個index目錄,這個是一個用于輔助提高查詢消息效率的索引檔案。通過該索引檔案實作基于消息key來查詢消息的功能。
實體存儲結構
IndexFile索引檔案實體存儲結構如下圖所示:
- Header:索引頭檔案,40 bytes,包含以下資訊:
-
:索引檔案中第一個索引消息存入Broker的時間戳;beginTimestamp
-
:索引檔案中最後一個索引消息存入Broker的時間戳endTimestamp
-
:索引檔案中第一個索引消息在CommitLog中的偏移量;beginPHYOffset
-
:索引檔案中最後一個索引消息在CommitLog中的偏移量;endPhyOffset
-
:建構索引使用的slot數量;hashSlotCount
-
:索引的總數;indexCount
-
- Slot Table:槽位表,類似于Redis的Slot,或者哈希表的key,使用消息的key的hashcode與slotNum取模可以得到具體的槽的位置。每個槽位占4 bytes,一個IndexFile可以存儲500w個slot;
- Index Linked List:消息的索引内容,如果哈希取模後發生槽位碰撞,則建構成連結清單,一個IndexFile可以存儲2000w個索引:
-
:消息的哈希值;Key Hash
-
:消息在CommitLog中的偏移量;Commit Log Offset
-
:消息存儲的時間戳;Timestamp
-
:下一個索引的位置,如果消息取模後發生槽位槽位碰撞,則通過此字段把碰撞的消息構成連結清單。Next Index Offset
-
每個IndexFile檔案的大小:40b + 4b * 5000000 + 20b * 20000000 = 420000040b,約為400M。
邏輯存儲結構
IndexFile索引檔案的邏輯存儲結構如下圖所示:
IndexFile邏輯上是基于哈希表來實作的,Slot Table為哈希鍵,Index Linked List中存儲的為哈希值。
4.2.2.3 為什麼按照MessageId查詢效率高?
RocketMQ中的MessageId的長度總共有16位元組,其中包含了:消息存儲主機位址(IP位址和端口),消息Commit Log offset。“
按照MessageId查詢消息的流程:Client端從MessageId中解析出Broker的位址(IP位址和端口)和Commit Log的偏移位址後封裝成一個RPC請求後通過Remoting通信層發送(業務請求碼:VIEW_MESSAGE_BY_ID)。Broker端走的是QueryMessageProcessor,讀取消息的過程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的記錄并解析成一個完整的消息傳回。
4.3 RocketMQ叢集是如何做資料分區的?
我們繼續看看在叢集模式下,RocketMQ的Topic資料是如何做分區的。IT宅(itzhai.com)提醒大家,實踐出真知。這裡我們部署兩個Master節點:
4.3.1 RocketMQ的Topic在叢集中是如何存儲的
我們通過手動配置每個Broker中的Topic,以及ConsumeQueue數量,來實作Topic的資料分片,如,我們到叢集中手動配置這樣的Topic:
-
建立broker-a
,4個隊列;itzhai-com-test-1
-
broker-b
,2個隊列。itzhai-com-test-1
建立完成之後,Topic分片叢集分布如下:
即:
可以發現,RocketMQ是把Topic分片存儲到各個Broker節點中,然後在把Broker節點中的Topic繼續分片為若幹等分的ConsumeQueue,進而提高消息的吞吐量。ConsumeQueue是作為負載均衡資源配置設定的基本單元。
這樣把Topic的消息分區到了不同的Broker上,進而增加了消息隊列的數量,進而能夠支援更塊的并發消費速度(隻要有足夠的消費者)。
4.3.2 Broker自動建立Topic會有什麼問題?
假設設定為通過Broker自動建立Topic(autoCreateTopicEnable=true),并且Producer端設定Topic消息隊列數量設定為4,也就是預設值:
producer.setDefaultTopicQueueNums(4);
嘗試往一個新的 topic
itzhai-test-queue-1
連續發送10條消息,發送完畢之後,檢視Topic狀态:
我們可以發現,在兩個broker上面都建立了
itzhai-test-queue-a
,并且每個broker上的消息隊列數量都為4。怎麼回事,我配置的明明是期望建立4個隊列,為什麼加起來會變成了8個?如下圖所示:
由于時間關系,本文我們不會帶大家從源碼方面去解讀為啥會出現這種情況,接下來我們通過一種更加直覺的方式來驗證下這個問題:繼續做實驗。
我們繼續嘗試往一個新的 topic
itzhai-test-queue-10
發送1條消息,注意,這一次不做并發發送了,隻發送一條,發送完畢之後,檢視Topic狀态:
可以發現,這次建立的消息隊列數量又是對的了,并且都是在broker-a上面建立的。接下來,無論怎麼并發發送消息,消息隊列的數量都不會繼續增加了。
其實這也是并發請求Broker,觸發自動建立Topic的bug。
為了更加嚴格的管理Topic的建立和分片配置,一般在生産環境都是配置為手動建立Topic,通過送出運維工單申請建立Topic以及Topic的資料配置設定。
接下來我們來看看RocketMQ的特性。更多其他技術的底層架構内幕分析,請通路我的部落格IT宅(itzhai.com)或者關注Java架構雜談公衆号。
5. RocketMQ特性
5.1 生産端
5.1.1 消息釋出
RocketMQ中定義了如下三種消息通信的方式:
public enum CommunicationMode {
SYNC,
ASYNC,
ONEWAY,
}
-
:同步發送,生産端會阻塞等待發送結果;SYNC
- 應用場景:這種方式應用場景非常廣泛,如重要業務事件通知。
-
:異步發送,生産端調用發送API之後,立刻傳回,在拿到Broker的響應結果後,觸發對應的SendCallback回調;ASYNC
- 應用場景:一般用于鍊路耗時較長,對 RT 較為敏感的業務場景;
-
:單向發送,發送方隻負責發送消息,不等待伺服器回應且沒有回調函數觸發,即隻發送請求不等待應答。 此方式發送消息的過程耗時非常短,一般在微秒級别;ONEWAY
- 應用場景:适用于耗時非常短,對可靠性要求不高的場景,如日志收集。
SYNC和ASYNC關注發送結果,ONEWAY不關注發送結果。發送結果如下:
public enum SendStatus {
SEND_OK,
FLUSH_DISK_TIMEOUT,
FLUSH_SLAVE_TIMEOUT,
SLAVE_NOT_AVAILABLE,
}
-
:消息發送成功。SEND_OK并不意味着投遞是可靠的,要確定消息不丢失,需要開啟SYNC_MASTER同步或者SYNC_FLUSH同步寫;SEND_OK
-
:消息發送成功,但是刷盤逾時。如果Broker的flushDiskType=SYNC_FLUSH,并且5秒内沒有完成消息的刷盤,則會傳回這個狀态;FLUSH_DISK_TIMEOUT
-
:消息發送成功,但是伺服器同步到Slave時逾時。如果Broker的brokerRole=SYNC_MASTER,并且5秒内沒有完成同步,則會傳回這個狀态;FLUSH_SLAVE_TIMEOUT
-
:消息發送成功,但是無可用的Slave節點。如果Broker的brokerRole=SYNC_MASTER,但是沒有發現SLAVE節點或者SLAVE節點挂掉了,那麼會傳回這個狀态。SLAVE_NOT_AVAILABLE
源碼内容更精彩,歡迎大家進一步閱讀源碼詳細了解消息發送的内幕:
- 同步發送:org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
- 異步發送:org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message, org.apache.rocketmq.client.producer.SendCallback)
- 單向發送:org.apache.rocketmq.client.producer.DefaultMQProducer#sendOneway(org.apache.rocketmq.common.message.Message)
5.1.2 順序消費
消息的有序性指的是一類消息消費的時候,可以按照發送順序來消費,比如:在
Java架構雜談
茶餐廳吃飯産生的消息:進入餐廳、點餐、下單、上菜、付款,消息要按照這個順序消費才有意義,但是多個顧客産生的消息是可以并行消費的。順序消費又分為全局順序消費和分區順序消費:
-
:同一個Topic下的消息,所有消息按照嚴格的FIFO順序進行釋出和消費。适用于:性能要求不高,所有消息嚴格按照FIFO進行釋出和消費的場景;全局順序
-
:同一個Topic下,根據消息的特定業務ID進行sharding key分區,同一個分區内的消息按照嚴格的FIFO順序進行釋出和消費。适用于:性能要求高,在同一個分區中嚴格按照FIFO進行釋出和消費的場景。分區順序
一般情況下,生産者是會以輪訓的方式把消息發送到Topic的消息隊列中的:
在同一個Queue裡面,消息的順序性是可以得到保證的,但是如果一個Topic有多個Queue,以輪訓的方式投遞消息,那麼就會導緻消息亂序了。
為了保證消息的順序性,需要把保持順序性的消息投遞到同一個Queue中。
5.1.2.1 如何保證消息投遞的順序性
RocketMQ提供了
MessageQueueSelector
接口,可以用來實作自定義的選擇投遞的消息隊列的算法:
for (int i = 0; i < orderList.size(); i++) {
String content = "Hello itzhai.com. Java架構雜談," + new Date();
Message msg = new Message("topic-itzhai-com", tags[i % tags.length], "KEY" + i,
content.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long orderId = (Long) arg;
// 訂單号與消息隊列個數取模,保證讓同一個訂單号的消息落入同一個消息隊列
long index = orderId % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());
System.out.printf("content: %s, sendResult: %s%n", content, sendResult);
}
如上圖,我們實作了
MessageQueueSelector
接口,并在實作的select方法裡面,指定了選擇消息隊列的算法:訂單号與消息隊列個數取模,保證讓同一個訂單号的消息落入同一個消息隊列:
有個異常場景需要考慮:假設某一個Master節點挂掉了,導緻Topic的消息隊列數量發生了變化,那麼繼續使用以上的選擇算法,就會導緻在這個過程中同一個訂單的消息會分散到不同的消息隊列裡面,最終導緻消息不能順序消費。
為了避免這種情況,隻能選擇犧牲failover特性了。
現在投遞到消息隊列中的消息保證了順序,那如何保證消費也是順序的呢?
5.1.2.2 如何保證消息消費的順序性?
RocketMQ中提供了
MessageListenerOrderly
,該對象用于有順序收異步傳遞的消息,一個隊列對應一個消費線程,使用方法如下:
consumer.registerMessageListener(new MessageListenerOrderly() {
// 消費次數,用于輔助模拟各種消費結果
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
如果您使用的是
MessageListenerConcurrently
,表示并發消費,為了保證消息消費的順序性,需要設定為單線程模式。
使用 MessageListenerOrderly
的問題:如果遇到某條消息消費失敗,并且無法跳過,那麼消息隊列的消費進度就會停滞。
5.1.3 延遲隊列(定時消息)
定時消費是指消息發送到Broker之後不會立即被消費,而是等待特定的時間之後才投遞到Topic中。定時消息會暫存在名為
SCHEDULE_TOPIC_XXXX
的topic中,并根據delayTimeLevel存入特定的queue,queueId=delayTimeLevel-1,一個queue隻存相同延遲的消息,保證具有相同延遲的消息能夠順序消費。比如,我們設定1秒後把消息投遞到
topic-itzhai-com
topic,則存儲的檔案目錄如下所示:
Broker會排程地消費SCHEDULE_TOPIC_XXXX,将消息寫入真實的topic。
定時消息的副作用:定時消息會在第一次寫入Topic和排程寫入實際的topic都會進行計數,是以發送數量,tps都會變高。
使用延遲隊列的場景:送出了訂單之後,如果等待超過約定的時間還未支付,則把訂單設定為逾時狀态。
RocketMQ提供了以下幾個固定的延遲級别:
public class MessageStoreConfig {
...
// 10個level,level:1~18
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
...
}
level = 0 表示不使用延遲消息。
另外,消息消費失敗也會進入延遲隊列,消息發送時間與設定的延遲級别和重試次數有關。
以下是發送延遲消息的代碼:
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("TestProducerGroup");
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 指定該消息在10秒後被消費者消費
message.setDelayTimeLevel(3);
producer.send(message);
}
producer.shutdown();
}
}
5.1.4 資料完整性與事務消息
通過消息對系統進行解耦之後,勢必會遇到分布式系統資料完整性的問題。
5.1.4.1 實作分布式事務的手段有哪些?
我們可以通過以下手段解決分布式系統資料最終一緻性問題:
- 資料庫層面的
,二階段送出,同步阻塞,效率低下,存在協調者單點故障問題,極端情況下存在資料不一緻的風險。對應技術上的XA、JTA/JTS。這是分布式環境下事務處理的典型模式;2PC(Two-phase commit protocol)
-
,三階段送出,引入了參與者逾時機制,增加了預送出階段,使得故障恢複之後協調者的決策複雜度降低,但整體的互動過程變得更長了,性能有所下降,仍舊會存在資料不一緻的問題;3PC
- 業務層面的TCC ,
。對業務的侵入較大,和業務緊耦合,對于每一個操作都需要定義三個動作分别對應:Try - Confirm - Cancel
,将資源層的兩階段送出協定轉換到業務層,成為業務模型中的一部分;Try - Confirm - Cancel
- 本地消息表;
- 事務消息;
RocketMQ事務消息(Transactional Message)則是通過事務消息來實作分布式事務的最終一緻性。下面看看RocketMQ是如何實作事務消息的。
5.1.4.2 RocketMQ如何實作事務消息?
如下圖:
事務消息有兩個流程:
- 事務消息發送及送出:
- 發送half消息;
- 服務端響應half消息寫入結果;
- 根據half消息的發送結果執行本地事務。如果發送失敗,此時half消息對業務不可見,本地事務不執行;
- 根據本地事務狀态執行Commit或者Rollback。Commit操作會觸發生成ConsumeQueue索引,此時消息對消費者可見;
-
補償流程:
5. 對于沒有Commit/Rollback的事務消息,會處于pending狀态,這對這些消息,MQ Server發起一次回查;
6. Producer收到回查消息,檢查回查消息對應的本地事務的轉塔體;
7. 根據本地事務狀态,重新執行Commit或者Rollback。
補償階段主要用于解決消息的Commit或者Rollback發生逾時或者失敗的情況。
half消息:并不是發送了一半的消息,而是指消息已經發送到了MQ Server,但是該消息未收到生産者的二次确認,此時該消息暫時不能投遞到具體的ConsumeQueue中,這種狀态的消息稱為half消息。
5.1.4.3 RocketMQ事務消息是如何存儲的?
發送到MQ Server的half消息對消費者是不可見的,為此,RocketMQ會先把half消息的Topic和Queue資訊存儲到消息的屬性中,然後把該half消息投遞到一個專門的處理事務消息的隊列中:
RMQ_SYS_TRANS_HALF_TOPIC
,由于消費者沒有訂閱該Topic,是以無法消息half類型的消息。
生産者執行Commit half消息的時候,會存儲一條專門的Op消息,用于辨別事務消息已确定的狀态,如果一條事務消息還沒有對應的Op消息,說明這個事務的狀态還無法确定。RocketMQ會開啟一個定時任務,對于pending狀态的消息,會先向生産者發送回查事務狀态請求,根據事務狀态來決定是否送出或者復原消息。
當消息被标記為Commit狀态之後,會把half消息的Topic和Queue相關屬性還原為原來的值,最終建構實際的消費索引(ConsumeQueue)。
RocketMQ并不會無休止的嘗試消息事務狀态回查,預設查找15次,超過了15次還是無法擷取事務狀态,RocketMQ預設復原該消息。并列印錯誤日志,可以通過重寫AbstractTransactionalMessageCheckListener類修改這個行為。
可以通過Broker的配置參數:transactionCheckMax來修改此值。
5.1.5 消息重投
如果消息釋出方式是同步發送會重投,如果是異步發送會重試。
消息重投可以盡可能保證消息投遞成功,但是可能會造成消息重複。
什麼情況會造成重複消費消息?
- 出現消息量大,網絡抖動的時候;
- 生産者主動重發;
- 消費負載發生變化。
可以使用的消息重試政策:
-
:設定同步發送失敗的重投次數,預設為2。是以生産者最多會嘗試發送retryTimesWhenSendFailed+1次。retryTimesWhenSendFailed
- 為了最大程度保證消息不丢失,重投的時候會嘗試向其他broker發送消息;
- 超過重投次數,抛出異常,讓用戶端自行處理;
- 觸發重投的異常:RemotingException、MQClientException和部分MQBrokerException;
-
:設定異步發送失敗重試次數,異步重試不會選擇其他Broker,不保證消息不丢失;retryTimesWhenSendAsyncFailed
-
:消息刷盤(主或備)逾時或slave不可用(傳回狀态非SEND_OK),是否嘗試發送到其他broker,預設false。重要的消息可以開啟此選項。retryAnotherBrokerWhenNotStoreOK
oneway釋出方式不支援重投。
5.1.6 批量消息
為了提高系統的吞吐量,提高發送效率,可以使用批量發送消息。
批量發送消息的限制:
- 同一批批量消息的topic,waitStoreMsgOK屬性必須保持一緻;
- 批量消息不支援延遲隊列;
- 批量消息一次課發送的上限是4MB。
發送批量消息的例子:
String topic = "itzhai-test-topic";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world itzhai.com 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world itzhai.com 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world itzhai.com 2".getBytes()));
producer.send(messages);
如果發送的消息比較多,會增加複雜性,為此,可以對大消息進行拆分。以下是拆分的例子:
public class ListSplitter implements Iterator<List<Message>> {
// 限制最大大小
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
}
@Override public List<Message> next() {
int startIndex = getStartIndex();
int nextIndex = startIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = calcMessageSize(message);
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
private int getStartIndex() {
Message currMessage = messages.get(currIndex);
int tmpSize = calcMessageSize(currMessage);
while(tmpSize > SIZE_LIMIT) {
currIndex += 1;
Message message = messages.get(curIndex);
tmpSize = calcMessageSize(message);
}
return currIndex;
}
private int calcMessageSize(Message message) {
int tmpSize = message.getTopic().length() + message.getBody().length();
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes
return tmpSize;
}
}
// then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
// handle the error
}
}
5.1.7 消息過濾
RocketMQ的消費者可以根據Tag進行消息過濾來擷取自己感興趣的消息,也支援自定義屬性過濾。
Tags是Topic下的次級消息類型/二級類型(注:Tags也支援
TagA || TagB
這樣的表達式),可以在同一個Topic下基于Tags進行消息過濾。
消息過濾是在Broker端實作的,減少了對Consumer無用消息的網絡傳輸,缺點是增加了Broker負擔,實作相對複雜。
5.2 消費端
5.2.1 消費模型
消費端有兩周消費模型:叢集消費和廣播消費。
叢集消費
叢集消費模式下,相同Consumer Group的每個Consumer執行個體平均分攤消息。
廣播消費
廣播消費模式下,相同Consumer Group的每個Consumer執行個體都接收全量的消息。
5.2.2 消息重試
RocketMQ會為每個消費組都設定一個Topic名稱為
%RETRY%consumerGroupName
的重試隊列(這裡需要注意的是,這個Topic的重試隊列是針對消費組,而不是針對每個Topic設定的),用于暫時儲存因為各種異常而導緻Consumer端無法消費的消息。
考慮到異常恢複起來需要一些時間,會為重試隊列設定多個重試級别,每個重試級别都有與之對應的重新投遞延時,重試次數越多投遞延時就越大。
RocketMQ對于重試消息的處理是先儲存至Topic名稱為
SCHEDULE_TOPIC_XXXX
的延遲隊列中,背景定時任務按照對應的時間進行Delay後重新儲存至
%RETRY%consumerGroupName
的重試隊列中。
比如,我們設定1秒後把消息投遞到
topic-itzhai-com
5.2.3 死信隊列
當一條消息初次消費失敗,消息隊列會自動進行消息重試;達到最大重試次數後,若消費依然失敗,則表明消費者在正常情況下無法正确地消費該消息,此時,消息隊列不會立刻将消息丢棄,而是将其發送到該消費者對應的特殊隊列中。
RocketMQ将這種正常情況下無法被消費的消息稱為
死信消息(Dead-Letter Message)
,将存儲死信消息的特殊隊列稱為
死信隊列(Dead-Letter Queue)
。
在RocketMQ中,可以通過使用console控制台對死信隊列中的消息進行重發來使得消費者執行個體再次進行消費。
由于RocketMQ是使用Java寫的,是以它的代碼特别适合拿來閱讀消遣,我們繼續來看看RocketMQ的源碼結構...
不不,還是算了,一下子又到周末晚上了,時間差不多了,今天就寫到這裡了。有空再聊。
我精心整理了一份Redis寶典給大家,涵蓋了Redis的方方面面,面試官懂的裡面有,面試官不懂的裡面也有,有了它,不怕面試官連環問,就怕面試官一上來就問你Redis的Redo Log是幹啥的?畢竟這種問題我也不會。
在
Java架構雜談
公衆号發送
Redis
關鍵字擷取pdf檔案:
本文作者: arthinking
部落格連結: https://www.itzhai.com/articles/deep-understanding-of-rocketmq.html
高并發異步解耦利器:RocketMQ究竟強在哪裡?
版權聲明: 版權歸作者所有,未經許可不得轉載,侵權必究!聯系作者請加公衆号。
References
apache/rocketmq. Retrieved from https://github.com/apache/rocketmq
Java架構雜談