天天看點

RocketMQ幹貨集|徹底看懂RocketMQ事務實作原理(下)

2.2 Broker端如何處理事務消息?

SendMessageProcessor#asyncSendMessage

RocketMQ幹貨集|徹底看懂RocketMQ事務實作原理(下)
跟進去看看真正處理半消息的業務邏輯,這段處理邏輯在類

TransactionalMessageBridge

  • putHalfMessage
  • RocketMQ幹貨集|徹底看懂RocketMQ事務實作原理(下)
  • parseHalfMessageInner
  • RocketMQ幹貨集|徹底看懂RocketMQ事務實作原理(下)
  • RocketMQ并非将事務消息儲存至消息中 client 指定的 queue,而是記錄了原始的 topic 和 queue 後,把這個事務消息儲存在

設計思想

特殊的内部 topic:RMQ_SYS_TRANS_HALF_TOPIC

序号為 0 的 queue

這套 topic 和 queue 對消費者不可見,是以裡面的消息也永遠不會被消費。這就保證在事務送出成功之前,這個事務消息對 Consumer 是消費不到的。

2.3 Broker端如何事務反查?

在Broker的TransactionalMessageCheckService服務中啟動了一個定時器,定時從事務消息queue中讀出所有待反查的事務消息。

AbstractTransactionalMessageCheckListener#resolveHalfMsg

針對每個需要反查的半消息,Broker會給對應的Producer發一個要求執行事務狀态反查的RPC請求

RocketMQ幹貨集|徹底看懂RocketMQ事務實作原理(下)

AbstractTransactionalMessageCheckListener#sendCheckMessage

RocketMQ幹貨集|徹底看懂RocketMQ事務實作原理(下)

Broker2Client#checkProducerTransactionState

RocketMQ幹貨集|徹底看懂RocketMQ事務實作原理(下)

根據RPC傳回響應中的反查結果,來決定這個半消息是需要送出還是復原,或者後續繼續來反查。

最後,送出或者復原事務。首先把半消息标記為已處理

如果是送出事務,就把半消息從半消息隊列中複制到該消息真正的topic和queue中

如果是復原事務,什麼都不做

EndTransactionProcessor#processRequest

RocketMQ幹貨集|徹底看懂RocketMQ事務實作原理(下)

最後結束該事務。

3 總結

  • 整體實作流程
  • RocketMQ幹貨集|徹底看懂RocketMQ事務實作原理(下)
  • RocketMQ是基于兩階段送出來實作的事務,把這些事務消息暫存在一個特殊的queue中,待事務送出後再移動到業務隊列中。最後,RocketMQ的事務适用于解決本地事務和發消息的資料一緻性問題。

參考

https://juejin.im/post/6844904193526857742