RocketMQ分享
文章目錄
- RocketMQ分享
-
- 什麼是消息中間件,為什麼需要?
-
- 1. 系統解耦
- 2. 異步調用
- 3. 流量削峰
- RocketMq的基本架構
-
- 啟動的過程:
- 功能特性:
- 深入了解一下消息存儲的結構
- 對于中間件,我們肯定回考慮到的兩點,高可用和高并發性
-
-
- 首先,我們來說一下RocketMQ為什麼具有 `高可用性`
- 接下來,我們來讨論一下為什麼RocketMQ具有高并發性
-
- 最後,我們來講解一個比較感興趣的幾個點
-
- 1. RocketMQ如何實作分布式事務的?
- 2. RocketMq生産者、消費者如何實作的重試機制?
什麼是消息中間件,為什麼需要?
消息中間件
是利用高效可靠的消息傳遞機制進行異步的資料傳輸,并基于資料通信進行分布式系統的內建。(有種中間協調者的感覺)
為什麼需要?
1. 系統解耦
目前有一個場景,比如A系統産生了一些資料,B、C系統都需要,那麼A系統可以主動發送給B、C系統,但是,如果後續的D、E、F、G等都需要A的系統資料,負責A系統的同學都要将他們接入,那麼負責A系統的同學可能會不堪重負。是以我們需要對系統進行解耦,是以引入了消息中間件。引入之後的場景就變為,負責系統A的同學隻需要往消息中間件發送消息,至于哪些系統需要自行去消費就可以了。
原來:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL9MGRPd3aE10MNRVT3V1MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL2UTOzQjNwAjM4ITMwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
現在:
2. 異步調用
傳統模式下:
現階段:
3. 流量削峰
如果突然大量請求湧進來,勢必會造成伺服器崩潰,那麼我們可以将請求寫入消息中間件,然後再慢慢進行消費,用有限的機器資源承載高并發請求。
例如我們的mysql資料庫打比方最多支援5000每秒的QPS,我們需要搞一個活動,QPS會直接提升到1W每秒的QPS,那我們我們需要怎麼做才能抵抗大流量。
同樣,有利就有弊,引入了消息中間件會帶來系統可用性降低(一旦消息中間件挂掉,那麼整個系統不可用)、系統複雜性增加(需要考慮消息丢失問題以及重複消費等問題)的問題。
目前市面上比較流行的消息中間件是: RabbitMq、Kakfa、RocketMq。這裡我們着重聊一聊RocketMq
RocketMq的基本架構
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-hJvq84c3-1628153983939)(/Users/zhuxin/Desktop/rocketmq.jpg)]
Producer:消息的釋出者
Consumer:消息的消費者
NameServer:Topic的路由注冊中心。兩個功能:1.Boker管理 2.路由資訊管理
BrokerServer:負責消息的存儲、投遞和查詢等功能,核心子產品
啟動的過程:
- 啟動NameServer,NameServer起來後監聽端口,等待Broker、Producer、Consumer連上來,相當于一個路由控制中心。
-
Broker啟動,跟所有的NameServer保持長連接配接,定時發送心跳包。心跳包中包含目前Broker資訊(IP+端口等)以及存儲所有Topic資訊。注冊成功後,NameServer叢集中就有Topic跟Broker的映射關系。
收發消息前,先建立Topic,建立Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發送消息時自動建立Topic。
- Producer發送消息,啟動時先跟NameServer叢集中的其中一台建立長連接配接,并從NameServer中擷取目前發送的Topic存在哪些Broker上,輪詢從隊列清單中選擇一個隊列,然後與隊列所在的Broker建立長連接配接進而向Broker發消息。
- Consumer跟Producer類似,跟其中一台NameServer建立長連接配接,擷取目前訂閱Topic存在哪些Broker上,然後直接跟Broker建立連接配接通道,開始消費消息。
功能特性:
- 訂閱-釋出
-
消息順序: 按照發送的順序來消費,例如訂單的結算。順序消息也分為全局順序消息和分區順序消息:
分區順序是指某個partition下的所有消息都要保證順序。
全局順序是指某個Topic下的所有消息都要保證順序,其實就是分區順序的一種特例,一個topic隻有一個partition.
- 消息過濾: 消費者根據tag對于消息效率,減少無用的網絡傳輸
- 消息可靠性
- 至少一次: 每個消息至少發送一次。消費者在消費之後手動送出ack消息
- 回溯消息: 消息在消費之後會儲存一段時間,可以重新消費
- 事務消息(*): 通過事務消息達到分布式事務的最終一緻性
- 定時消息: 消息發送到broker之後,不會被立即消費,會在特定的時候投遞給指定的topic。比如電商裡,送出了一個訂單就可以發送一個延時消息,1h後去檢查這個訂單的狀态,如果還是未付款就取消訂單釋放庫存。
- 消息重試: Rocket為每個消費者組設定一個重試隊列
- 消息重投: 在消息量大,網絡抖動、消息重複的情況下,消息重投會導緻消息重複的問題
- 流量控制
- 死信隊列: 在消息重試之後達到重試的最大次數之後,會被放入另外一個隊列被存儲。
深入了解一下消息存儲的結構
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-GqHUP4up-1628153983947)(/Users/zhuxin/Desktop/rocketMQ消息存儲結構.png)]
消息結構主要是由2部分組成:
- CommitLog:消息主體以及中繼資料的存儲主體,存儲Producer端寫入的消息主體内容,消息内容不是定長的
-
ConsumeQueue:消息消費的隊列,儲存了指定Topic下的隊列消息在CommitLog中的起始實體偏移量offset,消息大小size和消息Tag的HashCode值。可以看做是CommitLog的索引檔案
之是以消息中間件的讀取速度較高,是因為他們使用到了頁緩存和記憶體映射,kafka也是如此.
頁緩存(PageCache)是OS對檔案的緩存,是從主記憶體分割出來的一塊區域,是以對檔案的讀寫速度和對主記憶體的讀寫速度差不多,對于資料的寫入,OS會先寫入至PageCache内,随後通過異步的方式由核心線程将Cache内的資料刷盤至實體磁盤上。對于資料的讀取,如果一次讀取檔案時出現未命中PageCache的情況,OS從實體磁盤上通路讀取檔案的同時,會順序對其他相鄰塊的資料檔案進行預讀取。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-E002cq6c-1628153983951)(/Users/zhuxin/Desktop/刷盤.png)]
(1) 同步刷盤:如上圖所示,隻有在消息真正持久化至磁盤後RocketMQ的Broker端才會真正傳回給Producer端一個成功的ACK響應。同步刷盤對MQ消息可靠性來說是一種不錯的保障,但是性能上會有較大影響,一般适用于金融業務應用該模式較多。
(2) 異步刷盤:能夠充分利用OS的PageCache的優勢,隻要消息寫入PageCache即可将成功的ACK傳回給Producer端。消息刷盤采用背景異步線程送出的方式進行,降低了讀寫延遲,提高了MQ的性能和吞吐量
對于中間件,我們肯定回考慮到的兩點,高可用和高并發性
首先,我們來說一下RocketMQ為什麼具有 高可用性
高可用性
什麼是高可用:指的是通過設計減少系統不能提供服務的時間。如果系統每運作100個時間機關,會有1個時間機關無法提供服務,我們說系統的可用性是99%。
在RocketMQ 4.5之後之前,rocketMQ的叢集是一主多從機制來實作高可用,brokerid=0 的辨別為master,而其他的辨別為slave。master具有讀寫的功能,而slave隻有讀的功能。但是這種機制存在主要的弊端是如果master挂掉之後,需要人為幹預重新開機或者切換,無法自動将其中的slave切換為Master.
為了解決故障轉移方面的問題,在RocketMQ 4.5之後引入了DLedger,使用raft算法來實作故障轉移。如果master當機之後,會采用raft算法自動将進行一個leader的選舉,算法可參照一個動畫。
接下來,我們來讨論一下為什麼RocketMQ具有高并發性
消息生産者:
- 本地緩存topic的路由資訊,同僚每隔30s從NameSpace更新路由。
- 消息發送通過負載均衡的算法選擇broker,避免單個伺服器壓力過大。
- 發送異常機制,重試和規避該broker,在某一時段内選擇其他的Broker
-
支援消息的批量發送
消息消費者:
-
消費者消費消息的時候進行負載均衡,由RebalanceService線程預設每隔20s進行一次消息隊列負載,根據目前消費組内消費者個數與主題隊列數量按照某一種負載算法進行隊列配置設定,配置設定原則為同一個消費者可以配置設定多個消息消費隊列,同一個消息消費隊列同一時間隻會配置設定給一個消費者。
服務端:
- 零拷貝技術,服務端的高并發讀寫主要利用Linux作業系統的PageCache特性,通過Java的MappedByteBuffer直接操作PageCache。MappedByteBuffer能直接将檔案映射到記憶體,通過順序寫盤(Commitlog),預讀資料來盡量命中PageCahe,進而大大減少磁盤IO。
- Commitlog,消息存儲檔案,将所有的Topic都存在Commitlog中,保證消息的順序性,采用建立索引的方式來大大壓縮搜尋資料的時間。
最後,我們來講解一個比較感興趣的幾個點
1. RocketMQ如何實作分布式事務的?
RocketMQ采用2PC的思想來實作送出事務消息的最終一緻性。
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-F6XTpngm-1628153983954)(/Users/zhuxin/Desktop/分布式事務.png)]
主要分為以下幾個步驟:
- 生産者發送半消息到MQ Server,暫時不能被投遞和消費
- MQ Server傳回成功接收
- 生産者執行本地事務
- 生産者向MQ Server發送送出還是復原
- 如果MQ Server沒有收到回報,則主動詢問生産者
- 生産者檢查本地事務狀态
- 如果送出,Consumer消費
- 如果復原,Consumer不消費
生産者:
@RequestMapping("sendTransaction")
public Object sendTransaction(@RequestParam(value = "msg",required = false)String msg){
String transactionId = UUID.randomUUID().toString();
org.springframework.messaging.Message<String> message = MessageBuilder.withPayload(msg)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).build();
//發送半消息
TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction("topic03",message,transactionId);
//執行本地事務
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return sendResult;
}
//監聽發送
@RocketMQTransactionListener
public class MQLocalListener implements RocketMQLocalTransactionListener {
/**
* 用于執行本地事務
* @param msg
* @param arg
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
log.info("[執行本地事務]消息體參數: transactionId={}", transactionId);
//執行本地方法
try {
Thread.sleep(2000);
String payload = new String((byte[]) msg.getPayload());
if ("zzz".equals(payload)){
log.info("[復原本地事務]");
return RocketMQLocalTransactionState.ROLLBACK;
}
} catch (InterruptedException e) {
e.printStackTrace();
//異常,發送復原信号
return RocketMQLocalTransactionState.ROLLBACK;
}
//正常,發送送出信号
return RocketMQLocalTransactionState.COMMIT;
}
/**
* 回查本地事務執行結果
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
MessageHeaders headers = msg.getHeaders();
String transactionId = headers.get(RocketMQHeaders.TRANSACTION_ID, String.class);
log.info("【回查本地事務】transactionId={}", transactionId);
//查詢日志記錄
// 如果存在則傳回 RocketMQLocalTransactionState.COMMIT
// 如果失敗則傳回 RocketMQLocalTransactionState.ROLLBACK
return RocketMQLocalTransactionState.COMMIT;
}
}
消費者:
public static void main(String[] args) throws MQClientException {
int i=0;
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("topic03","*");
consumer.setConsumerGroup("topic03-group");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
String msg = new String(msgs.get(0).getBody());
System.out.println("收到消息:"+msg);
// 标記該消息已經被成功消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
因為rocketMQ實作的最終一緻性,是以,不管消費者有沒有消費成功,生産者都會送出自身的事務,如果消費者重試一直失敗,那麼隻能進行人工幹預,或者執行自定義的planB,這些都需要在消費者端自己實作。
2. RocketMq生産者、消費者如何實作的重試機制?
生産者重試機制:
一般發生Producer端的重試原因可能是網絡原因導緻發送消息到MQ失敗,或者是Broker挂掉等
接下來,我們看一下producer發送消息的源碼
// DefaultMQProducerImpl#sendDefaultImpl
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//省略
//檢視是否有該topic,若無直接報錯
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
//省略
//如果是同步發送的話,重試的次數等于1+設定的次數,異步的預設隻重試一次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
//查找topic發送的隊列,若無則報錯
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
//省略
//發送消息,限定在逾時時間-已消耗的時間内發送完消息并得到回複
//這一部分大家可以直接檢視源碼,個人覺得不是重試機制的重點
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
//如果是同步,失敗則繼續重試
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
//這裡有一個重要的點,再網上其他部落格沒有提及,若沒有直接報錯且傳回值為SEND_OK,FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE,producer重試的必須要配置isRetryAnotherBrokerWhenNotStoreOK這個參數為true,預設是false,可以這樣設定 rocketMQTemplate.getProducer().setRetryAnotherBrokerWhenNotStoreOK(true);
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
//省略
//繼續重試
continue;
} catch (MQBrokerException e) {
//省略
switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
//報錯
throw e;
}
} else {
break;
}
}
//省略
}
validateNameServerSetting();
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
以上就是Producer端的重試機制,我們可以總結有幾個特點
- 同步發送可以指定重試次數,異步發送固定重試一次
- 中斷異常不會去重試
- 重試沒有間隔時間,是直接重試
消費者重試機制:
Consumer端的重試主要有兩種情況,
Exception
和
Timeout
。
// ConsumeMessageConcurrentlyService#ConsumeRequest#run()
public void run() {
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
ConsumeConcurrentlyStatus status = null;
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;
//如果有消息消費前後的需要處理内容
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap<String, String>());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
try {
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
// 監聽者開始消費消息
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
//省略
}
//省略
//對結果進行處理
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
//如果是廣播模式的話,就不用重試
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
//如果是叢集模式的話,就重試
case CLUSTERING:
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
//将所有消息挨個重試
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
//查詢broker的位址
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
//預設delayLevel為0,可以通過delayLevelWhenNextConsume進行設定
//設定請求逾時時間為5000s
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
//如果重試第一次報錯,則直接從等級3開始重試,也就是等待30s
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
//發送重試消息
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}
//接下來我們來看一下broker是怎樣處理重試消息的
//SendMessageProcessor#asyncProcessRequest
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext;
switch (request.getCode()) {
//重試消息
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.asyncConsumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return CompletableFuture.completedFuture(null);
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
if (requestHeader.isBatch()) {
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
}
}
}
private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
//省略
//擷取消費者組的訂閱資訊
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
//省略
//建立主題,在原主題的上面新增加%RETRY%
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
//擷取分區隊列的Id
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0;
if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
//建立主題資訊
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
//省略
//擷取重試消息
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
//省略
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}
msgExt.setWaitStoreMsgOK(false);
int delayLevel = requestHeader.getDelayLevel();
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
//如果重試次數大于規定最大重試次數或者delayLevel小于0
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return CompletableFuture.completedFuture(response);
}
} else {
//設定延遲,從3開始
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}
//省略
//重新發送消息到commitlog中
CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
return putMessageResult.thenApply((r) -> {
if (r != null) {
switch (r.getPutMessageStatus()) {
case PUT_OK:
String backTopic = msgExt.getTopic();
String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (correctTopic != null) {
backTopic = correctTopic;
}
this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
default:
break;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(r.getPutMessageStatus().name());
return response;
}
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("putMessageResult is null");
return response;
});
}
總結一下消費者重試機制:
- 如果結果傳回
,那麼消息發送成功,無需重試。如果結果是CONSUME_SUCCESS
,此時所有消息都會發送給broker重試RECONSUME_LATER
- 消費者向 Broker 發送 ACK 消息,如果發生成功,重試機制由 broker 處理,如果發送 ack 消息失敗,則将該任務直接在消費者這邊,再次将本次消費任務
- 需要延遲執行的消息,在存入 commitlog 之前,會備份原先的主題(retry+消費組名稱)、與消費隊列ID,然後将主題修改為SCHEDULE_TOPIC_XXXX,會被延遲任務 ScheduleMessageService 延遲拉取。
- ScheduleMessageService 在執行過程中,會再次存入 commitlog 檔案中放入之前,會清空延遲等級,并恢複主題與隊列,這樣,就能被消費者所消費,因為消費者在啟動時就訂閱了該消費組的重試主題。
消息的重試機制保證了rocketmq的至少一次的特性。