一、前言
可靠消息最終一緻性方案主要适用于消息資料能夠獨立存儲:
- 能夠降低系統之間耦合度
- 業務對資料一緻性的時間敏感度高
此方案需要實作的服務模式:
- 可查詢操作:提供查詢自身事務狀态的接口。
- 幂等操作:隻要參數相同,無論調用多少次接口,都應該和第一次調用産生的結果相同。
那麼什麼時候回查? 事務發送端執行本地事務時(已經發送了 Half 消息了),這時候發送端當機了或者逾時了,就需要回查了。 (1)實作方案
實作方案有兩種:
1.基于本地消息
- 優點:在業務應用中實作了消息的可靠性,減少了對消息中間件的依賴。
- 缺點:
- 綁定了具體的業務場景,耦合性太高,不可公用和擴充。
- 消息資料與業務資料在同一資料庫,占用了業務系統的擴充。
- 消息資料可能會受到資料庫并發性的影響。
2.基于消息隊列中間件
- 優點:
- 消息資料能夠獨立存儲,與具體的業務資料庫解耦。
- 消息的并發性和吞吐量優于本地消息表方案。
- 缺點:
- 發送一次消息需要完成兩次網絡互動:1.消息的發送 ; 2. 消息的送出或復原。
- 需要實作消息的回查接口,增加了開發成本。
(2)注意的問題
1、事務發送方本地事務與消息發送的原子性問題:
- 原因:執行本地事務和發送消息,要麼都成功,要麼都失敗。
- 解決方案:通過消息确認服務本地事務執行成功。
// 原子性:事務 + 消息确認(復原)
@Override
@Transactional(rollbackFor = Exception.class)
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object obj) {
try{
TxMessage txMessage = this.getTxMessage(msg);
// 1. 執行本地事務
orderService.submitOrderAndSaveTxNo(txMessage);
// 2. 送出事務
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e){
// 異常復原事務
return RocketMQLocalTransactionState.ROLLBACK;
}
}
2、事務參與方接收消息的可靠性問題:
- 原因:由于伺服器當機、服務崩潰或網絡異常等原因,導緻事務參與方不能正常接收消息; 或者接收消息後處理事務的過程中發生異常,無法将結果正确回傳到消息庫中。
- 解決方案:通過消息恢複服務保證事務參與方的可靠性。
3、事務參與方接收消息的幂等性問題:
- 原因:可靠消息服務可能會多次向事務參與方發送消息
- 解決方案:需要具有幂等性,隻要參數相同,無論調用多少次接口或方法,結果都相同。
C++背景開發架構師免費學習位址:Linux C/C++背景開發學習資料
另外還整理一些C++背景開發架構師 相關學習資料,面試題,教學視訊,以及學習路線圖,免費分享有需要的可以自行添加:720209036 :qun檔案共享入~ 群檔案共享
(3)實戰
通過 RocketMQ 消息中間件實作可靠消息最終一緻性分布式事務,模拟電商業務中的下單扣減庫存場景。 涉及服務有:
- 訂單服務
- 庫存服務
整體流程如下:
- 第一步:訂單服務向 RocketMQ 發送 Half 消息。
- 第二步:RocketMQ 向訂單服務響應 Half 消息發送成功。
- 第三步:訂單服務執行本地事務,向本地資料庫中插入、更新、删除資料。
- 第四步:訂單服務向 RocketMQ 發送送出事務或者復原事務的消息。
- 第五步:如果庫存服務未收到消息,或者執行事務失敗,且 RocketMQ 未删除儲存的消息資料,RocketMQ 會回查訂單服務的接口,查詢事務狀态,以此确認是再次送出事務還是復原事務。
- 第六步:訂單服務查詢本地資料庫,确認事務是否執行成功。
- 第七步:訂單服務根據查詢出的事務狀态,向 RocketMQ 發送送出事務或者復原事務的消息。
- 第八步:如果第七步中訂單服務向 RocketMQ 發送的是送出事務的消息,則 RocketMQ 會向庫存服務投遞消息。
- 第九步:如果第七步中訂單服務向 RocketMQ 發送的是復原事務的消息,則 RocketMQ 不會向庫存微服務投遞消息,并且會删除内部存儲的消息資料。
- 第十步:如果 RocketMQ 向庫存服務投遞的是執行本地事務的消息,則庫存服務會執行本地事務,向本地資料庫中插入、更新、删除資料。
- 第十一步:如果 RocketMQ 向庫存服務投遞的是查詢本地事務狀态的消息,則庫存服務會查詢本地資料庫中事務的執行狀态。
二、實戰實驗
涉及服務有:
- 訂單服務:項目位址
- 庫存服務:項目位址
實驗準備:
- MySQL:8.0.20
- RocketMQ 消息中間件:rocketmq-all-4.5.0-bin-release
- RocketMQ 用戶端:rocketmq-spring-boot-starter 2.0.2
- Spring Boot 版本:2.2.6.RELEASE
訂單服務重點相關代碼:
1.發送 Half 消息:
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
@Override
public void submitOrder(Long productId, Integer payCount) {
// 1. 生成全局分布式序列号
String txNo = UUID.randomUUID().toString();
。。。 。。。
// 2. 封裝消息
Message<String> message =
MessageBuilder.withPayload(jsonObject.toJSONString()).build();
// 3. 發送一條事務消息
rocketMQTemplate.sendMessageInTransaction("tx_order_group", "topic_txmsg",
message, null);
}
}
2.處理本地事務
@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_order_group")
public class OrderTxMessageListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
@Autowired
private OrderMapper orderMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
Object obj) {
try{
// 1. 擷取消息并解析消息
TxMessage txMessage = this.getTxMessage(msg);
// 2. 送出訂單 并且 儲存事務日志
orderService.submitOrderAndSaveTxNo(txMessage);
// 3. 事務狀态為送出
return RocketMQLocalTransactionState.COMMIT;
}catch (Exception e){
// 發生異常
// 事務狀态為復原
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 1. 擷取消息并解析消息
TxMessage txMessage = this.getTxMessage(msg);
// 2. 查詢訂單是否存在
Integer exists = orderMapper.isExistsTx(txMessage.getTxNo());
if(exists != null){
// 訂單存在:事務狀态為送出
return RocketMQLocalTransactionState.COMMIT;
}
// 訂單不存在:事務狀态為未知
// 這裡需要再次調用:處理本地事務嘛?
return RocketMQLocalTransactionState.UNKNOWN;
}
}
庫存服務重點相關代碼:
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "tx_stock_group", topic = "topic_txmsg")
public class StockTxMessageConsumer implements RocketMQListener<String> {
@Autowired
private StockService stockService;
@Override
public void onMessage(String message) {
// 監聽到對應消息
// 擷取消息并解析
TxMessage txMessage = this.getTxMessage(message);
stockService.decreaseStock(txMessage);
}
}
服務:
- 訂單服務端口:8080
- 庫存服務端口:8081
- RocketMQ: 9876
資料準備:
USE tx_msg_stock;
INSERT INTO stock (id, product_id, total_count) VALUES (1, 1001, 10000);
SELECT * FROM stock;
+----|------------|-------------+
| id | product_id | total_count |
+----|------------|-------------+
| 1 | 1001 | 10000 |
+----|------------|-------------+
(1)正常流程
1.請求下單接口:調用 訂單服務
$ curl "http://localhost:8080/order/submit_order?productId=1&payCount=1"
下單成功
訂單服務日志:
2022-05-09 14:19:05.197 c.d.t.message.OrderTxMessageListener : 訂單微服務執行本地事務
2022-05-09 14:19:05.233 c.d.t.message.OrderTxMessageListener : 訂單微服務送出事務
2022-05-09 14:21:05.090 INFO 19423 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[172.17.0.3:10909] result: true
2.庫存服務日志:
2022-05-09 14:22:59.956 [MessageThread_5] c.d.t.message.StockTxMessageConsumer : 庫存微服務開始消費事務消息:{"txMessage":{"payCount":1,"productId":1,"txNo":"3fcd4e8d-1f5b-448d-ad3d-693c335f994e"}}
2022-05-09 14:22:59.956 [MessageThread_5] c.d.t.service.impl.StockServiceImpl : 庫存微服務執行本地事務,商品id:1, 購買數量:1
3.檢視對應資料庫:
-- order 庫下
SELECT * FROM `order`;
+---------------|---------------------|---------------|------------|-----------+
| id | create_time | order_no | product_id | pay_count |
+---------------|---------------------|---------------|------------|-----------+
| 1652077145222 | 2022-05-09 14:19:05 | 1652077145224 | 1 | 1 |
+---------------|---------------------|---------------|------------|-----------+
SELECT * FROM tx_log;
+--------------------------------------|---------------------+
| tx_no | create_time |
+--------------------------------------|---------------------+
| 3fcd4e8d-1f5b-448d-ad3d-693c335f994e | 2022-05-09 06:19:05 |
+--------------------------------------|---------------------+
-- stock 庫下
SELECT * FROM tx_log;
+--------------------------------------|---------------------+
| tx_no | create_time |
+--------------------------------------|---------------------+
| 3fcd4e8d-1f5b-448d-ad3d-693c335f994e | 2022-05-09 06:23:00 |
+--------------------------------------|---------------------+
SELECT * FROM stock;
+----|------------|-------------+
| id | product_id | total_count |
+----|------------|-------------+
| 1 | 1 | 9999 |
+----|------------|-------------+
(2)異常流程:消息中間件當機
1. 步驟一出現異常
即:還沒開始下單,消息中間件當機了,那麼立刻下單失敗。 如圖:
2. 步驟四出現異常
即:此步驟還在下單事務中,當送出中間請求失敗,本地事務不會復原。
- 這時候,RocketMQ 用戶端會不斷去重試。
- 當 RocketMQ 恢複後,RocketMQ 會去查詢一次
實驗步驟:
- 在執行本地事務之後,睡眠 30s
- 此期間,消息中間件當機:broker 關閉。
日志如下:
2022-05-09 15:27:14.980 c.d.t.message.OrderTxMessageListener : 訂單微服務執行本地事務
2022-05-09 15:27:15.015 c.d.t.message.OrderTxMessageListener : 訂單微服務送出事務
2022-05-09 15:27:15.015 c.d.t.message.OrderTxMessageListener : 嘗試睡 30s
2022-05-09 15:27:39.025 INFO 22815 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
2022-05-09 15:27:39.025 INFO 22815 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
2022-05-09 15:27:45.015 c.d.t.message.OrderTxMessageListener : 睡醒,起來幹活了
2022-05-09 15:27:48.041 INFO 22815 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
2022-05-09 15:28:09.017 INFO 22815 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
2022-05-09 15:28:09.019 INFO 22815 --- [lientSelector_1] RocketmqRemoting : closeChannel: close the connection to remote address[] result: true
。。。。。。
2022-05-09 15:35:47.481 INFO 22815 --- [pool-1-thread-1] c.d.t.message.OrderTxMessageListener : 訂單微服務查詢本地事務
(3)異常流程:事務發送方執行本地事務失敗
即:這時候捕捉到異常:
- 訂單服務給 RocketMQ 發送復原消息:RocketMQLocalTransactionState.ROLLBACK
- RocketMQ 接收到消息後,會回查
- 回查,發現不存在這個訂單,訂單服務向 RocketMQ 發送 未知消息:RocketMQLocalTransactionState.UNKNOWN UNKNOWN 未知狀态:可能是事務正在執行中出異常等,這種情況下消息系統不知道該如何處理,目前的邏輯是會直接丢棄掉,等待後續檢查邏輯來處理。
日志如下:
2022-05-09 15:45:11.829 c.d.t.message.OrderTxMessageListener : 訂單微服務執行本地事務
2022-05-09 15:45:11.861 c.d.t.message.OrderTxMessageListener : 訂單微服務復原事務
2022-05-09 15:45:47.490 c.d.t.message.OrderTxMessageListener : 訂單微服務查詢本地事務
2022-05-09 15:46:47.484 c.d.t.message.OrderTxMessageListener : 訂單微服務查詢本地事務
2022-05-09 15:47:47.489 c.d.t.message.OrderTxMessageListener : 訂單微服務查詢本地事務
2022-05-09 15:48:47.491 c.d.t.message.OrderTxMessageListener : 訂單微服務查詢本地事務
2022-05-09 15:49:47.492 c.d.t.message.OrderTxMessageListener : 訂單微服務查詢本地事務
2022-05-09 15:50:47.494 c.d.t.message.OrderTxMessageListener : 訂單微服務查詢本地事務
2022-05-09 15:51:47.496 c.d.t.message.OrderTxMessageListener : 訂單微服務查詢本地事務
2022-05-09 15:52:47.497 c.d.t.message.OrderTxMessageListener : 訂單微服務查詢本地事務
2022-05-09 15:53:47.498 c.d.t.message.OrderTxMessageListener : 訂單微服務查詢本地事務
2022-05-09 15:54:47.501 c.d.t.message.OrderTxMessageListener : 訂單微服務查詢本地事務
2022-05-09 15:55:47.501 c.d.t.message.OrderTxMessageListener : 訂單微服務查詢本地事務
2022-05-09 15:56:47.503 c.d.t.message.OrderTxMessageListener : 訂單微服務查詢本地事務
2022-05-09 15:57:47.504 c.d.t.message.OrderTxMessageListener : 訂單微服務查詢本地事務
2022-05-09 15:58:47.506 c.d.t.message.OrderTxMessageListener : 訂單微服務查詢本地事務
2022-05-09 15:59:47.510 c.d.t.message.OrderTxMessageListener : 訂單微服務查詢本地事務
可看到 RocketMQ 回查了 15 次。
(4)異常流程:事務接收方當機
即:RocketMQ 無法推送消息給消息接收方。
- 此時,訂單服務還是會下單成功
- 庫存服務無法處理
當庫存服務再次上線後:會接收到消息。
訂單服務日志:
2022-05-09 16:47:53.142 c.d.t.message.OrderTxMessageListener : 訂單微服務執行本地事務
2022-05-09 16:47:53.174 c.d.t.message.OrderTxMessageListener : 訂單微服務送出事務
庫存服務日志:
2022-05-09 16:48:10.283 c.d.t.message.StockTxMessageConsumer : 庫存微服務開始消費事務消息:{"txMessage":{"payCount":1,"productId":1,"txNo":"d5d86dae-76c2-4b56-9597-a12dae14325a"}}
2022-05-09 16:48:10.284 c.d.t.service.impl.StockServiceImpl : 庫存微服務執行本地事務,商品id:1, 購買數量:1
(5)異常流程:事務接收方執行本地事務失敗
即:當接收到消息後,執行本地事務失敗,RocketMQ 會不斷發送消費消息。
事務接收方執行本地事務失敗,措施有:
- 記錄日志,人工介入處理。
- 重試,再出錯,則人工介入。
實驗步驟:
- 下單錯誤商品,訂單下單成功。
- 庫存中沒有此商品資料,向上抛錯。
日志:
2022-05-09 14:19:59.930 WARN 19542 --- [MessageThread_3] a.r.s.s.DefaultRocketMQListenerContainer : consume message failed. messageExt:MessageExt [queueId=0, storeSize=480, queueOffset=1, sysFlag=8, bornTimestamp=1652077145061, bornHost=/172.17.0.1:36848, storeTimestamp=1652077199916, storeHost=/172.17.0.3:10911, msgId=AC11000300002A9F0000000000000D1B, commitLogOffset=3355, bodyCR
java.lang.NullPointerException: null
at com.donald.txmsgstock.service.impl.StockServiceImpl.decreaseStock(StockServiceImpl.java:35) ~[classes/:na]
at com.donald.txmsgstock.message.StockTxMessageConsumer.onMessage(StockTxMessageConsumer.java:27) ~[classes/:na]
at com.donald.txmsgstock.message.StockTxMessageConsumer.onMessage(StockTxMessageConsumer.java:16) ~[classes/:na]
at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:308) ~[rocketmq-spring-boot-2.0.2.jar:2.0.2]
at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:417) [rocketmq-client-4.4.0.jar:4.4.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_162]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_162]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_162]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_162]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
原文位址:可靠消息最終一緻性分布式事務 - 掘金