2.2 Broker端如何處理事務消息?
SendMessageProcessor#asyncSendMessage
跟進去看看真正處理半消息的業務邏輯,這段處理邏輯在類TransactionalMessageBridge
- putHalfMessage
- parseHalfMessageInner
- RocketMQ并非将事務消息儲存至消息中 client 指定的 queue,而是記錄了原始的 topic 和 queue 後,把這個事務消息儲存在
設計思想
特殊的内部 topic:RMQ_SYS_TRANS_HALF_TOPIC
序号為 0 的 queue
這套 topic 和 queue 對消費者不可見,是以裡面的消息也永遠不會被消費。這就保證在事務送出成功之前,這個事務消息對 Consumer 是消費不到的。
2.3 Broker端如何事務反查?
在Broker的TransactionalMessageCheckService服務中啟動了一個定時器,定時從事務消息queue中讀出所有待反查的事務消息。
AbstractTransactionalMessageCheckListener#resolveHalfMsg
針對每個需要反查的半消息,Broker會給對應的Producer發一個要求執行事務狀态反查的RPC請求
AbstractTransactionalMessageCheckListener#sendCheckMessage
Broker2Client#checkProducerTransactionState
根據RPC傳回響應中的反查結果,來決定這個半消息是需要送出還是復原,或者後續繼續來反查。
最後,送出或者復原事務。首先把半消息标記為已處理
如果是送出事務,就把半消息從半消息隊列中複制到該消息真正的topic和queue中
如果是復原事務,什麼都不做
EndTransactionProcessor#processRequest
最後結束該事務。
3 總結
- 整體實作流程
- RocketMQ是基于兩階段送出來實作的事務,把這些事務消息暫存在一個特殊的queue中,待事務送出後再移動到業務隊列中。最後,RocketMQ的事務适用于解決本地事務和發消息的資料一緻性問題。
參考
https://juejin.im/post/6844904193526857742