引言
本文代碼已送出至Github(版本号: 52553aa6fe8b34ff162a1fb33e8f58494b4d2c3f
),有興趣的同學可以下載下傳來看看:https://github.com/ylw-github/taodong-shop
閱讀本文前,有興趣的同學可以參考我之前寫的聚合支付的文章:
- 《淘東電商項目(52) -聚合支付開篇》
- 《淘東電商項目(53) -銀聯支付案例源碼分析》
- 《淘東電商項目(54) -銀聯支付案例(同步與異步)》
- 《淘東電商項目(55) -支付系統核心表設計》
- 《淘東電商項目(56) -支付系統分布式事務的解決方案》
- 《淘東電商項目(57) -聚合支付(支付令牌接口)》
- 《淘東電商項目(58) -聚合支付(基于設計模式自動跳轉支付接口)》
- 《淘東電商項目(59) -聚合支付(內建銀聯支付)》
- 《淘東電商項目(60) -聚合支付(內建支付寶)》
- 《淘東電商項目(61) -聚合支付(基于模闆方法設計模式管理支付回調)》
- 《淘東電商項目(62) -聚合支付(基于模闆方法設計模式管理支付回調-支付寶)》
- 《淘東電商項目(63) -聚合支付(多線程日志收集)》
- 《淘東電商項目(64) -聚合支付(XXL-JOB任務排程平台整合)》
- 《淘東電商項目(65) -聚合支付(異步對賬)》
本文講解聚合支付最後的一個問題 - 分布式事務。舉個例子,比如要增加一個“積分功能”,當第三方伺服器異步傳回支付成功結果,請求我們的支付伺服器時,同時也要做積分增加的功能,如何能保證,支付結果插入資料庫成功的同時保證積分一定能增加成功呢?這裡涉及到了分布式事務的問題,本文主要基于Rabbit來解決這個問題。
本文目錄結構:
l____引言
l____ 1.原理圖
l____ 2.積分資料庫建表
l____ 3.核心代碼
l________ 3.1 內建RabbitMQ
l________ 3.2 生産者代碼
l________ 3.3 消費者代碼
l____ 4.測試
1.原理圖
如上圖,如果支付成功,第三方支付伺服器會請求項目的支付服務,傳回支付結果,這個時候,我們代碼要處理的是如下步驟:
- 更新訂單狀态為“已支付”,即
為1(注意,這裡的方法使用了status
事務注解修飾)@Transactional
- 更新了支付狀态之後,會使用MQ來生産消息,生産增加積分消息MSG
- 如果這個時候程式出錯,會復原,也就是訂單的狀态在資料庫中沒有修改,而已經增加了積分。
針對以上的問題,做出了如下的解決方案:
- 對于第2個步驟,使用RabbitMQ的消息确認機制,保證消息一定可以投遞到RabbitMQ伺服器的增加積分隊列,消費者使用手動簽收的方式,保證消息一定可以消費到,并把增加積分消息更新到資料庫的積分表中。
- 對于第3個步驟,如果程式出錯了,會復原,是以資料庫部分的代碼不生效,訂單的支付狀态沒變,是以增加多了一個支付狀态補償隊列,當支付狀态補償消費者接收到消息後,會檢查支付狀态是否已經修改,如果沒有修改,則更新訂單的狀态。
從上面的解決步驟,可以知道,使用RabbitMQ保證了積分一定可以更新本地資料庫,同時訂單狀态一定可以修改,達到最終一緻性的效果,同時解決了分布式事務的問題。
2.積分資料庫建表
講解前,先貼上積分資料庫的建表語句:
CREATE TABLE `integral` (
`ID` int(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵ID',
`USER_ID` int(11) DEFAULT NULL COMMENT '使用者ID',
`PAYMENT_ID` varchar(1024) DEFAULT NULL COMMENT '支付ID',
`INTEGRAL` varchar(32) DEFAULT NULL COMMENT '積分',
`AVAILABILITY` int(11) DEFAULT NULL COMMENT '是否可用',
`REVISION` int(11) DEFAULT NULL COMMENT '樂觀鎖',
`CREATED_BY` varchar(32) DEFAULT NULL COMMENT '建立人',
`CREATED_TIME` datetime DEFAULT NULL COMMENT '建立時間',
`UPDATED_BY` varchar(32) DEFAULT NULL COMMENT '更新人',
`UPDATED_TIME` datetime DEFAULT NULL COMMENT '更新時間',
PRIMARY KEY (`ID`)
) ENGINE=InnoDB AUTO_INCREMENT=47 DEFAULT CHARSET=utf8 COMMENT=' ';
3.核心代碼
3.1 內建RabbitMQ
RabbitMQ的搭建本文不再詳述,之前有講解過,有興趣的童鞋可以參閱之前寫過的文章: 《消息中間件系列教程(04) -RabbitMQ -簡介&安裝》,下面開始講解項目內建。
①添加maven依賴:
<!-- 添加springboot對amqp的支援 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
②applicatoin.yml配置:
spring:
rabbitmq:
####連接配接位址
host: 127.0.0.1
####端口号
port: 5672
####賬号
username: guest
####密碼
password: guest
### 位址
virtual-host: integral_host
###開啟消息确認機制 confirms
publisher-confirms: true
publisher-returns: true
③RabbitMQ配置檔案:
@Component
public class RabbitmqConfig {
// 添加積分隊列
public static final String INTEGRAL_DIC_QUEUE = "integral_queue";
// 補單隊列,
public static final String INTEGRAL_CREATE_QUEUE = "integral_create_queue";
// 積分交換機
private static final String INTEGRAL_EXCHANGE_NAME = "integral_exchange_name";
// 1.定義訂單隊列
@Bean
public Queue directIntegralDicQueue() {
return new Queue(INTEGRAL_DIC_QUEUE);
}
// 2.定義補訂單隊列
@Bean
public Queue directCreateintegralQueue() {
return new Queue(INTEGRAL_CREATE_QUEUE);
}
// 2.定義交換機
@Bean
DirectExchange directintegralExchange() {
return new DirectExchange(INTEGRAL_EXCHANGE_NAME);
}
// 3.積分隊列與交換機綁定
@Bean
Binding bindingExchangeintegralDicQueue() {
return BindingBuilder.bind(directIntegralDicQueue()).to(directintegralExchange()).with("integralRoutingKey");
}
// 3.補單隊列與交換機綁定
@Bean
Binding bindingExchangeCreateintegral() {
return BindingBuilder.bind(directCreateintegralQueue()).to(directintegralExchange()).with("integralRoutingKey");
}
}
③在RabbitMQ控制台增加virtual-host:
④配置設定guest對新增的virtual-host有使用者權限:
3.2 生産者代碼
①生産者代碼(注意裡面用了消息确認機制,且使用訂單的id作為全局唯一id來解決幂等性的問題):
/**
* description: 生産者投遞積分
* create by: YangLinWei
* create time: 2020/5/19 11:37 上午
*/
@Component
@Slf4j
public class IntegralProducer implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void send(JSONObject jsonObject) {
String jsonString = jsonObject.toJSONString();
System.out.println("jsonString:" + jsonString);
String paymentId = jsonObject.getString("paymentId");
// 封裝消息
Message message = MessageBuilder.withBody(jsonString.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(paymentId)
.build();
// 建構回調傳回的資料(消息id)
this.rabbitTemplate.setMandatory(true);
this.rabbitTemplate.setConfirmCallback(this);
CorrelationData correlationData = new CorrelationData(jsonString);
rabbitTemplate.convertAndSend("integral_exchange_name", "integralRoutingKey", message, correlationData);
}
// 生産消息确認機制 生産者往伺服器端發送消息的時候,采用應答機制
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String jsonString = correlationData.getId();
System.out.println("消息id:" + correlationData.getId());
if (ack) {
log.info(">>>使用MQ消息确認機制確定消息一定要投遞到MQ中成功");
return;
}
JSONObject jsonObject = JSONObject.parseObject(jsonString);
// 生産者消息投遞失敗的話,采用遞歸重試機制
send(jsonObject);
log.info(">>>使用MQ消息确認機制投遞到MQ中失敗");
}
}
②調用生産者處的代碼,在支付結果異步回調處處理(銀聯支付結果異步回調處處理
UnionPayCallbackTemplate
,注意發送MQ使用了
@Async
注解,不阻塞目前線程)注意下面模拟抛異常了:
@Override
public String asyncService(Map<String, String> verifySignature) {
String orderId = verifySignature.get("orderId"); // 擷取背景通知的資料,其他字段也可用類似方式擷取
String respCode = verifySignature.get("respCode");
// 判斷respCode=00、A6後,對涉及資金類的交易,請再發起查詢接口查詢,确定交易成功後更新資料庫。
System.out.println("orderId:" + orderId + ",respCode:" + respCode);
// 1.判斷respCode是否為已經支付成功斷respCode=00、A6後,
if (!(respCode.equals("00") || respCode.equals("A6"))) {
return failResult();
}
// 根據日志 手動補償 使用支付id調用第三方支付接口查詢
PaymentTransactionEntity paymentTransaction = paymentTransactionMapper.selectByPaymentId(orderId);
if (paymentTransaction.getPaymentStatus().equals(PayConstant.PAY_STATUS_SUCCESS)) {
// 網絡重試中,之前已經支付過
return successResult();
}
// 2.将狀态改為已經支付成功
paymentTransactionMapper.updatePaymentStatus(PayConstant.PAY_STATUS_SUCCESS + "", orderId+"","yinlian_pay");
// 3.調用積分服務接口增加積分(處理幂等性問題) MQ
addMQIntegral(paymentTransaction); // 使用MQ
int i = 1 / 0; // 支付狀态還是為待支付狀态但是 積分缺增加
return successResult();
}
/**
* 基于MQ增加積分
*/
@Async
public void addMQIntegral(PaymentTransactionEntity paymentTransaction) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("paymentId", paymentTransaction.getPaymentId());
jsonObject.put("userId", paymentTransaction.getUserId());
jsonObject.put("integral", 100);
integralProducer.send(jsonObject);
}
3.3 消費者代碼
①首先看看支付狀态補償消費者代碼(注意這裡使用了手動簽收):
/**
* description: 支付回調檢查狀态,是否為已經支付完成
* create by: YangLinWei
* create time: 2020/5/19 1:52 下午
*/
@Component
@Slf4j
public class PayCheckStateConsumer {
@Autowired
private PaymentTransactionMapper paymentTransactionMapper;
// 死信隊列(備胎) 消息被拒絕、隊列長度滿了 定時任務 人工補償
@RabbitListener(queues = "integral_create_queue")
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {
try {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
log.info(">>>messageId:{},msg:{}", messageId, msg);
JSONObject jsonObject = JSONObject.parseObject(msg);
String paymentId = jsonObject.getString("paymentId");
if (StringUtils.isEmpty(paymentId)) {
log.error(">>>>支付id不能為空 paymentId:{}", paymentId);
basicNack(message, channel);
return;
}
// 1.使用paymentId查詢之前是否已經支付過
PaymentTransactionEntity paymentTransactionEntity = paymentTransactionMapper.selectByPaymentId(paymentId);
if (paymentTransactionEntity == null) {
log.error(">>>>支付id paymentId:{} 未查詢到", paymentId);
basicNack(message, channel);
return;
}
Integer paymentStatus = paymentTransactionEntity.getPaymentStatus();
if (paymentStatus.equals(PayConstant.PAY_STATUS_SUCCESS)) {
log.error(">>>>支付id paymentId:{} ", paymentId);
basicNack(message, channel);
return;
}
// 安全期間 主動調用第三方接口查詢
String paymentChannel = jsonObject.getString("paymentChannel");
int updatePaymentStatus = paymentTransactionMapper.updatePaymentStatus(PayConstant.PAY_STATUS_SUCCESS + "",
paymentId, paymentChannel);
if (updatePaymentStatus > 0) {
basicNack(message, channel);
return;
}
// 繼續重試
} catch (Exception e) {
e.printStackTrace();
basicNack(message, channel);
}
}
private void basicNack(Message message, Channel channel) throws IOException {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
②增加積分消費者代碼(注意這裡使用了手動簽收)::
/**
* description: 積分服務消費者
* create by: YangLinWei
* create time: 2020/5/19 2:10 下午
*/
@Component
@Slf4j
public class IntegralConsumer {
@Autowired
private IntegralMapper integralMapper;
@RabbitListener(queues = "integral_queue")
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws IOException {
try {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
log.info(">>>messageId:{},msg:{}", messageId, msg);
JSONObject jsonObject = JSONObject.parseObject(msg);
String paymentId = jsonObject.getString("paymentId");
if (StringUtils.isEmpty(paymentId)) {
log.error(">>>>支付id不能為空 paymentId:{}", paymentId);
basicNack(message, channel);
return;
}
// 使用paymentId查詢是否已經增加過積分 網絡重試間隔
IntegralEntity resultIntegralEntity = integralMapper.findIntegral(paymentId);
if (resultIntegralEntity != null) {
log.error(">>>>paymentId:{}已經增加過積分", paymentId);
// 已經增加過積分,通知MQ不要在繼續重試。
basicNack(message, channel);
return;
}
Integer userId = jsonObject.getInteger("userId");
if (userId == null) {
log.error(">>>>paymentId:{},對應的使用者userId參數為空", paymentId);
basicNack(message, channel);
return;
}
Long integral = jsonObject.getLong("integral");
if (integral == null) {
log.error(">>>>paymentId:{},對應的使用者integral參數為空", integral);
return;
}
IntegralEntity integralEntity = new IntegralEntity();
integralEntity.setPaymentId(paymentId);
integralEntity.setIntegral(integral);
integralEntity.setUserId(userId);
integralEntity.setAvailability(1);
// 插入到資料庫中
int insertIntegral = integralMapper.insertIntegral(integralEntity);
if (insertIntegral > 0) {
// 手動簽收消息,通知mq伺服器端删除該消息
basicNack(message, channel);
}
// 采用重試機制
} catch (Exception e) {
log.error(">>>>ERROR MSG:", e.getMessage());
basicNack(message, channel);
}
}
// 消費者擷取到消息之後 手動簽收 通知MQ删除該消息
private void basicNack(Message message, Channel channel) throws IOException {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
}
4.測試
依次啟動Eureka注冊中心、xxlsso單點登入系統、member會員服務、pay支付服務、pay-web支付門戶服務、還有integral積分服務,啟動後如下圖:
啟動RabbitMQ服務(我的是Mac系統,已經啟動的可以忽略):
cd /usr/local/Cellar/rabbitmq/3.8.2/sbin
./rabbitmq-server -detached
①模拟新增訂單,浏覽器輸入:http://localhost:8600/cratePayToken?payAmount=999&orderId=20200513141452&userId=27&productName=玉米香腸
②确認送出訂單,浏覽器輸入:http://localhost:8079/pay?payToken=pay_88c6262f3a494ae98d0873283514abf5
可以看到目前資料庫,訂單狀态為未支付:
③按照提示,使用銀聯支付,一步一步直至支付完成: