天天看點

可靠消息最終一緻性分布式事務

作者:Hu先生Linux背景開發

一、前言

可靠消息最終一緻性方案主要适用于消息資料能夠獨立存儲:

  1. 能夠降低系統之間耦合度
  2. 業務對資料一緻性的時間敏感度高

此方案需要實作的服務模式:

  1. 可查詢操作:提供查詢自身事務狀态的接口。
  2. 幂等操作:隻要參數相同,無論調用多少次接口,都應該和第一次調用産生的結果相同。

那麼什麼時候回查? 事務發送端執行本地事務時(已經發送了 Half 消息了),這時候發送端當機了或者逾時了,就需要回查了。 (1)實作方案

實作方案有兩種:

1.基于本地消息

  • 優點:在業務應用中實作了消息的可靠性,減少了對消息中間件的依賴。
  • 缺點:
  1. 綁定了具體的業務場景,耦合性太高,不可公用和擴充。
  2. 消息資料與業務資料在同一資料庫,占用了業務系統的擴充。
  3. 消息資料可能會受到資料庫并發性的影響。

2.基于消息隊列中間件

  • 優點:
  1. 消息資料能夠獨立存儲,與具體的業務資料庫解耦。
  2. 消息的并發性和吞吐量優于本地消息表方案。
  • 缺點:
  1. 發送一次消息需要完成兩次網絡互動:1.消息的發送 ; 2. 消息的送出或復原。
  2. 需要實作消息的回查接口,增加了開發成本。

(2)注意的問題

1、事務發送方本地事務與消息發送的原子性問題:

  • 原因:執行本地事務和發送消息,要麼都成功,要麼都失敗。
  • 解決方案:通過消息确認服務本地事務執行成功。
// 原子性:事務 + 消息确認(復原)
@Override
@Transactional(rollbackFor = Exception.class)
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object obj) {
    try{

        TxMessage txMessage = this.getTxMessage(msg);
        // 1. 執行本地事務
        orderService.submitOrderAndSaveTxNo(txMessage);
        
        // 2. 送出事務
        return RocketMQLocalTransactionState.COMMIT;
        
    } catch (Exception e){

        // 異常復原事務
        return RocketMQLocalTransactionState.ROLLBACK;
    }

}           

2、事務參與方接收消息的可靠性問題:

  • 原因:由于伺服器當機、服務崩潰或網絡異常等原因,導緻事務參與方不能正常接收消息; 或者接收消息後處理事務的過程中發生異常,無法将結果正确回傳到消息庫中。
  • 解決方案:通過消息恢複服務保證事務參與方的可靠性。

3、事務參與方接收消息的幂等性問題:

  • 原因:可靠消息服務可能會多次向事務參與方發送消息
  • 解決方案:需要具有幂等性,隻要參數相同,無論調用多少次接口或方法,結果都相同。

C++背景開發架構師免費學習位址:Linux C/C++背景開發學習資料

另外還整理一些C++背景開發架構師 相關學習資料,面試題,教學視訊,以及學習路線圖,免費分享有需要的可以自行添加:720209036 :qun檔案共享入~ 群檔案共享

可靠消息最終一緻性分布式事務

(3)實戰

通過 RocketMQ 消息中間件實作可靠消息最終一緻性分布式事務,模拟電商業務中的下單扣減庫存場景。 涉及服務有:

  • 訂單服務
  • 庫存服務
可靠消息最終一緻性分布式事務

整體流程如下:

可靠消息最終一緻性分布式事務
  • 第一步:訂單服務向 RocketMQ 發送 Half 消息。
  • 第二步:RocketMQ 向訂單服務響應 Half 消息發送成功。
  • 第三步:訂單服務執行本地事務,向本地資料庫中插入、更新、删除資料。
  • 第四步:訂單服務向 RocketMQ 發送送出事務或者復原事務的消息。
  • 第五步:如果庫存服務未收到消息,或者執行事務失敗,且 RocketMQ 未删除儲存的消息資料,RocketMQ 會回查訂單服務的接口,查詢事務狀态,以此确認是再次送出事務還是復原事務。
  • 第六步:訂單服務查詢本地資料庫,确認事務是否執行成功。
  • 第七步:訂單服務根據查詢出的事務狀态,向 RocketMQ 發送送出事務或者復原事務的消息。
  • 第八步:如果第七步中訂單服務向 RocketMQ 發送的是送出事務的消息,則 RocketMQ 會向庫存服務投遞消息。
  • 第九步:如果第七步中訂單服務向 RocketMQ 發送的是復原事務的消息,則 RocketMQ 不會向庫存微服務投遞消息,并且會删除内部存儲的消息資料。
  • 第十步:如果 RocketMQ 向庫存服務投遞的是執行本地事務的消息,則庫存服務會執行本地事務,向本地資料庫中插入、更新、删除資料。
  • 第十一步:如果 RocketMQ 向庫存服務投遞的是查詢本地事務狀态的消息,則庫存服務會查詢本地資料庫中事務的執行狀态。

二、實戰實驗

涉及服務有:

  • 訂單服務:項目位址
  • 庫存服務:項目位址

實驗準備:

  • MySQL:8.0.20
  • RocketMQ 消息中間件:rocketmq-all-4.5.0-bin-release
  • RocketMQ 用戶端:rocketmq-spring-boot-starter 2.0.2
  • Spring Boot 版本:2.2.6.RELEASE

訂單服務重點相關代碼:

1.發送 Half 消息:

@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
    @Override
    public void submitOrder(Long productId, Integer payCount) {
        // 1. 生成全局分布式序列号
        String txNo = UUID.randomUUID().toString();
        。。。 。。。
        // 2. 封裝消息
        Message<String> message = 
            MessageBuilder.withPayload(jsonObject.toJSONString()).build();
        // 3. 發送一條事務消息
        rocketMQTemplate.sendMessageInTransaction("tx_order_group", "topic_txmsg",
                message, null);
    }
}           

2.處理本地事務

@Slf4j
@Component
@RocketMQTransactionListener(txProducerGroup = "tx_order_group")
public class OrderTxMessageListener implements RocketMQLocalTransactionListener {
    @Autowired
    private OrderService orderService;
    @Autowired
    private OrderMapper orderMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, 
                                                                 Object obj) {
        try{
            // 1. 擷取消息并解析消息
            TxMessage txMessage = this.getTxMessage(msg);
            // 2. 送出訂單 并且 儲存事務日志
            orderService.submitOrderAndSaveTxNo(txMessage);
            // 3. 事務狀态為送出
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            // 發生異常
            // 事務狀态為復原
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 1. 擷取消息并解析消息
        TxMessage txMessage = this.getTxMessage(msg);
        // 2. 查詢訂單是否存在
        Integer exists = orderMapper.isExistsTx(txMessage.getTxNo());
        if(exists != null){
            // 訂單存在:事務狀态為送出
            return RocketMQLocalTransactionState.COMMIT;
        }
        // 訂單不存在:事務狀态為未知
        // 這裡需要再次調用:處理本地事務嘛?
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}           

庫存服務重點相關代碼:

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "tx_stock_group", topic = "topic_txmsg")
public class StockTxMessageConsumer implements RocketMQListener<String> {
    @Autowired
    private StockService stockService;

    @Override
    public void onMessage(String message) {
        // 監聽到對應消息
        // 擷取消息并解析
        TxMessage txMessage = this.getTxMessage(message);
        stockService.decreaseStock(txMessage);
    }
}           

服務:

  • 訂單服務端口:8080
  • 庫存服務端口:8081
  • RocketMQ: 9876

資料準備:

USE tx_msg_stock;

INSERT INTO stock (id, product_id, total_count) VALUES (1, 1001, 10000);

SELECT * FROM stock;
+----|------------|-------------+
| id | product_id | total_count |
+----|------------|-------------+
|  1 |       1001 |       10000 |
+----|------------|-------------+           

(1)正常流程

1.請求下單接口:調用 訂單服務

$ curl "http://localhost:8080/order/submit_order?productId=1&payCount=1"
下單成功           

訂單服務日志:

2022-05-09 14:19:05.197  c.d.t.message.OrderTxMessageListener     : 訂單微服務執行本地事務
2022-05-09 14:19:05.233  c.d.t.message.OrderTxMessageListener     : 訂單微服務送出事務
2022-05-09 14:21:05.090  INFO 19423 --- [lientSelector_1] RocketmqRemoting                         : closeChannel: close the connection to remote address[172.17.0.3:10909] result: true           

2.庫存服務日志:

2022-05-09 14:22:59.956  [MessageThread_5] c.d.t.message.StockTxMessageConsumer     : 庫存微服務開始消費事務消息:{"txMessage":{"payCount":1,"productId":1,"txNo":"3fcd4e8d-1f5b-448d-ad3d-693c335f994e"}}
2022-05-09 14:22:59.956  [MessageThread_5] c.d.t.service.impl.StockServiceImpl      : 庫存微服務執行本地事務,商品id:1, 購買數量:1           

3.檢視對應資料庫:

-- order 庫下
SELECT * FROM `order`;
+---------------|---------------------|---------------|------------|-----------+
| id            | create_time         | order_no      | product_id | pay_count |
+---------------|---------------------|---------------|------------|-----------+
| 1652077145222 | 2022-05-09 14:19:05 | 1652077145224 |          1 |         1 |
+---------------|---------------------|---------------|------------|-----------+

SELECT * FROM tx_log;
+--------------------------------------|---------------------+
| tx_no                                | create_time         |
+--------------------------------------|---------------------+
| 3fcd4e8d-1f5b-448d-ad3d-693c335f994e | 2022-05-09 06:19:05 |
+--------------------------------------|---------------------+



-- stock 庫下

SELECT * FROM tx_log;
+--------------------------------------|---------------------+
| tx_no                                | create_time         |
+--------------------------------------|---------------------+
| 3fcd4e8d-1f5b-448d-ad3d-693c335f994e | 2022-05-09 06:23:00 |
+--------------------------------------|---------------------+


SELECT * FROM stock;
+----|------------|-------------+
| id | product_id | total_count |
+----|------------|-------------+
|  1 |          1 |        9999 |
+----|------------|-------------+           

(2)異常流程:消息中間件當機

1. 步驟一出現異常

即:還沒開始下單,消息中間件當機了,那麼立刻下單失敗。 如圖:

可靠消息最終一緻性分布式事務

2. 步驟四出現異常

即:此步驟還在下單事務中,當送出中間請求失敗,本地事務不會復原。

  • 這時候,RocketMQ 用戶端會不斷去重試。
  • 當 RocketMQ 恢複後,RocketMQ 會去查詢一次
可靠消息最終一緻性分布式事務

實驗步驟:

  1. 在執行本地事務之後,睡眠 30s
  2. 此期間,消息中間件當機:broker 關閉。

日志如下:

2022-05-09 15:27:14.980  c.d.t.message.OrderTxMessageListener     : 訂單微服務執行本地事務
2022-05-09 15:27:15.015  c.d.t.message.OrderTxMessageListener     : 訂單微服務送出事務
2022-05-09 15:27:15.015  c.d.t.message.OrderTxMessageListener     : 嘗試睡 30s 
2022-05-09 15:27:39.025  INFO 22815 --- [lientSelector_1] RocketmqRemoting                         : closeChannel: close the connection to remote address[] result: true
2022-05-09 15:27:39.025  INFO 22815 --- [lientSelector_1] RocketmqRemoting                         : closeChannel: close the connection to remote address[] result: true
2022-05-09 15:27:45.015  c.d.t.message.OrderTxMessageListener     : 睡醒,起來幹活了
2022-05-09 15:27:48.041  INFO 22815 --- [lientSelector_1] RocketmqRemoting                         : closeChannel: close the connection to remote address[] result: true
2022-05-09 15:28:09.017  INFO 22815 --- [lientSelector_1] RocketmqRemoting                         : closeChannel: close the connection to remote address[] result: true
2022-05-09 15:28:09.019  INFO 22815 --- [lientSelector_1] RocketmqRemoting                         : closeChannel: close the connection to remote address[] result: true

。。。。。。
2022-05-09 15:35:47.481  INFO 22815 --- [pool-1-thread-1] c.d.t.message.OrderTxMessageListener     : 訂單微服務查詢本地事務           

(3)異常流程:事務發送方執行本地事務失敗

即:這時候捕捉到異常:

  1. 訂單服務給 RocketMQ 發送復原消息:RocketMQLocalTransactionState.ROLLBACK
  2. RocketMQ 接收到消息後,會回查
  3. 回查,發現不存在這個訂單,訂單服務向 RocketMQ 發送 未知消息:RocketMQLocalTransactionState.UNKNOWN UNKNOWN 未知狀态:可能是事務正在執行中出異常等,這種情況下消息系統不知道該如何處理,目前的邏輯是會直接丢棄掉,等待後續檢查邏輯來處理。
可靠消息最終一緻性分布式事務

日志如下:

2022-05-09 15:45:11.829  c.d.t.message.OrderTxMessageListener     : 訂單微服務執行本地事務
2022-05-09 15:45:11.861  c.d.t.message.OrderTxMessageListener     : 訂單微服務復原事務

2022-05-09 15:45:47.490  c.d.t.message.OrderTxMessageListener     : 訂單微服務查詢本地事務
2022-05-09 15:46:47.484  c.d.t.message.OrderTxMessageListener     : 訂單微服務查詢本地事務
2022-05-09 15:47:47.489  c.d.t.message.OrderTxMessageListener     : 訂單微服務查詢本地事務
2022-05-09 15:48:47.491  c.d.t.message.OrderTxMessageListener     : 訂單微服務查詢本地事務
2022-05-09 15:49:47.492  c.d.t.message.OrderTxMessageListener     : 訂單微服務查詢本地事務
2022-05-09 15:50:47.494  c.d.t.message.OrderTxMessageListener     : 訂單微服務查詢本地事務
2022-05-09 15:51:47.496  c.d.t.message.OrderTxMessageListener     : 訂單微服務查詢本地事務
2022-05-09 15:52:47.497  c.d.t.message.OrderTxMessageListener     : 訂單微服務查詢本地事務
2022-05-09 15:53:47.498  c.d.t.message.OrderTxMessageListener     : 訂單微服務查詢本地事務
2022-05-09 15:54:47.501  c.d.t.message.OrderTxMessageListener     : 訂單微服務查詢本地事務
2022-05-09 15:55:47.501  c.d.t.message.OrderTxMessageListener     : 訂單微服務查詢本地事務
2022-05-09 15:56:47.503  c.d.t.message.OrderTxMessageListener     : 訂單微服務查詢本地事務
2022-05-09 15:57:47.504  c.d.t.message.OrderTxMessageListener     : 訂單微服務查詢本地事務
2022-05-09 15:58:47.506  c.d.t.message.OrderTxMessageListener     : 訂單微服務查詢本地事務
2022-05-09 15:59:47.510  c.d.t.message.OrderTxMessageListener     : 訂單微服務查詢本地事務           

可看到 RocketMQ 回查了 15 次。

(4)異常流程:事務接收方當機

即:RocketMQ 無法推送消息給消息接收方。

  • 此時,訂單服務還是會下單成功
  • 庫存服務無法處理

當庫存服務再次上線後:會接收到消息。

訂單服務日志:

2022-05-09 16:47:53.142  c.d.t.message.OrderTxMessageListener     : 訂單微服務執行本地事務
2022-05-09 16:47:53.174  c.d.t.message.OrderTxMessageListener     : 訂單微服務送出事務           

庫存服務日志:

2022-05-09 16:48:10.283  c.d.t.message.StockTxMessageConsumer     : 庫存微服務開始消費事務消息:{"txMessage":{"payCount":1,"productId":1,"txNo":"d5d86dae-76c2-4b56-9597-a12dae14325a"}}
2022-05-09 16:48:10.284  c.d.t.service.impl.StockServiceImpl      : 庫存微服務執行本地事務,商品id:1, 購買數量:1           

(5)異常流程:事務接收方執行本地事務失敗

即:當接收到消息後,執行本地事務失敗,RocketMQ 會不斷發送消費消息。

事務接收方執行本地事務失敗,措施有:

  1. 記錄日志,人工介入處理。
  2. 重試,再出錯,則人工介入。

實驗步驟:

  1. 下單錯誤商品,訂單下單成功。
  2. 庫存中沒有此商品資料,向上抛錯。

日志:

2022-05-09 14:19:59.930  WARN 19542 --- [MessageThread_3] a.r.s.s.DefaultRocketMQListenerContainer : consume message failed. messageExt:MessageExt [queueId=0, storeSize=480, queueOffset=1, sysFlag=8, bornTimestamp=1652077145061, bornHost=/172.17.0.1:36848, storeTimestamp=1652077199916, storeHost=/172.17.0.3:10911, msgId=AC11000300002A9F0000000000000D1B, commitLogOffset=3355, bodyCR

java.lang.NullPointerException: null
    at com.donald.txmsgstock.service.impl.StockServiceImpl.decreaseStock(StockServiceImpl.java:35) ~[classes/:na]
    at com.donald.txmsgstock.message.StockTxMessageConsumer.onMessage(StockTxMessageConsumer.java:27) ~[classes/:na]
    at com.donald.txmsgstock.message.StockTxMessageConsumer.onMessage(StockTxMessageConsumer.java:16) ~[classes/:na]
    at org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer$DefaultMessageListenerConcurrently.consumeMessage(DefaultRocketMQListenerContainer.java:308) ~[rocketmq-spring-boot-2.0.2.jar:2.0.2]
    at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:417) [rocketmq-client-4.4.0.jar:4.4.0]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_162]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_162]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_162]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_162]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]           

原文位址:可靠消息最終一緻性分布式事務 - 掘金

繼續閱讀