天天看點

設計模式最佳套路2 —— 愉快地使用管道模式何時使用管道模式愉快地使用管道模式總結

本篇為設計模式第二篇,第一篇可見 設計模式最佳套路 —— 愉快地使用政策模式

管道模式(Pipeline Pattern) 是責任鍊模式(Chain of Responsibility Pattern)的常用變體之一。在管道模式中,管道扮演着流水線的角色,将資料傳遞到一個加工處理序列中,資料在每個步驟中被加工處理後,傳遞到下一個步驟進行加工處理,直到全部步驟處理完畢。

PS:純的責任鍊模式在鍊上隻會有一個處理器用于處理資料,而管道模式上多個處理器都會處理資料。

何時使用管道模式

任務代碼較為複雜,需要拆分為多個子步驟時,尤其是後續可能在任意位置添加新的子步驟、删除舊的子步驟、交換子步驟順序,可以考慮使用管道模式。

設計模式最佳套路2 —— 愉快地使用管道模式何時使用管道模式愉快地使用管道模式總結

愉快地使用管道模式

背景回放

最開始做模型平台的時候,建立模型執行個體的功能,包括:“輸入資料校驗 -> 根據輸入建立模型執行個體 -> 儲存模型執行個體到相關 DB 表”總共三個步驟,也不算複雜,是以當時的代碼大概是這樣的:

public class ModelServiceImpl implements ModelService {

    /**
     * 送出模型(構模組化型執行個體)
     */
    public CommonReponse<Long> buildModelInstance(InstanceBuildRequest request) {
        // 輸入資料校驗
        validateInput(request);
        // 根據輸入建立模型執行個體
        ModelInstance instance = createModelInstance(request);
        // 儲存執行個體到相關 DB 表
        saveInstance(instance);
    }
}           

然而沒有過多久,我們發現表單輸入資料的格式并不完全符合模型的輸入要求,于是我們要加入 “表單資料的預處理”。這功能還沒動手呢,又有業務方提出自己也存在需要對資料進行處理的情況(比如根據商家的表單輸入,生成一些其他業務資料作為模型輸入)。

是以在 “輸入資料校驗” 之後,還需要加入 “表單輸入輸出預處理” 和 “業務方自定義資料處理(可選)”。這個時候我就面臨一個選擇:是否繼續通過在 buildModelInstance 中加入新的方法來實作這些新的處理步驟?好處就是可以當下偷懶,但是壞處呢:

1、ModelService 應該隻用來接收 HSF 請求,而不應該承載業務邏輯,如果将 送出模型 的邏輯都寫在這個類當中,違反了 單一職責,而且後面會導緻 類代碼爆炸

2、将來每加入一個新的處理步驟或者删除某個步驟,我就要修改 buildModelInstance 這個本應該非常内聚的方法,違反了 開閉原則

是以,為了不給以後的自己挖坑,我覺得要思考一個萬全的方案。這個時候,我小腦袋花開始飛轉,突然閃過了 Netty 中的 ChannelPipeline —— 對哦,管道模式,不就正是我需要的嘛!

管道模式的實作方式也是多種多樣,接下來基于前面的背景,我分享一下我目前基于 Spring 實作管道模式的 “最佳套路”(如果你有更好的套路,歡迎賜教,一起讨論哦)。

設計模式最佳套路2 —— 愉快地使用管道模式何時使用管道模式愉快地使用管道模式總結

定義管道處理的上下文

/**
 * 傳遞到管道的上下文
 */
@Getter
@Setter
public class PipelineContext {

    /**
     * 處理開始時間
     */
    private LocalDateTime startTime;

    /**
     * 處理結束時間
     */
    private LocalDateTime endTime;

    /**
     * 擷取資料名稱
     */
    public String getName() {
        return this.getClass().getSimpleName();
    }
}           

定義上下文處理器

/**
 * 管道中的上下文處理器
 */
public interface ContextHandler<T extends PipelineContext> {

    /**
     * 處理輸入的上下文資料
     *
     * @param context 處理時的上下文資料
     * @return 傳回 true 則表示由下一個 ContextHandler 繼續處理,傳回 false 則表示處理結束
     */
    boolean handle(T context);
}           

為了友善說明,我們現在先定義出最早版 【送出模型邏輯】 的上下文和相關處理器:

/**
 * 模型執行個體建構的上下文
 */
@Getter
@Setter
public class InstanceBuildContext extends PipelineContext {

    /**
     * 模型 id
     */
    private Long modelId;

    /**
     * 使用者 id
     */
    private long userId;

    /**
     * 表單輸入
     */
    private Map<String, Object> formInput;

    /**
     * 儲存模型執行個體完成後,記錄下 id
     */
    private Long instanceId;

    /**
     * 模型建立出錯時的錯誤資訊
     */
    private String errorMsg;

    // 其他參數

    @Override
    public String getName() {
        return "模型執行個體建構上下文";
    }
}           

處理器 - 輸入資料校驗:

@Component
public class InputDataPreChecker implements ContextHandler<InstanceBuildContext> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public boolean handle(InstanceBuildContext context) {
        logger.info("--輸入資料校驗--");

        Map<String, Object> formInput = context.getFormInput();

        if (MapUtils.isEmpty(formInput)) {
            context.setErrorMsg("表單輸入資料不能為空");
            return false;
        }

        String instanceName = (String) formInput.get("instanceName");

        if (StringUtils.isBlank(instanceName)) {
            context.setErrorMsg("表單輸入資料必須包含執行個體名稱");
            return false;
        }

        return true;
    }
}           

處理器 - 根據輸入建立模型執行個體:

@Component
public class ModelInstanceCreator implements ContextHandler<InstanceBuildContext> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public boolean handle(InstanceBuildContext context) {
        logger.info("--根據輸入資料建立模型執行個體--");

        // 假裝建立模型執行個體

        return true;
    }
}           

處理器 - 儲存模型執行個體到相關DB表:

@Component
public class ModelInstanceSaver implements ContextHandler<InstanceBuildContext> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public boolean handle(InstanceBuildContext context) {
        logger.info("--儲存模型執行個體到相關DB表--");

        // 假裝儲存模型執行個體

        return true;
    }
}           

到這裡,有個問題就出現了:應該使用什麼樣的方式,将同一種 Context 的 ContextHandler 串聯為管道呢?思考一下:

1、給 ContextHandler 加一個 setNext 方法,每個實作類必須指定其下一個處理器。缺點也很明顯,如果在目前管道中間加入一個新的 ContextHandler,那麼要勢必要修改前一個 ContextHandler 的 setNext 方法;另外,代碼是寫給人閱讀的,這樣做沒法一眼就直覺的知道整個管道的處理鍊路,還要進入到每個相關的 ContextHandler 中去檢視才知道。

2、給 ContextHandler 加上 @Order 注解,根據 @Order 中給定的數字來确定每個 ContextHandler 的序列,一開始時每個數字間隔的可以大些(比如 10、20、30),後續加入新的 ContextHandler 時,可以指定數字為 (11、21、31)這種,那麼可以避免上面方案中要修改代碼的問題,但是仍然無法避免要進入每個相關的 ContextHandler 中去檢視才能知道管道處理鍊路的問題。

3、提前寫好一份路由表,指定好 ”Context -> 管道“ 的映射(管道用 List 來表示),以及管道中處理器的順序 。Spring 來根據這份路由表,在啟動時就建構好一個 Map,Map 的鍵為 Context 的類型,值為 管道(即 List)。這樣的話,如果想知道每個管道的處理鍊路,直接看這份路由表就行,一目了然。缺點嘛,就是每次加入新的 ContextHandler 時,這份路由表也需要在對應管道上進行小改動 —— 但是如果能讓閱讀代碼更清晰,我覺得這樣的修改是值得的、可接受的~

設計模式最佳套路2 —— 愉快地使用管道模式何時使用管道模式愉快地使用管道模式總結

建構管道路由表

基于 Spring 的 Java Bean 配置,我們可以很友善的建構管道的路由表:

/**
 * 管道路由的配置
 */
@Configuration
public class PipelineRouteConfig implements ApplicationContextAware {

    /**
     * 資料類型->管道中處理器類型清單 的路由
     */
    private static final
    Map<Class<? extends PipelineContext>,
        List<Class<? extends ContextHandler<? extends PipelineContext>>>> PIPELINE_ROUTE_MAP = new HashMap<>(4);

    /*
     * 在這裡配置各種上下文類型對應的處理管道:鍵為上下文類型,值為處理器類型的清單
     */
    static {
        PIPELINE_ROUTE_MAP.put(InstanceBuildContext.class,
                               Arrays.asList(
                                       InputDataPreChecker.class,
                                       ModelInstanceCreator.class,
                                       ModelInstanceSaver.class
                               ));

        // 将來其他 Context 的管道配置
    }

    /**
     * 在 Spring 啟動時,根據路由表生成對應的管道映射關系
     */
    @Bean("pipelineRouteMap")
    public Map<Class<? extends PipelineContext>, List<? extends ContextHandler<? extends PipelineContext>>> getHandlerPipelineMap() {
        return PIPELINE_ROUTE_MAP.entrySet()
                                 .stream()
                                 .collect(Collectors.toMap(Map.Entry::getKey, this::toPipeline));
    }

    /**
     * 根據給定的管道中 ContextHandler 的類型的清單,建構管道
     */
    private List<? extends ContextHandler<? extends PipelineContext>> toPipeline(
            Map.Entry<Class<? extends PipelineContext>, List<Class<? extends ContextHandler<? extends PipelineContext>>>> entry) {
        return entry.getValue()
                    .stream()
                    .map(appContext::getBean)
                    .collect(Collectors.toList());
    }

    private ApplicationContext appContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        appContext = applicationContext;
    }
}           

定義管道執行器

最後一步,定義管道執行器。管道執行器 根據傳入的上下文資料的類型,找到其對應的管道,然後将上下文資料放入管道中去進行處理。

/**
 * 管道執行器
 */
@Component
public class PipelineExecutor {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 引用 PipelineRouteConfig 中的 pipelineRouteMap
     */
    @Resource
    private Map<Class<? extends PipelineContext>,
                List<? extends ContextHandler<? super PipelineContext>>> pipelineRouteMap;

    /**
     * 同步處理輸入的上下文資料<br/>
     * 如果處理時上下文資料流通到最後一個處理器且最後一個處理器傳回 true,則傳回 true,否則傳回 false
     *
     * @param context 輸入的上下文資料
     * @return 處理過程中管道是否暢通,暢通傳回 true,不暢通傳回 false
     */
    public boolean acceptSync(PipelineContext context) {
        Objects.requireNonNull(context, "上下文資料不能為 null");
        // 拿到資料類型
        Class<? extends PipelineContext> dataType = context.getClass();
        // 擷取資料處理管道
        List<? extends ContextHandler<? super PipelineContext>> pipeline = pipelineRouteMap.get(dataType);

        if (CollectionUtils.isEmpty(pipeline)) {
            logger.error("{} 的管道為空", dataType.getSimpleName());
            return false;
        }

        // 管道是否暢通
        boolean lastSuccess = true;

        for (ContextHandler<? super PipelineContext> handler : pipeline) {
            try {
                // 目前處理器處理資料,并傳回是否繼續向下處理
                lastSuccess = handler.handle(context);
            } catch (Throwable ex) {
                lastSuccess = false;
                logger.error("[{}] 處理異常,handler={}", context.getName(), handler.getClass().getSimpleName(), ex);
            }

            // 不再向下處理
            if (!lastSuccess) { break; }
        }

        return lastSuccess;
    }
}           

使用管道模式

此時,我們可以将最開始的 buildModelInstance 修改為:

public CommonResponse<Long> buildModelInstance(InstanceBuildRequest request) {
    InstanceBuildContext data = createPipelineData(request);
    boolean success = pipelineExecutor.acceptSync(data);

    // 建立模型執行個體成功
    if (success) {
        return CommonResponse.success(data.getInstanceId());
    }

    logger.error("建立模式執行個體失敗:{}", data.getErrorMsg());
    return CommonResponse.failed(data.getErrorMsg());
}           

我們模拟一下模型執行個體的建立過程:

參數正常時:

設計模式最佳套路2 —— 愉快地使用管道模式何時使用管道模式愉快地使用管道模式總結

參數出錯時:

設計模式最佳套路2 —— 愉快地使用管道模式何時使用管道模式愉快地使用管道模式總結

這個時候我們再為 InstanceBuildContext 加入新的兩個 ContextHandler:FormInputPreprocessor(表單輸入資料預處理) 和 BizSideCustomProcessor(業務方自定義資料處理)。

@Component
public class FormInputPreprocessor implements ContextHandler<InstanceBuildContext> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public boolean handle(InstanceBuildContext context) {
        logger.info("--表單輸入資料預處理--");

        // 假裝進行表單輸入資料預處理

        return true;
    }
}           
@Component
public class BizSideCustomProcessor implements ContextHandler<InstanceBuildContext> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public boolean handle(InstanceBuildContext context) {
        logger.info("--業務方自定義資料處理--");

        // 先判斷是否存在自定義資料處理,如果沒有,直接傳回 true

        // 調用業務方的自定義的表單資料處理

        return true;
    }
}           

此時 buildModelInstance 不需要做任何修改,我們隻需要在 “路由表” 裡面,将這兩個 ContextHandler 加入到 InstanceBuildContext 關聯的管道中,Spring 啟動的時候,會自動幫我們建構好每種 Context 對應的管道:

設計模式最佳套路2 —— 愉快地使用管道模式何時使用管道模式愉快地使用管道模式總結

加入新的處理器

再模拟一下模型執行個體的建立過程:

設計模式最佳套路2 —— 愉快地使用管道模式何時使用管道模式愉快地使用管道模式總結

異步處理

管道執行器 PipelineExecutor 中,acceptSync 是個同步的方法。

小蜜:看名字你就知道你悄悄埋伏筆了。

設計模式最佳套路2 —— 愉快地使用管道模式何時使用管道模式愉快地使用管道模式總結

對于步驟繁多的任務,很多時候我們更需要的是異步處理,比如某些耗時長的定時任務。管道處理異步化非常的簡單,我們先定義一個線程池,比如:

<!-- 專門用于執行管道任務的線程池 -->
<bean id="pipelineThreadPool"
      class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="4" /> <!-- 核心線程數 -->
    <property name="maxPoolSize" value="8" />  <!-- 最大線程數 -->
    <property name="keepAliveSeconds" value="960" />  <!-- 線程最大空閑時間/秒(根據管道使用情況指定)-->
    <property name="queueCapacity" value="256" />     <!-- 任務隊列大小(根據管道使用情況指定)-->
    <property name="threadNamePrefix" value="pipelineThreadPool" />
    <property name="rejectedExecutionHandler">
        <bean class="java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy" />
    </property>
</bean>           

然後在 PipelineExecutor 中加入異步處理的方法:

/**
 * 管道線程池
 */
@Resource
private ThreadPoolTaskExecutor pipelineThreadPool;

/**
 * 異步處理輸入的上下文資料
 *
 * @param context  上下文資料
 * @param callback 處理完成的回調
 */
public void acceptAsync(PipelineContext context, BiConsumer<PipelineContext, Boolean> callback) {
    pipelineThreadPool.execute(() -> {
        boolean success = acceptSync(context);

        if (callback != null) {
            callback.accept(context, success);
        }
    });
}           

通用處理

比如我們想記錄下每次管道處理的時間,以及在處理前和處理後都列印相關的日志。那麼我們可以提供兩個通用的 ContextHandler,分别放在每個管道的頭和尾:

@Component
public class CommonHeadHandler implements ContextHandler<PipelineContext> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public boolean handle(PipelineContext context) {
        logger.info("管道開始執行:context={}", JSON.toJSONString(context));

        // 設定開始時間
        context.setStartTime(LocalDateTime.now());

        return true;
    }
}           
@Component
public class CommonTailHandler implements ContextHandler<PipelineContext> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public boolean handle(PipelineContext context) {
        // 設定處理結束時間
        context.setEndTime(LocalDateTime.now());

        logger.info("管道執行完畢:context={}", JSON.toJSONString(context));

        return true;
    }
}           

通用頭、尾處理器可以在路由表裡面放置,但是每次新加一種 PipelineContext 都要加一次,好像沒有必要 —— 我們直接修改下 管道執行器 PipelineExecutor 中的 acceptSync 方法:

@Component
public class PipelineExecutor {

    ......

    @Autowired
    private CommonHeadHandler commonHeadHandler;

    @Autowired
    private CommonTailHandler commonTailHandler;

    public boolean acceptSync(PipelineContext context) {
        ......

        // 【通用頭處理器】處理
        commonHeadHandler.handle(context);

        // 管道是否暢通
        boolean lastSuccess = true;

        for (ContextHandler<? super PipelineContext> handler : pipeline) {
            try {
                // 目前處理器處理資料,并傳回是否繼續向下處理
                lastSuccess = handler.handle(context);
            } catch (Throwable ex) {
                lastSuccess = false;
                logger.error("[{}] 處理異常,handler={}", context.getName(), handler.getClass().getSimpleName(), ex);
            }

            // 不再向下處理
            if (!lastSuccess) { break; }
        }

        // 【通用尾處理器】處理
        commonTailHandler.handle(context);

        return lastSuccess;
    }
}           

總結

通過管道模式,我們大幅降低了系統的耦合度和提升了内聚程度與擴充性:

  • ModelService 隻負責處理 HSF 請求,不用關心具體的業務邏輯
  • PipelineExecutor 隻做執行工作,不用關心具體的管道細節
  • 每個 ContextHandler 隻負責自己那部分的業務邏輯,不需要知道管道的結構,與其他ContextHandler 的業務邏輯解耦
  • 新增、删除 或者 交換子步驟時,都隻需要操作路由表的配置,而不要修改原來的調用代碼
設計模式最佳套路2 —— 愉快地使用管道模式何時使用管道模式愉快地使用管道模式總結

關注「淘系技術」微信公衆号,一個有溫度有内容的技術社群~

設計模式最佳套路2 —— 愉快地使用管道模式何時使用管道模式愉快地使用管道模式總結