天天看點

深入淺出Netflix Conductor使用

Netflix Conductor架構是典型的服務編排架構,通過Conductor還可以實作工作流和分布式排程,性能非常卓越。

關于Conductor的基本概念在 https://netflix.github.io/conductor/intro/ 文中已經有深入介紹,本篇将以實戰案例為出發點深入介紹Conductor的使用。

一、Conductor的功能全景圖

深入淺出Netflix Conductor使用

image.png

在正式使用之前我們先來了解Conductor都有哪些功能,通過流程、任務、曆史、監控、用戶端、通信和管理背景幾個層面來做了功能歸類。

  • 流程

    流程引擎預設是用DSL來編寫流程定義檔案,這是一種JSON格式的檔案,我們的工作流案例就是以這個定義檔案為驅動的,但是很可惜目前Conductor隻支援手寫定義,無法通過界面生成,這塊就需要後面通過改造Conductor來增加相應功能。

  • 任務

    這裡面包括的主要是和任務相關的功能,通過這個功能可以進行簡單工作流的實作,還可以進行并行計算。

  • 曆史

    如果想要檢視之前進行過的(完成,失敗等終态)曆史任務,通過這個功能就可以實作。

  • 監控

    當工作流任務流程非常冗長的時候,我們對每個節點的任務運作情況并不了解,這時候就需要有一個任務監控功能及時知道流程的狀态友善我們做出相應決策。同時還有一個重要功能是任務排程,通過這個功能可以實作類似xxl-job的功能,滿足分布式定時排程的需求。

  • 用戶端和通信

    這二個功能本是一體的,既然Conductor是分布式的任務流程那麼核心原理就是通過Server+Worker的方式,利用核心狀态機發消息的方式來驅動用戶端的任務執行,而Worker的實作是跨語言的,可以用JAVA、Python、go等語言實作,而Worker需要長輪詢Server端的狀态來判斷當然是否有自己的任務來執行。

  • 管理背景

    通過管理背景可以檢視任務和工作流的中繼資料定義,工作流的執行狀态等。

二、Conductor的架構圖

深入淺出Netflix Conductor使用

image.png

其中:Task Queues使用Dyno-queues做任務延遲。

三、實戰案例

通過指令行将Netflix Conductor Sever端啟動之後( https://netflix.github.io/conductor/intro/#installing-and-running 介紹了如何安裝Conductor),通路localhost:8080位址顯示如下頁面:

深入淺出Netflix Conductor使用

image.png

這個頁面主要負責的是關于Conductor的任務、工作流的中繼資料管理,提供了很多http接口可供使用,如下圖所示:

深入淺出Netflix Conductor使用

image.png

我們可以直接調用預設提供的接口頁面通過傳遞參數來進行任務和工作流的定義,當然也可以自己寫頁面調用相應的URL來進行。首先我們要先進行任務檔案的定義,如下圖所示:

深入淺出Netflix Conductor使用

image.png

在這個截圖中,我們定義了二個任務,分别是leaderRatify和managerRatify,截圖中的原始定義檔案如下:

[
 
{
 
  "name": "leaderRatify",
 
  "retryCount": 3,
 
  "timeoutSeconds": 1200,
 
  "inputKeys": [
 
    "staffName",
 
    "staffDepartment"
 
  ],
 
  "outputKeys": [
 
    "leaderAgree",
 
    "leaderDisagree"
 
  ],
 
  "timeoutPolicy": "TIME_OUT_WF",
 
  "retryLogic": "FIXED",
 
  "retryDelaySeconds": 600,
 
  "responseTimeoutSeconds": 3600
 
},
 
{
 
  "name": "managerRatify",
 
  "retryCount": 3,
 
  "timeoutSeconds": 1200,
 
  "inputKeys": [
 
    "managerName",
 
    "managerDeparment"
 
  ],
 
  "outputKeys": [
 
    "managerAgree",
 
    "managerDisagree"
 
  ],
 
  "timeoutPolicy": "TIME_OUT_WF",
 
  "retryLogic": "FIXED",
 
  "retryDelaySeconds": 600,
 
  "responseTimeoutSeconds": 3600
 
}
 
]           

複制

任務定義好之後,接下來需要通過任務建立工作流定義,如下圖所示:

深入淺出Netflix Conductor使用

image.png

工作流定義檔案就是我們整個流程所走的路徑,将流程檔案轉換成流程圖如下所示:

深入淺出Netflix Conductor使用

image.png

流程定義檔案的原始檔案内容如下:

{
  "updateTime": 1540448903202,
  "name": "Leave process",
  "description": "a demo for workflow",
  "version": 1,
  "tasks": [
    {
      "name": "leaderRatify",
      "taskReferenceName": "node1",
      "inputParameters": {
        "staffName": "${workflow.input.staffName}",
        "staffDepartment": "${workflow.input.staffDepartment}"
      },
      "type": "SIMPLE",
      "startDelay": 0
    },
    {
      "name": "managerRatify",
      "taskReferenceName": "node2",
      "inputParameters": {
        "managerName": "${node1.output.leaderName}",
        "managerDepartment": "${node1.output.leaderDepartment}"
      },
      "type": "SIMPLE",
      "startDelay": 0
    }
  ],
  "outputParameters": {
    "leaderName": "${node1.output.leaderName}",
    "leaderDepartment": "${node1.output.leaderDepartment}",
    "managerAgree": "${node2.output.managerAgree}",
    "managerDisagree": "${node2.output.managerDisagree}"
  },
  "restartable": true,
  "schemaVersion": 2
}           

複制

上面的流程主要介紹了Task任務定義檔案、工作流流程檔案如何定義和上傳的,這二個檔案主要是提供給Conductor的狀态機使用,而我們真正的任務Worker則需要自己寫java代碼來實作,然後通過長輪詢Conductor Server來擷取自己的狀态以及任務步驟,Worker代碼如下所示:

class LeaderRatifyWorker implements Worker {
    private String taskDefName;
    public SampleWorker(String taskDefName) {
        this.taskDefName = taskDefName;
    }
    @Override
    public String getTaskDefName() {
        return taskDefName;
    }
    @Override
    public TaskResult execute(Task task) {
        System.out.printf("Executing %s%n", taskDefName);
        System.out.println("staffName:" + task.getInputData().get("staffName"));
        System.out.println("staffDepartment:" + task.getInputData().get("staffDepartment"));
        TaskResult result = new TaskResult(task);
        result.setStatus(TaskResult.Status.COMPLETED);
        //Register the output of the task
        result.getOutputData().put("outputKey1", "value");
        result.getOutputData().put("oddEven", 1);
        result.getOutputData().put("mod", 4);
        result.getOutputData().put("leaderAgree", "yes");
        result.getOutputData().put("leaderDisagree", "no");
        return result;
    }
}
class ManagerRatifyWorker implements Worker {
    private String taskDefName;
    public SampleWorker2(String taskDefName) {
        this.taskDefName = taskDefName;
    }
    @Override
    public String getTaskDefName() {
        return taskDefName;
    }
    @Override
    public TaskResult execute(Task task) {
        System.out.printf("Executing %s\n", taskDefName);
        System.out.println("managerName:" + task.getInputData().get("managerName"));
        System.out.println("managerDepartment:" + task.getInputData().get("managerDepartment"));
        TaskResult result = new TaskResult(task);
        result.setStatus(TaskResult.Status.COMPLETED);
        //Register the output of the task
        result.getOutputData().put("managerAgree", String.valueOf(task.getInputData().get("managerName")));
        result.getOutputData().put("managerDisagree", String.valueOf(task.getInputData().get("managerDepartment")));
 
        return result;
    }
}
  
//在main方法中建立工作Worker以及設定需要通路的Conductor Server端api位址,并将流程進入初始化
 public static void main(String[] args) {
        TaskClient taskClient = new TaskClient();
        taskClient.setRootURI("http://localhost:8080/api/");       //Point this to the server API
        int threadCount = 2;         //number of threads used to execute workers.  To avoid starvation, should be same or more than number of workers
        Worker worker1 = new LeaderRatifyWorker("leaderRatify");
        Worker worker2 = new ManagerRatifyWorker("managerRatify");
        //Create WorkflowTaskCoordinator
        WorkflowTaskCoordinator.Builder builder = new WorkflowTaskCoordinator.Builder();
        WorkflowTaskCoordinator coordinator = builder.withWorkers(worker1, worker2).withThreadCount(threadCount).withTaskClient(taskClient).build();
        //Start for polling and execution of the tasks
        coordinator.init();
}           

複制

而後通過如下界面啟動工作流,并傳入工作流輸入參數:

深入淺出Netflix Conductor使用

image.png

當流程執行完以後,我們來通路Conductor的Admin管理界面,通過localhost:5000端口通路,看到如下圖所示界面:

深入淺出Netflix Conductor使用

image.png

選擇左邊菜單的All選項,右側出現所有任務的清單:

深入淺出Netflix Conductor使用

image.png

可以看到目前所有工作流的狀态均已經是執行完畢,通過Status狀态通過看到每個工作流目前的執行狀态,分别是Running、Completed、Timed out、Terminated等狀态。點選右側Workflow清單中第一條workflowID顯示如下界面:

深入淺出Netflix Conductor使用

image.png

界面中的流程圖節點顯示為綠色,表示工作流正常的執行完畢沒有報任何故障,而右上角紅框的Restart表示可以重新開機工作流。

四、小結

通過使用Netflix Conductor後,我們首先來看一下Conductor到底能幹什麼:

  • 以藍圖為主,基于JSON DSL的藍圖定義了執行流程;
  • 跟蹤和管理工作流;
  • 能夠暫停,恢複和重新啟動流程;
  • 使用者界面可視化流程;
  • 能夠在需要時同步處理所有任務;
  • 能夠擴充到數百萬個同時運作的流程;
  • 由客戶抽象的排隊服務提供後端支援;
  • 能夠通過HTTP或其他傳輸操作,例如gRPC;

但如果要大規模使用還需要進行一些定制化開發才能使架構的功效發揮到最大:

  • 流程定義檔案需要自己手寫DSL,需要改造成通過流程設計器界面來生成。
  • 無人員和權限管理功能,需要改造增加。