数据类型
主业务数据(Master Data)
- 比如 ItemModel,记录了商品的主数据;
- 比如 item_stock 记录了商品库存的主数据;
操作型数据(Log Data)
- 比如库存扣减这样的操作发生了,需要把操作本身的过程记录下来,用于支持这种记录的数据,就是操作型数据;
- 记录下操作型数据是为了追踪,比如库存流水的操作状态,可以根据这个状态做回滚,或者查询正在处理中的状态,使得很多异步的动作,比如下单之前,先落一条待扣减的库存操作的流水,成功之后,再把流水的状态设置为成功,或者下单失败时候,将对应的库存操作的流水设置为失败,可以根据这个操作流水的状态反查,得到异步动作进行的当前状态;
库存操作流水模型构建
创建库存流水表 stock_log
- 顺便生成 mybatis-generator 的一套;
- CREATE TABLE `stock_log` (
- `stock_log_id` varchar( ) COLLATE utf8_unicode_ci NOT NULL,
- `item_id` int NOT NULL DEFAULT '0',
- `amount` int NOT NULL DEFAULT '0',
- `status` int NOT NULL COMMENT '1 表示初始状态, 2 表示下单减库存成功, 3 表示下单回滚',
- PRIMARY KEY (`stock_log_id`)
- ) ENGINE =InnoDB DEFAULT CHARSET =utf8 COLLATE =utf8_unicode_ci;
下单前生成库存流水
- 用户的下单请求到来时,先生成一个库存流水记录,库存流水的 status 设置成 1;
- 下单的时候,把库存流水号带上;
- @RequestMapping(value = "/createorder", method = {RequestMethod.POST}, consumes = {CONTENT_TYPE_FORMED})
- @ResponseBody
- public CommonReturnType createOrder (@RequestParam(name = "itemId") Integer itemId,
- @RequestParam(name = "amount") Integer amount,
- @RequestParam(name = "promoId", required = false) Integer promoId) throws BusinessException {
- String token = httpServletRequest.getParameterMap().get( "token")[ ];
- if (StringUtils.isEmpty(token)) {
- throw new BusinessException(EmBusinessError.USER_NOT_LOGIN, "用户还未登录,不能下单");
- }
- UserModel userModel = (UserModel) redisTemplate.opsForValue().get(token);
- if (userModel == null) {
- throw new BusinessException(EmBusinessError.USER_NOT_LOGIN, "用户还未登录,不能下单");
- }
- String stockLogId = itemService.initStockLog(itemId, amount);
- if(!mqProducer.transactionAsyncReduceStock(userModel.getId(), promoId, itemId, amount, stockLogId)) {
- throw new BusinessException(EmBusinessError.UNKNOWN_ERROR, "下单失败");
- }
- return CommonReturnType.create( null);
- }
下单成功后,修改库存流水的 status 为 2;
- 整个下单逻辑要经历的步骤有:1. 查询验证,2. 减库存,3. 下单,4. 增销量,5. 设置库存流水状态;
- // 5. 设置库存流水状态为成功
- StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
- if (stockLogDO == null) {
- throw new BusinessException(EmBusinessError.UNKNOWN_ERROR);
- }
- stockLogDO.setStatus( );
- stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO);
本地事务(下单)失败,要设置库存流水的 status 为 3;
- 设置完了让 Broker 中的 Prepared 状态的消息回滚;
- @Override
- public LocalTransactionState executeLocalTransaction( Message message, Object args) {
- // 真正要执行的操作:创建订单
- Integer userId = ( Integer)(( Map) args). get( "userId");
- Integer itemId = ( Integer)(( Map) args). get( "itemId");
- Integer promoId = ( Integer)(( Map) args). get( "promoId");
- Integer amount = ( Integer)(( Map) args). get( "amount");
- String stockLogId = ( String)(( Map) args). get( "stockLogId");
- try {
- OrderModel orderModel = orderService. createOrder(userId, itemId, promoId, amount, stockLogId);
- } catch ( BusinessException e) {
- e. printStackTrace();
- // 设置对应的 stockLog 为回滚状态
- StockLogDO stockLogDO = stockLogDOMapper. selectByPrimaryKey(stockLogId);
- stockLogDO. setStatus( );
- stockLogDOMapper. updateByPrimaryKeySelective(stockLogDO);
- return LocalTransactionState. ROLLBACK_MESSAGE;
- }
- return LocalTransactionState. COMMIT_MESSAGE;
- }
Broker 长时间没收到 Commit 提交要走反查逻辑
- 反查的依据就是库存操作流水号;
- @Override
- public LocalTransactionState checkLocalTransaction( MessageExt msg) {
- // 根据是否扣减库存成功,来判断要返回 COMMIT, ROLLBACK, 还是继续 UNKNOW
- String jsonString = new String(msg. getBody());
- Map< String, Object> map = JSON. parseObject(jsonString, Map. class);
- String stockLogId = ( String) map. get( "stockLogId");
- StockLogDO stockLogDO = stockLogDOMapper. selectByPrimaryKey(stockLogId);
- if (stockLogDO == null) {
- return LocalTransactionState. UNKNOW;
- }
- if (stockLogDO. getStatus(). intValue() == ) {
- return LocalTransactionState. COMMIT_MESSAGE;
- } else if (stockLogDO. getStatus(). intValue() == ) {
- return LocalTransactionState. UNKNOW;
- }
- return LocalTransactionState. ROLLBACK_MESSAGE;
- }
数据类型
主业务数据(Master Data)
- 比如 ItemModel,记录了商品的主数据;
- 比如 item_stock 记录了商品库存的主数据;
操作型数据(Log Data)
- 比如库存扣减这样的操作发生了,需要把操作本身的过程记录下来,用于支持这种记录的数据,就是操作型数据;
- 记录下操作型数据是为了追踪,比如库存流水的操作状态,可以根据这个状态做回滚,或者查询正在处理中的状态,使得很多异步的动作,比如下单之前,先落一条待扣减的库存操作的流水,成功之后,再把流水的状态设置为成功,或者下单失败时候,将对应的库存操作的流水设置为失败,可以根据这个操作流水的状态反查,得到异步动作进行的当前状态;
库存操作流水模型构建
创建库存流水表 stock_log
- 顺便生成 mybatis-generator 的一套;
- CREATE TABLE `stock_log` (
- `stock_log_id` varchar( ) COLLATE utf8_unicode_ci NOT NULL,
- `item_id` int NOT NULL DEFAULT '0',
- `amount` int NOT NULL DEFAULT '0',
- `status` int NOT NULL COMMENT '1 表示初始状态, 2 表示下单减库存成功, 3 表示下单回滚',
- PRIMARY KEY (`stock_log_id`)
- ) ENGINE =InnoDB DEFAULT CHARSET =utf8 COLLATE =utf8_unicode_ci;
下单前生成库存流水
- 用户的下单请求到来时,先生成一个库存流水记录,库存流水的 status 设置成 1;
- 下单的时候,把库存流水号带上;
- @RequestMapping(value = "/createorder", method = {RequestMethod.POST}, consumes = {CONTENT_TYPE_FORMED})
- @ResponseBody
- public CommonReturnType createOrder (@RequestParam(name = "itemId") Integer itemId,
- @RequestParam(name = "amount") Integer amount,
- @RequestParam(name = "promoId", required = false) Integer promoId) throws BusinessException {
- String token = httpServletRequest.getParameterMap().get( "token")[ ];
- if (StringUtils.isEmpty(token)) {
- throw new BusinessException(EmBusinessError.USER_NOT_LOGIN, "用户还未登录,不能下单");
- }
- UserModel userModel = (UserModel) redisTemplate.opsForValue().get(token);
- if (userModel == null) {
- throw new BusinessException(EmBusinessError.USER_NOT_LOGIN, "用户还未登录,不能下单");
- }
- String stockLogId = itemService.initStockLog(itemId, amount);
- if(!mqProducer.transactionAsyncReduceStock(userModel.getId(), promoId, itemId, amount, stockLogId)) {
- throw new BusinessException(EmBusinessError.UNKNOWN_ERROR, "下单失败");
- }
- return CommonReturnType.create( null);
- }
下单成功后,修改库存流水的 status 为 2;
- 整个下单逻辑要经历的步骤有:1. 查询验证,2. 减库存,3. 下单,4. 增销量,5. 设置库存流水状态;
- // 5. 设置库存流水状态为成功
- StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId);
- if (stockLogDO == null) {
- throw new BusinessException(EmBusinessError.UNKNOWN_ERROR);
- }
- stockLogDO.setStatus( );
- stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO);
本地事务(下单)失败,要设置库存流水的 status 为 3;
- 设置完了让 Broker 中的 Prepared 状态的消息回滚;
- @Override
- public LocalTransactionState executeLocalTransaction( Message message, Object args) {
- // 真正要执行的操作:创建订单
- Integer userId = ( Integer)(( Map) args). get( "userId");
- Integer itemId = ( Integer)(( Map) args). get( "itemId");
- Integer promoId = ( Integer)(( Map) args). get( "promoId");
- Integer amount = ( Integer)(( Map) args). get( "amount");
- String stockLogId = ( String)(( Map) args). get( "stockLogId");
- try {
- OrderModel orderModel = orderService. createOrder(userId, itemId, promoId, amount, stockLogId);
- } catch ( BusinessException e) {
- e. printStackTrace();
- // 设置对应的 stockLog 为回滚状态
- StockLogDO stockLogDO = stockLogDOMapper. selectByPrimaryKey(stockLogId);
- stockLogDO. setStatus( );
- stockLogDOMapper. updateByPrimaryKeySelective(stockLogDO);
- return LocalTransactionState. ROLLBACK_MESSAGE;
- }
- return LocalTransactionState. COMMIT_MESSAGE;
- }
Broker 长时间没收到 Commit 提交要走反查逻辑
- 反查的依据就是库存操作流水号;
- @Override
- public LocalTransactionState checkLocalTransaction( MessageExt msg) {
- // 根据是否扣减库存成功,来判断要返回 COMMIT, ROLLBACK, 还是继续 UNKNOW
- String jsonString = new String(msg. getBody());
- Map< String, Object> map = JSON. parseObject(jsonString, Map. class);
- String stockLogId = ( String) map. get( "stockLogId");
- StockLogDO stockLogDO = stockLogDOMapper. selectByPrimaryKey(stockLogId);
- if (stockLogDO == null) {
- return LocalTransactionState. UNKNOW;
- }
- if (stockLogDO. getStatus(). intValue() == ) {
- return LocalTransactionState. COMMIT_MESSAGE;
- } else if (stockLogDO. getStatus(). intValue() == ) {
- return LocalTransactionState. UNKNOW;
- }
- return LocalTransactionState. ROLLBACK_MESSAGE;
- }