天天看点

17 秒杀系统 | 交易性能优化 | 库存操作流水 | 最终一致性 | RocketMQ 事务型消息反查接口的依据...数据类型库存操作流水模型构建

数据类型

主业务数据(Master Data)

  • 比如 ItemModel,记录了商品的主数据;
  • 比如 item_stock 记录了商品库存的主数据;

操作型数据(Log Data)

  • 比如库存扣减这样的操作发生了,需要把操作本身的过程记录下来,用于支持这种记录的数据,就是操作型数据;
  • 记录下操作型数据是为了追踪,比如库存流水的操作状态,可以根据这个状态做回滚,或者查询正在处理中的状态,使得很多异步的动作,比如下单之前,先落一条待扣减的库存操作的流水,成功之后,再把流水的状态设置为成功,或者下单失败时候,将对应的库存操作的流水设置为失败,可以根据这个操作流水的状态反查,得到异步动作进行的当前状态;

库存操作流水模型构建

创建库存流水表 stock_log

  • 顺便生成 mybatis-generator 的一套;
  1. CREATE TABLE `stock_log` (
  2. `stock_log_id` varchar( ) COLLATE utf8_unicode_ci NOT NULL,
  3. `item_id` int NOT NULL DEFAULT '0',
  4. `amount` int NOT NULL DEFAULT '0',
  5. `status` int NOT NULL COMMENT '1 表示初始状态, 2 表示下单减库存成功, 3 表示下单回滚',
  6. PRIMARY KEY (`stock_log_id`)
  7. ) ENGINE =InnoDB DEFAULT CHARSET =utf8 COLLATE =utf8_unicode_ci;

下单前生成库存流水

  • 用户的下单请求到来时,先生成一个库存流水记录,库存流水的 status 设置成 1;
  • 下单的时候,把库存流水号带上;
  1. @RequestMapping(value = "/createorder", method = {RequestMethod.POST}, consumes = {CONTENT_TYPE_FORMED})
  2. @ResponseBody
  3. public CommonReturnType createOrder (@RequestParam(name = "itemId") Integer itemId,
  4. @RequestParam(name = "amount") Integer amount,
  5. @RequestParam(name = "promoId", required = false) Integer promoId) throws BusinessException {
  6. String token = httpServletRequest.getParameterMap().get( "token")[ ];
  7. if (StringUtils.isEmpty(token)) {
  8. throw new BusinessException(EmBusinessError.USER_NOT_LOGIN, "用户还未登录,不能下单");
  9. }
  10. UserModel userModel = (UserModel) redisTemplate.opsForValue().get(token);
  11. if (userModel == null) {
  12. throw new BusinessException(EmBusinessError.USER_NOT_LOGIN, "用户还未登录,不能下单");
  13. }
  14. String stockLogId = itemService.initStockLog(itemId, amount);
  15. if(!mqProducer.transactionAsyncReduceStock(userModel.getId(), promoId, itemId, amount, stockLogId)) {
  16. throw new BusinessException(EmBusinessError.UNKNOWN_ERROR, "下单失败");
  17. }
  18. return CommonReturnType.create( null);
  19. }

下单成功后,修改库存流水的 status 为 2;

  • 整个下单逻辑要经历的步骤有:1. 查询验证,2. 减库存,3. 下单,4. 增销量,5. 设置库存流水状态;
  1. // 5. 设置库存流水状态为成功
  2. StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
  3. if (stockLogDO == null) {
  4. throw new BusinessException(EmBusinessError.UNKNOWN_ERROR);
  5. }
  6. stockLogDO.setStatus( );
  7. stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO);

本地事务(下单)失败,要设置库存流水的 status 为 3;

  • 设置完了让 Broker 中的 Prepared 状态的消息回滚;
  1. @Override
  2. public LocalTransactionState executeLocalTransaction( Message message, Object args) {
  3. // 真正要执行的操作:创建订单
  4. Integer userId = ( Integer)(( Map) args). get( "userId");
  5. Integer itemId = ( Integer)(( Map) args). get( "itemId");
  6. Integer promoId = ( Integer)(( Map) args). get( "promoId");
  7. Integer amount = ( Integer)(( Map) args). get( "amount");
  8. String stockLogId = ( String)(( Map) args). get( "stockLogId");
  9. try {
  10. OrderModel orderModel = orderService. createOrder(userId, itemId, promoId, amount, stockLogId);
  11. } catch ( BusinessException e) {
  12. e. printStackTrace();
  13. // 设置对应的 stockLog 为回滚状态
  14. StockLogDO stockLogDO = stockLogDOMapper. selectByPrimaryKey(stockLogId);
  15. stockLogDO. setStatus( );
  16. stockLogDOMapper. updateByPrimaryKeySelective(stockLogDO);
  17. return LocalTransactionState. ROLLBACK_MESSAGE;
  18. }
  19. return LocalTransactionState. COMMIT_MESSAGE;
  20. }

Broker 长时间没收到 Commit 提交要走反查逻辑

  • 反查的依据就是库存操作流水号;
  1. @Override
  2. public LocalTransactionState checkLocalTransaction( MessageExt msg) {
  3. // 根据是否扣减库存成功,来判断要返回 COMMIT, ROLLBACK, 还是继续 UNKNOW
  4. String jsonString = new String(msg. getBody());
  5. Map< String, Object> map = JSON. parseObject(jsonString, Map. class);
  6. String stockLogId = ( String) map. get( "stockLogId");
  7. StockLogDO stockLogDO = stockLogDOMapper. selectByPrimaryKey(stockLogId);
  8. if (stockLogDO == null) {
  9. return LocalTransactionState. UNKNOW;
  10. }
  11. if (stockLogDO. getStatus(). intValue() == ) {
  12. return LocalTransactionState. COMMIT_MESSAGE;
  13. } else if (stockLogDO. getStatus(). intValue() == ) {
  14. return LocalTransactionState. UNKNOW;
  15. }
  16. return LocalTransactionState. ROLLBACK_MESSAGE;
  17. }

数据类型

主业务数据(Master Data)

  • 比如 ItemModel,记录了商品的主数据;
  • 比如 item_stock 记录了商品库存的主数据;

操作型数据(Log Data)

  • 比如库存扣减这样的操作发生了,需要把操作本身的过程记录下来,用于支持这种记录的数据,就是操作型数据;
  • 记录下操作型数据是为了追踪,比如库存流水的操作状态,可以根据这个状态做回滚,或者查询正在处理中的状态,使得很多异步的动作,比如下单之前,先落一条待扣减的库存操作的流水,成功之后,再把流水的状态设置为成功,或者下单失败时候,将对应的库存操作的流水设置为失败,可以根据这个操作流水的状态反查,得到异步动作进行的当前状态;

库存操作流水模型构建

创建库存流水表 stock_log

  • 顺便生成 mybatis-generator 的一套;
  1. CREATE TABLE `stock_log` (
  2. `stock_log_id` varchar( ) COLLATE utf8_unicode_ci NOT NULL,
  3. `item_id` int NOT NULL DEFAULT '0',
  4. `amount` int NOT NULL DEFAULT '0',
  5. `status` int NOT NULL COMMENT '1 表示初始状态, 2 表示下单减库存成功, 3 表示下单回滚',
  6. PRIMARY KEY (`stock_log_id`)
  7. ) ENGINE =InnoDB DEFAULT CHARSET =utf8 COLLATE =utf8_unicode_ci;

下单前生成库存流水

  • 用户的下单请求到来时,先生成一个库存流水记录,库存流水的 status 设置成 1;
  • 下单的时候,把库存流水号带上;
  1. @RequestMapping(value = "/createorder", method = {RequestMethod.POST}, consumes = {CONTENT_TYPE_FORMED})
  2. @ResponseBody
  3. public CommonReturnType createOrder (@RequestParam(name = "itemId") Integer itemId,
  4. @RequestParam(name = "amount") Integer amount,
  5. @RequestParam(name = "promoId", required = false) Integer promoId) throws BusinessException {
  6. String token = httpServletRequest.getParameterMap().get( "token")[ ];
  7. if (StringUtils.isEmpty(token)) {
  8. throw new BusinessException(EmBusinessError.USER_NOT_LOGIN, "用户还未登录,不能下单");
  9. }
  10. UserModel userModel = (UserModel) redisTemplate.opsForValue().get(token);
  11. if (userModel == null) {
  12. throw new BusinessException(EmBusinessError.USER_NOT_LOGIN, "用户还未登录,不能下单");
  13. }
  14. String stockLogId = itemService.initStockLog(itemId, amount);
  15. if(!mqProducer.transactionAsyncReduceStock(userModel.getId(), promoId, itemId, amount, stockLogId)) {
  16. throw new BusinessException(EmBusinessError.UNKNOWN_ERROR, "下单失败");
  17. }
  18. return CommonReturnType.create( null);
  19. }

下单成功后,修改库存流水的 status 为 2;

  • 整个下单逻辑要经历的步骤有:1. 查询验证,2. 减库存,3. 下单,4. 增销量,5. 设置库存流水状态;
  1. // 5. 设置库存流水状态为成功
  2. StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
  3. if (stockLogDO == null) {
  4. throw new BusinessException(EmBusinessError.UNKNOWN_ERROR);
  5. }
  6. stockLogDO.setStatus( );
  7. stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO);

本地事务(下单)失败,要设置库存流水的 status 为 3;

  • 设置完了让 Broker 中的 Prepared 状态的消息回滚;
  1. @Override
  2. public LocalTransactionState executeLocalTransaction( Message message, Object args) {
  3. // 真正要执行的操作:创建订单
  4. Integer userId = ( Integer)(( Map) args). get( "userId");
  5. Integer itemId = ( Integer)(( Map) args). get( "itemId");
  6. Integer promoId = ( Integer)(( Map) args). get( "promoId");
  7. Integer amount = ( Integer)(( Map) args). get( "amount");
  8. String stockLogId = ( String)(( Map) args). get( "stockLogId");
  9. try {
  10. OrderModel orderModel = orderService. createOrder(userId, itemId, promoId, amount, stockLogId);
  11. } catch ( BusinessException e) {
  12. e. printStackTrace();
  13. // 设置对应的 stockLog 为回滚状态
  14. StockLogDO stockLogDO = stockLogDOMapper. selectByPrimaryKey(stockLogId);
  15. stockLogDO. setStatus( );
  16. stockLogDOMapper. updateByPrimaryKeySelective(stockLogDO);
  17. return LocalTransactionState. ROLLBACK_MESSAGE;
  18. }
  19. return LocalTransactionState. COMMIT_MESSAGE;
  20. }

Broker 长时间没收到 Commit 提交要走反查逻辑

  • 反查的依据就是库存操作流水号;
  1. @Override
  2. public LocalTransactionState checkLocalTransaction( MessageExt msg) {
  3. // 根据是否扣减库存成功,来判断要返回 COMMIT, ROLLBACK, 还是继续 UNKNOW
  4. String jsonString = new String(msg. getBody());
  5. Map< String, Object> map = JSON. parseObject(jsonString, Map. class);
  6. String stockLogId = ( String) map. get( "stockLogId");
  7. StockLogDO stockLogDO = stockLogDOMapper. selectByPrimaryKey(stockLogId);
  8. if (stockLogDO == null) {
  9. return LocalTransactionState. UNKNOW;
  10. }
  11. if (stockLogDO. getStatus(). intValue() == ) {
  12. return LocalTransactionState. COMMIT_MESSAGE;
  13. } else if (stockLogDO. getStatus(). intValue() == ) {
  14. return LocalTransactionState. UNKNOW;
  15. }
  16. return LocalTransactionState. ROLLBACK_MESSAGE;
  17. }