天天看点

Spring-Statemachine状态机最佳实践

状态机

有限状态机是一种用来进行对象行为建模的工具,其作用主要是描述对象在它的生命周期内所经历的状态序列,以及如何响应来自外界的各种事件。在电商场景(订单、物流、售后)、分布式集群管理(分布式计算平台任务编排)等场景都有大规模的使用。

Spring-Statemachine状态机最佳实践

状态机的要素:

状态机可归纳为4个要素,现态、条件、动作、次态。“现态”和“条件”是因,“动作”和“次态”是果。

  1. 现态:指当前所处的状态
  2. 条件:又称“事件”,当一个条件被满足,将会触发一个动作,或者执行一次状态的迁移
  3. 动作:条件满足后执行的动作。动作执行完毕后,可以迁移到新的状态,也可以仍旧保持原状态。动作不是必须的,当条件满足后,也可以不执行任何动作,直接迁移到新的状态。
  4. 次态:条件满足后要迁往的新状态。“次态”是相对于“现态”而言的,“次态”一旦被激活,就转换成“现态”。

状态机动作类型:

  1. 进入动作:在进入状态时进行
  2. 退出动作:在退出状态时进行
  3. 输入动作:依赖于当前状态和输入条件进行
  4. 转移动作:在进行特定转移时进行

Spring-statemachine

具体使用步骤如下:

maven引入

<dependency>
    <groupId>org.springframework.statemachine</groupId>
    <artifactId>spring-statemachine-starter</artifactId>
</dependency>           

定义业务状态

public enum OrderStatus implements MachineState {

    // 待支付
    WAIT_PAYMENT,

    // 待发货
    WAIT_DELIVER,

    // 待收货
    WAIT_RECEIVE,

    // 订单结束
    FINISH;

}

public interface MachineState {

    /**
     * 状态名称
     *
     * @return
     */
    String name();

}           

定义业务事件

public enum OrderEvent implements MachineEvent {

    // 支付
    PAYED,

    // 发货
    DELIVERY,

    // 确认收货
    RECEIVED;

}

public interface MachineEvent {

    /**
     * 事件名称
     *
     * @return
     */
    String name();

}           

状态机配置

@Slf4j
@Configuration
@EnableStateMachineFactory(name = OrderStateMachineConfig.ORDER_STATE_MACHINE_FACTORY_NAME)
public class OrderStateMachineConfig extends StateMachineConfigurerAdapter<OrderStatus, OrderEvent> {

    /**
     * 订单状态机ID
     */
    public static final String ORDER_STATE_MACHINE_ID = "orderStateMachineId";
    /**
     * 订单状态机工厂名称
     */
    public static final String ORDER_STATE_MACHINE_FACTORY_NAME = "orderStateMachineFactory";

    /**
     * 状态监听器
     *
     * @param config
     * @throws Exception
     */
    @Override
    public void configure(StateMachineConfigurationConfigurer<OrderStatus, OrderEvent> config) throws Exception {
        config
                .withConfiguration()
                .machineId(ORDER_STATE_MACHINE_ID)
                .transitionConflictPolicy(TransitionConflictPolicy.CHILD)
                .regionExecutionPolicy(RegionExecutionPolicy.PARALLEL);
    }

    /**
     * 配置状态
     *
     * @param states
     * @throws Exception
     */
    @Override
    public void configure(StateMachineStateConfigurer<OrderStatus, OrderEvent> states) throws Exception {
        states.withStates()
                //定义了初始状态
                .initial(OrderStatus.WAIT_PAYMENT)
                // 定义结束状态
                .end(OrderStatus.FINISH)
                //指定了使用上一步中定义的所有状态作为该状态机的状态定义
                .states(EnumSet.allOf(OrderStatus.class));
    }

    /**
     * 配置状态转换事件关系
     * 命名中我们很容易理解每一个迁移动作,都有来源状态source,目标状态target以及触发事件event
     *
     * @param transitions
     * @throws Exception
     */
    @Override
    public void configure(StateMachineTransitionConfigurer<OrderStatus, OrderEvent> transitions) throws Exception {
        transitions
                // 待支付 到 待发货 的事件 支付
                .withExternal().source(OrderStatus.WAIT_PAYMENT).target(OrderStatus.WAIT_DELIVER).event(OrderEvent.PAYED)
                .and()
                // 待发货 到 待收货 的事件 发货
                .withExternal().source(OrderStatus.WAIT_DELIVER).target(OrderStatus.WAIT_RECEIVE).event(OrderEvent.DELIVERY)
                .and()
                // 待收货 到 订单结束 的事件 确认收货
                .withExternal().source(OrderStatus.WAIT_RECEIVE).target(OrderStatus.FINISH).event(OrderEvent.RECEIVED);
    }
}           

定义业务对象

@Data
public class NewOrder implements MatchineObj<OrderStatus> {

    private String id;

    private String orderNo;

    private String name;

    /**
     * 状态
     */
    private String status;

    @Override
    public String matchineId() {
        return OrderStateMachineConfig.ORDER_STATE_MACHINE_ID;
    }

    @Override
    public String factoryName() {
        return OrderStateMachineConfig.ORDER_STATE_MACHINE_FACTORY_NAME;
    }

    @Override
    public String stateName() {
        return status;
    }

    @Override
    public Class<OrderStatus> machineStateEnum() {
        return OrderStatus.class;
    }
}

public interface MatchineObj<S extends MachineState> {

    /**
     * 状态机id
     *
     * @return
     */
    String matchineId();

    /**
     * bean 工厂名称
     *
     * @return
     */
    String factoryName();

    /**
     * 状态名称
     *
     * @return
     */
    String stateName();

    /**
     * 枚举状态class
     *
     * @return
     */
    Class<S> machineStateEnum();
}           

定义业务状态机

@Slf4j
@Service
public class BusinessStateMachineService implements ApplicationContextAware {

    private ApplicationContext applicationContext;

    /**
     * 状态机下一步
     *
     * @param matchineObj 状态对象
     * @param event       事件对象
     * @param <S>
     * @param <E>
     */
    @Deprecated
    public <S extends Enum<S> & MachineState, E extends Enum<E> & MachineEvent> boolean next(final MatchineObj<S> matchineObj, final E event) {
        AtomicBoolean success = new AtomicBoolean(Boolean.FALSE);
        final StateMachinePersister<S, E, MatchineObj<S>> stateMachinePersist = getStateMachinePersist(matchineObj, event);
        final StateMachineFactory<S, E> stateMachineFactory = getStateMachineFactory(matchineObj);
        final StateMachine<S, E> stateMachine = stateMachineFactory.getStateMachine(matchineObj.matchineId());
        try {
            // 恢复状态
            stateMachinePersist.restore(stateMachine, matchineObj);
            final Message<E> data = MessageBuilder.withPayload(event).build();
            // 执行下一步
            stateMachine.sendEvent(Mono.just(data))
                    .subscribe(machineEventResult -> {
                        switch (machineEventResult.getResultType()) {
                            case ACCEPTED:
                                success.set(Boolean.TRUE);
                                break;
                            case DEFERRED:
                            case DENIED:
                                // 拒绝的
                                throw new IllegalArgumentException("改状态失败: machineId=" + matchineObj.matchineId() +
                                        ", factoryName=" + matchineObj.factoryName() +
                                        ", state=" + matchineObj.stateName() +
                                        ", event=" + event.name());
                            default:
                        }
                    });
        } catch (Exception e) {
            e.printStackTrace();
            throw new IllegalArgumentException("改状态失败: machineId=" + matchineObj.matchineId() +
                    ", factoryName=" + matchineObj.factoryName() +
                    ", state=" + matchineObj.stateName() +
                    ", event=" + event.name());
        } finally {
            stateMachine.stopReactively().subscribe();
        }
        return success.get();
    }

    /**
     * 状态机下一步
     * 使用注意:先执行状态流转,然后在操作业务。具体用乐观锁或者分布式锁都可以
     *
     * @param matchineObj    状态对象
     * @param event          事件对象
     * @param resultConsumer 成功之后消费
     * @param <S>
     * @param <E>
     */
    public <S extends Enum<S> & MachineState, E extends Enum<E> & MachineEvent> void next(final MatchineObj<S> matchineObj, final E event, Consumer<MatchineObj<S>> resultConsumer) {
        final StateMachinePersister<S, E, MatchineObj<S>> stateMachinePersist = getStateMachinePersist(matchineObj, event);
        final StateMachineFactory<S, E> stateMachineFactory = getStateMachineFactory(matchineObj);
        final StateMachine<S, E> stateMachine = stateMachineFactory.getStateMachine(matchineObj.matchineId());
        try {
            // 恢复状态
            stateMachinePersist.restore(stateMachine, matchineObj);
            final Message<E> data = MessageBuilder.withPayload(event).build();
            // 执行下一步
            stateMachine.sendEvent(Mono.just(data))
                    .subscribe(machineEventResult -> {
                        switch (machineEventResult.getResultType()) {
                            case ACCEPTED:
                                // 认可的
                                Optional.ofNullable(resultConsumer).ifPresent(matchineObjConsumer -> {
                                    matchineObjConsumer.accept(matchineObj);
                                });
                                break;
                            case DEFERRED:
                                // 推迟的
                            case DENIED:
                                throw new IllegalArgumentException("改状态失败: machineId=" + matchineObj.matchineId() +
                                        ", factoryName=" + matchineObj.factoryName() +
                                        ", state=" + matchineObj.stateName() +
                                        ", event=" + event.name());
                            default:
                        }
                    });
        } catch (Exception e) {
            e.printStackTrace();
            throw new IllegalArgumentException("改状态失败: machineId=" + matchineObj.matchineId() +
                    ", factoryName=" + matchineObj.factoryName() +
                    ", state=" + matchineObj.stateName() +
                    ", event=" + event.name());
        } finally {
            stateMachine.stopReactively().subscribe();
        }
    }


    /**
     * 获取持久化对象(控制流程能不能走下一步)
     *
     * @param matchineObj 状态对象
     * @param event       事件对象
     * @param <S>
     * @param <E>
     * @return
     */
    private <S extends Enum<S> & MachineState, E extends Enum<E> & MachineEvent> DefaultStateMachinePersister<S, E, MatchineObj<S>> getStateMachinePersist(MatchineObj<S> matchineObj, E event) {
        final StateMachinePersist<S, E, MatchineObj<S>> stateMachinePersist = new StateMachinePersist<S, E, MatchineObj<S>>() {

            @Override
            public void write(StateMachineContext<S, E> context, MatchineObj<S> contextObj) {
                //这里不做任何持久化工作
            }

            @Override
            public StateMachineContext<S, E> read(MatchineObj<S> contextObj) {
                final Class<S> aClass = contextObj.machineStateEnum();
                return new DefaultStateMachineContext<S, E>(S.valueOf(aClass, contextObj.stateName()),
                        null,
                        null,
                        null,
                        null,
                        matchineObj.matchineId());
            }
        };
        return new DefaultStateMachinePersister<>(stateMachinePersist);
    }

    /**
     * 根据状态对象 获取状态机工厂
     *
     * @param matchineObj 状态对象
     * @param <S>
     * @param <E>
     * @return
     */
    private <S extends Enum<S> & MachineState, E extends Enum<E> & MachineEvent> StateMachineFactory<S, E> getStateMachineFactory(MatchineObj<S> matchineObj) {
        return applicationContext.getBean(matchineObj.factoryName(), StateMachineFactory.class);
    }


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}           
注意: 1.状态机ID 唯一 2.状态机工厂名称 唯一

测试

初始化

// 初始化状态
final NewOrder newOrder = new NewOrder();
newOrder.setId("22");
newOrder.setName("测试订单");
newOrder.setStatus(OrderStatus.WAIT_PAYMENT.name());
// 保存数据库操作           

业务状态流转

// 正常业务逻辑
// 1.查询业务数据 知道当前是什么状态
// 2.执行什么事件
// 3.执行成功后操作哪些
final NewOrder newOrder = new NewOrder();
newOrder.setId("22");
newOrder.setName("测试订单");
newOrder.setStatus(OrderStatus.WAIT_PAYMENT.name());
stateMachineService.next(newOrder, OrderEvent.PAYED, orderStatusMatchineObj -> {
    log.info("修改状态成功");
});