天天看點

淘東電商項目(66) -聚合支付(基于RabbitMQ解決分布式事務-積分場景)

引言

本文代碼已送出至Github(版本号:

52553aa6fe8b34ff162a1fb33e8f58494b4d2c3f

),有興趣的同學可以下載下傳來看看:https://github.com/ylw-github/taodong-shop

閱讀本文前,有興趣的同學可以參考我之前寫的聚合支付的文章:

  • 《淘東電商項目(52) -聚合支付開篇》
  • 《淘東電商項目(53) -銀聯支付案例源碼分析》
  • 《淘東電商項目(54) -銀聯支付案例(同步與異步)》
  • 《淘東電商項目(55) -支付系統核心表設計》
  • 《淘東電商項目(56) -支付系統分布式事務的解決方案》
  • 《淘東電商項目(57) -聚合支付(支付令牌接口)》
  • 《淘東電商項目(58) -聚合支付(基于設計模式自動跳轉支付接口)》
  • 《淘東電商項目(59) -聚合支付(內建銀聯支付)》
  • 《淘東電商項目(60) -聚合支付(內建支付寶)》
  • 《淘東電商項目(61) -聚合支付(基于模闆方法設計模式管理支付回調)》
  • 《淘東電商項目(62) -聚合支付(基于模闆方法設計模式管理支付回調-支付寶)》
  • 《淘東電商項目(63) -聚合支付(多線程日志收集)》
  • 《淘東電商項目(64) -聚合支付(XXL-JOB任務排程平台整合)》
  • 《淘東電商項目(65) -聚合支付(異步對賬)》

本文講解聚合支付最後的一個問題 - 分布式事務。舉個例子,比如要增加一個“積分功能”,當第三方伺服器異步傳回支付成功結果,請求我們的支付伺服器時,同時也要做積分增加的功能,如何能保證,支付結果插入資料庫成功的同時保證積分一定能增加成功呢?這裡涉及到了分布式事務的問題,本文主要基于Rabbit來解決這個問題。

本文目錄結構:

l____引言

l____ 1.原理圖

l____ 2.積分資料庫建表

l____ 3.核心代碼

l________ 3.1 內建RabbitMQ

l________ 3.2 生産者代碼

l________ 3.3 消費者代碼

l____ 4.測試

1.原理圖

淘東電商項目(66) -聚合支付(基于RabbitMQ解決分布式事務-積分場景)

如上圖,如果支付成功,第三方支付伺服器會請求項目的支付服務,傳回支付結果,這個時候,我們代碼要處理的是如下步驟:

  1. 更新訂單狀态為“已支付”,即

    status

    為1(注意,這裡的方法使用了

    @Transactional

    事務注解修飾)
  2. 更新了支付狀态之後,會使用MQ來生産消息,生産增加積分消息MSG
  3. 如果這個時候程式出錯,會復原,也就是訂單的狀态在資料庫中沒有修改,而已經增加了積分。

針對以上的問題,做出了如下的解決方案:

  • 對于第2個步驟,使用RabbitMQ的消息确認機制,保證消息一定可以投遞到RabbitMQ伺服器的增加積分隊列,消費者使用手動簽收的方式,保證消息一定可以消費到,并把增加積分消息更新到資料庫的積分表中。
  • 對于第3個步驟,如果程式出錯了,會復原,是以資料庫部分的代碼不生效,訂單的支付狀态沒變,是以增加多了一個支付狀态補償隊列,當支付狀态補償消費者接收到消息後,會檢查支付狀态是否已經修改,如果沒有修改,則更新訂單的狀态。

從上面的解決步驟,可以知道,使用RabbitMQ保證了積分一定可以更新本地資料庫,同時訂單狀态一定可以修改,達到最終一緻性的效果,同時解決了分布式事務的問題。

2.積分資料庫建表

講解前,先貼上積分資料庫的建表語句:

CREATE TABLE `integral` (
  `ID` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵ID',
  `USER_ID` int(11) DEFAULT NULL COMMENT '使用者ID',
  `PAYMENT_ID` varchar(1024) DEFAULT NULL COMMENT '支付ID',
  `INTEGRAL` varchar(32) DEFAULT NULL COMMENT '積分',
  `AVAILABILITY` int(11) DEFAULT NULL COMMENT '是否可用',
  `REVISION` int(11) DEFAULT NULL COMMENT '樂觀鎖',
  `CREATED_BY` varchar(32) DEFAULT NULL COMMENT '建立人',
  `CREATED_TIME` datetime DEFAULT NULL COMMENT '建立時間',
  `UPDATED_BY` varchar(32) DEFAULT NULL COMMENT '更新人',
  `UPDATED_TIME` datetime DEFAULT NULL COMMENT '更新時間',
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=47 DEFAULT CHARSET=utf8 COMMENT=' ';
           
淘東電商項目(66) -聚合支付(基于RabbitMQ解決分布式事務-積分場景)

3.核心代碼

3.1 內建RabbitMQ

RabbitMQ的搭建本文不再詳述,之前有講解過,有興趣的童鞋可以參閱之前寫過的文章: 《消息中間件系列教程(04) -RabbitMQ -簡介&安裝》,下面開始講解項目內建。

①添加maven依賴:

<!-- 添加springboot對amqp的支援 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
           

②applicatoin.yml配置:

spring:
  rabbitmq:
    ####連接配接位址
    host: 127.0.0.1
    ####端口号
    port: 5672
    ####賬号
    username: guest
    ####密碼
    password: guest
    ### 位址
    virtual-host: integral_host
    ###開啟消息确認機制 confirms
    publisher-confirms: true
    publisher-returns: true
           

③RabbitMQ配置檔案:

@Component
public class RabbitmqConfig {

    // 添加積分隊列
    public static final String INTEGRAL_DIC_QUEUE = "integral_queue";
    // 補單隊列,
    public static final String INTEGRAL_CREATE_QUEUE = "integral_create_queue";
    // 積分交換機
    private static final String INTEGRAL_EXCHANGE_NAME = "integral_exchange_name";

    // 1.定義訂單隊列
    @Bean
    public Queue directIntegralDicQueue() {
        return new Queue(INTEGRAL_DIC_QUEUE);
    }

    // 2.定義補訂單隊列
    @Bean
    public Queue directCreateintegralQueue() {
        return new Queue(INTEGRAL_CREATE_QUEUE);
    }

    // 2.定義交換機
    @Bean
    DirectExchange directintegralExchange() {
        return new DirectExchange(INTEGRAL_EXCHANGE_NAME);
    }

    // 3.積分隊列與交換機綁定
    @Bean
    Binding bindingExchangeintegralDicQueue() {
        return BindingBuilder.bind(directIntegralDicQueue()).to(directintegralExchange()).with("integralRoutingKey");
    }

    // 3.補單隊列與交換機綁定
    @Bean
    Binding bindingExchangeCreateintegral() {
        return BindingBuilder.bind(directCreateintegralQueue()).to(directintegralExchange()).with("integralRoutingKey");
    }

}
           

③在RabbitMQ控制台增加virtual-host:

淘東電商項目(66) -聚合支付(基于RabbitMQ解決分布式事務-積分場景)

④配置設定guest對新增的virtual-host有使用者權限:

淘東電商項目(66) -聚合支付(基于RabbitMQ解決分布式事務-積分場景)
淘東電商項目(66) -聚合支付(基于RabbitMQ解決分布式事務-積分場景)

3.2 生産者代碼

①生産者代碼(注意裡面用了消息确認機制,且使用訂單的id作為全局唯一id來解決幂等性的問題):

/**
 * description: 生産者投遞積分
 * create by: YangLinWei
 * create time: 2020/5/19 11:37 上午
 */
@Component
@Slf4j
public class IntegralProducer implements RabbitTemplate.ConfirmCallback {
	@Autowired
	private RabbitTemplate rabbitTemplate;

	@Transactional
	public void send(JSONObject jsonObject) {

		String jsonString = jsonObject.toJSONString();
		System.out.println("jsonString:" + jsonString);
		String paymentId = jsonObject.getString("paymentId");
		// 封裝消息
		Message message = MessageBuilder.withBody(jsonString.getBytes())
				.setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(paymentId)
				.build();
		// 建構回調傳回的資料(消息id)
		this.rabbitTemplate.setMandatory(true);
		this.rabbitTemplate.setConfirmCallback(this);
		CorrelationData correlationData = new CorrelationData(jsonString);
		rabbitTemplate.convertAndSend("integral_exchange_name", "integralRoutingKey", message, correlationData);

	}

	// 生産消息确認機制 生産者往伺服器端發送消息的時候,采用應答機制
	@Override
	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
		String jsonString = correlationData.getId();
		System.out.println("消息id:" + correlationData.getId());
		if (ack) {
			log.info(">>>使用MQ消息确認機制確定消息一定要投遞到MQ中成功");
			return;
		}
		JSONObject jsonObject = JSONObject.parseObject(jsonString);
		// 生産者消息投遞失敗的話,采用遞歸重試機制
		send(jsonObject);
		log.info(">>>使用MQ消息确認機制投遞到MQ中失敗");
	}
}
           

②調用生産者處的代碼,在支付結果異步回調處處理(銀聯支付結果異步回調處處理

UnionPayCallbackTemplate

,注意發送MQ使用了

@Async

注解,不阻塞目前線程)注意下面模拟抛異常了:

@Override
public String asyncService(Map<String, String> verifySignature) {

	String orderId = verifySignature.get("orderId"); // 擷取背景通知的資料,其他字段也可用類似方式擷取
	String respCode = verifySignature.get("respCode");

	// 判斷respCode=00、A6後,對涉及資金類的交易,請再發起查詢接口查詢,确定交易成功後更新資料庫。
	System.out.println("orderId:" + orderId + ",respCode:" + respCode);
	// 1.判斷respCode是否為已經支付成功斷respCode=00、A6後,
	if (!(respCode.equals("00") || respCode.equals("A6"))) {
		return failResult();
	}
	// 根據日志 手動補償 使用支付id調用第三方支付接口查詢
	PaymentTransactionEntity paymentTransaction = paymentTransactionMapper.selectByPaymentId(orderId);
	if (paymentTransaction.getPaymentStatus().equals(PayConstant.PAY_STATUS_SUCCESS)) {
		// 網絡重試中,之前已經支付過
		return successResult();
	}
	// 2.将狀态改為已經支付成功
	paymentTransactionMapper.updatePaymentStatus(PayConstant.PAY_STATUS_SUCCESS + "", orderId+"","yinlian_pay");
	// 3.調用積分服務接口增加積分(處理幂等性問題) MQ
	addMQIntegral(paymentTransaction); // 使用MQ
	int i = 1 / 0; // 支付狀态還是為待支付狀态但是 積分缺增加
	return successResult();
}

/**
 * 基于MQ增加積分
 */
@Async
public void addMQIntegral(PaymentTransactionEntity paymentTransaction) {
	JSONObject jsonObject = new JSONObject();
	jsonObject.put("paymentId", paymentTransaction.getPaymentId());
	jsonObject.put("userId", paymentTransaction.getUserId());
	jsonObject.put("integral", 100);
	integralProducer.send(jsonObject);
}
           

3.3 消費者代碼

①首先看看支付狀态補償消費者代碼(注意這裡使用了手動簽收):

/**
 * description: 支付回調檢查狀态,是否為已經支付完成
 * create by: YangLinWei
 * create time: 2020/5/19 1:52 下午
 */
@Component
@Slf4j
public class PayCheckStateConsumer {
    @Autowired
    private PaymentTransactionMapper paymentTransactionMapper;

    // 死信隊列(備胎) 消息被拒絕、隊列長度滿了 定時任務 人工補償

    @RabbitListener(queues = "integral_create_queue")
    public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {
        try {
            String messageId = message.getMessageProperties().getMessageId();
            String msg = new String(message.getBody(), "UTF-8");
            log.info(">>>messageId:{},msg:{}", messageId, msg);
            JSONObject jsonObject = JSONObject.parseObject(msg);
            String paymentId = jsonObject.getString("paymentId");
            if (StringUtils.isEmpty(paymentId)) {
                log.error(">>>>支付id不能為空 paymentId:{}", paymentId);
                basicNack(message, channel);
                return;
            }
            // 1.使用paymentId查詢之前是否已經支付過
            PaymentTransactionEntity paymentTransactionEntity = paymentTransactionMapper.selectByPaymentId(paymentId);
            if (paymentTransactionEntity == null) {
                log.error(">>>>支付id paymentId:{} 未查詢到", paymentId);
                basicNack(message, channel);
                return;
            }
            Integer paymentStatus = paymentTransactionEntity.getPaymentStatus();
            if (paymentStatus.equals(PayConstant.PAY_STATUS_SUCCESS)) {
                log.error(">>>>支付id paymentId:{} ", paymentId);
                basicNack(message, channel);
                return;
            }
            // 安全期間 主動調用第三方接口查詢
            String paymentChannel = jsonObject.getString("paymentChannel");
            int updatePaymentStatus = paymentTransactionMapper.updatePaymentStatus(PayConstant.PAY_STATUS_SUCCESS + "",
                    paymentId, paymentChannel);
            if (updatePaymentStatus > 0) {
                basicNack(message, channel);
                return;
            }
            // 繼續重試

        } catch (Exception e) {
            e.printStackTrace();
            basicNack(message, channel);
        }

    }

    private void basicNack(Message message, Channel channel) throws IOException {
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

    }

}

           

②增加積分消費者代碼(注意這裡使用了手動簽收)::

/**
 * description: 積分服務消費者
 * create by: YangLinWei
 * create time: 2020/5/19 2:10 下午
 */
@Component
@Slf4j
public class IntegralConsumer {
	@Autowired
	private IntegralMapper integralMapper;

	@RabbitListener(queues = "integral_queue")
	public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {
		try {
			String messageId = message.getMessageProperties().getMessageId();
			String msg = new String(message.getBody(), "UTF-8");
			log.info(">>>messageId:{},msg:{}", messageId, msg);
			JSONObject jsonObject = JSONObject.parseObject(msg);
			String paymentId = jsonObject.getString("paymentId");
			if (StringUtils.isEmpty(paymentId)) {
				log.error(">>>>支付id不能為空 paymentId:{}", paymentId);
				basicNack(message, channel);
				return;
			}
			// 使用paymentId查詢是否已經增加過積分 網絡重試間隔
			IntegralEntity resultIntegralEntity = integralMapper.findIntegral(paymentId);
			if (resultIntegralEntity != null) {
				log.error(">>>>paymentId:{}已經增加過積分", paymentId);
				// 已經增加過積分,通知MQ不要在繼續重試。
				basicNack(message, channel);
				return;
			}
			Integer userId = jsonObject.getInteger("userId");
			if (userId == null) {
				log.error(">>>>paymentId:{},對應的使用者userId參數為空", paymentId);
				basicNack(message, channel);
				return;
			}
			Long integral = jsonObject.getLong("integral");
			if (integral == null) {
				log.error(">>>>paymentId:{},對應的使用者integral參數為空", integral);
				return;
			}
			IntegralEntity integralEntity = new IntegralEntity();
			integralEntity.setPaymentId(paymentId);
			integralEntity.setIntegral(integral);
			integralEntity.setUserId(userId);
			integralEntity.setAvailability(1);
			// 插入到資料庫中
			int insertIntegral = integralMapper.insertIntegral(integralEntity);
			if (insertIntegral > 0) {
				// 手動簽收消息,通知mq伺服器端删除該消息
				basicNack(message, channel);
			}
			// 采用重試機制
		} catch (Exception e) {
			log.error(">>>>ERROR MSG:", e.getMessage());
			basicNack(message, channel);
		}

	}

	// 消費者擷取到消息之後 手動簽收 通知MQ删除該消息
	private void basicNack(Message message, Channel channel) throws IOException {
		channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
	}

}
           

4.測試

依次啟動Eureka注冊中心、xxlsso單點登入系統、member會員服務、pay支付服務、pay-web支付門戶服務、還有integral積分服務,啟動後如下圖:

淘東電商項目(66) -聚合支付(基于RabbitMQ解決分布式事務-積分場景)

啟動RabbitMQ服務(我的是Mac系統,已經啟動的可以忽略):

cd /usr/local/Cellar/rabbitmq/3.8.2/sbin
./rabbitmq-server -detached
           

①模拟新增訂單,浏覽器輸入:http://localhost:8600/cratePayToken?payAmount=999&orderId=20200513141452&userId=27&productName=玉米香腸

淘東電商項目(66) -聚合支付(基于RabbitMQ解決分布式事務-積分場景)

②确認送出訂單,浏覽器輸入:http://localhost:8079/pay?payToken=pay_88c6262f3a494ae98d0873283514abf5

淘東電商項目(66) -聚合支付(基于RabbitMQ解決分布式事務-積分場景)

可以看到目前資料庫,訂單狀态為未支付:

淘東電商項目(66) -聚合支付(基于RabbitMQ解決分布式事務-積分場景)

③按照提示,使用銀聯支付,一步一步直至支付完成:

淘東電商項目(66) -聚合支付(基于RabbitMQ解決分布式事務-積分場景)