天天看點

一文看懂RocketMQ生産者發送消息源碼解析(下)sendKernelImpl()總結面試場景快問快答

sendKernelImpl()

建構發送消息的    

請求頭部 RequestHeader

一文看懂RocketMQ生産者發送消息源碼解析(下)sendKernelImpl()總結面試場景快問快答

上下文SendMessageContext

一文看懂RocketMQ生産者發送消息源碼解析(下)sendKernelImpl()總結面試場景快問快答

然後調用方法MQClientAPIImpl#sendMessage(),将消息發送給隊列所在Broker。

一文看懂RocketMQ生産者發送消息源碼解析(下)sendKernelImpl()總結面試場景快問快答

至此,消息被發送給遠端調用的封裝類MQClientAPIImpl,完成後續序列化和網絡傳輸等步驟。

RocketMQ的Producer無論同步還是異步發送消息,都統一到了同一流程。

異步發送消息的實作,也是通過一個線程池,在異步線程執行的調用和同步發送相同的底層方法來實作的。

方法的一個參數區分同步or異步發送

這使得整個流程統一,很多同步異步代碼可複用,代碼結構清晰簡單,易維護。

使用同步發送,目前線程會阻塞等待服務端的響應,直到收到響應或者逾時方法才會傳回,是以在業務代碼調用同步發送的時候,隻要傳回成功,消息就一定發送成功了。

而異步發送,發送的邏輯都是在Executor的異步線程中執行的,是以不會阻塞目前線程,當服務端傳回響應或者逾時之後,Producer會調用Callback方法來給業務代碼傳回結果。業務代碼需要在Callback中來判斷發送結果。

總結

本文分析了RocketMQ用戶端消息生産的實作過程,包括Producer初始化和發送消息的主流程。Producer中包含的幾個核心的服務都是有狀态的,在Producer啟動時,由MQClientInstance類中來統一啟動。

在發送消息的流程中,RocketMQ分了三種發送方式:

  1. 單向
  2. 同步
  3. 異步

這三種發送方式對應的發送流程基本相同,同步和異步發送由已封裝好的MQClientAPIImpl類分别實作。

面試場景快問快答

DefaultMQProducer有個屬性defaultTopicQueueNums,它是用來設定topic的ConsumeQueue的數量的嗎?有同學可能認為consumeQueue的數量是建立topic的時候指定的,跟producer沒有關系,那這參數有什麼用呢?

這參數是控制用戶端在生産消費的時候會通路同一個主題的隊列數量,假設一個主題有100個隊列,對每個用戶端,它沒必要100個隊列都通路,隻需使用其中幾個隊列。

在RocketMq的控制台上可以建立topic,需要指定writeQueueNums,readQueueNums,perm,這三個參數是有什麼用呢?這裡為什麼要區分寫跟讀隊列呢?不應該隻有一個consumeQueue?

writeQueueNums和readQueueNums是在服務端來控制每個用戶端在生産和消費的時候,分别通路多少個隊列。這兩參數是服務端參數,優先級高于用戶端控制的參數defaultTopicQueueNums的。perm是設定Topic讀寫等權限的參數。

使用者請求–>異步處理—>使用者收到響應結果。異步處理的作用是:用更少的線程來接收更多的使用者請求,然後異步處理業務邏輯。異步處理完後,如何将結果通知給原先的使用者呢?即使有回調接口,我了解也是給使用者發個短信之類的處理,那結果怎麼傳回到定位到使用者,并傳回之前請求的頁面上呢?需要讓之前的請求線程阻塞嗎?那也無法達到【用更少的線程來接收更多的使用者請求】的目的丫。

如果局限于:“APP/浏覽器 --[http協定]–>web 服務”這樣的場景,受限于http協定,前端和web服務的互動一定是單向和同步的。一定要等待結果然後傳回響應,但是,這種情況仍然可以使用異步方法。像spring web這種架構,它把處理web請求都給你封裝好了,你隻要寫個handler很友善。但這handler隻能是一個同步方法,它必須在傳回值中給出響應結果,是以導緻很多同學思維轉不過來。