天天看點

解鎖雲原生 AI 技能 - 開發你的機器學習工作流準備工作開發 Pipeline送出 Pipeline檢視運作結果總結

按照上篇文章 《解鎖雲原生 AI 技能 | 在 Kubernetes 上建構機器學習系統》 搭建了一套 Kubeflow Pipelines 之後,我們一起小試牛刀,用一個真實的案例,學習如何開發一套基于 Kubeflow Pipelines 的機器學習工作流。

準備工作

機器學習工作流是一個任務驅動的流程,同時也是資料驅動的流程,這裡涉及到資料的導入和準備、模型訓練 Checkpoint 的導出評估、到最終模型的導出。這就需要分布式存儲作為傳輸的媒介,此處使用 NAS 作為分布式存儲。

  • 建立分布式存儲,這裡以 NAS 為例。此處 

    NFS_SERVER_IP

     需要替換成真實 NAS 伺服器位址
  1. 建立阿裡雲 NAS 服務,可以參考 文檔
  2. 需要在 NFS Server 中建立 

    /data

# mkdir -p /nfs
# mount -t nfs -o vers=4.0 NFS_SERVER_IP:/ /nfs
# mkdir -p /data
# cd /
# umount /nfs           
  1. 建立對應的 Persistent Volume
# cat nfs-pv.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: user-susan
  labels:
    user-susan: pipelines
spec:
  persistentVolumeReclaimPolicy: Retain
  capacity:
    storage: 10Gi
  accessModes:
  - ReadWriteMany
  nfs:
    server: NFS_SERVER_IP
    path: "/data"
    
# kubectl create -f nfs-pv.yaml           
  1. 建立 Persistent Volume Claim
# cat nfs-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: user-susan
  annotations:
    description: "this is the mnist demo"
    owner: Tom
spec:
  accessModes:
    - ReadWriteMany
  resources:
    requests:
       storage: 5Gi
  selector:
    matchLabels:
      user-susan: pipelines
# kubectl create -f nfs-pvc.yaml           

開發 Pipeline

由于 Kubeflow Pipelines 提供的例子都是依賴于 Google 的存儲服務,這導緻國内的使用者無法真正體驗 Pipelines 的能力。為此,阿裡雲容器服務團隊提供了基于 NAS 存儲訓練 MNIST 模型的例子,友善您在阿裡雲上使用和學習 Kubeflow Pipelines。具體步驟分 3 步: 

  • (1) 下載下傳資料 
  • (2) 利用 TensorFlow 進行模型訓練 
  • (3) 模型導出

在這 3 個步驟中,後一個步驟都依賴于前一個步驟而完成。

Kubeflow Pipelines 中可以用 Python 代碼描述這樣一個流程, 完整代碼可以檢視 

standalone_pipeline.py

我們在例子中使用了基于開源項目 

Arena

 的 

arena_op

 ,這是對于 Kubeflow 預設的 

container_op

 封裝,它能夠實作對于分布式訓練 MPI 和 PS 模式的無縫銜接,另外也支援使用 GPU 和 RDMA 等異構裝置和分布式存儲的簡單接入,同時友善從 git 源同步代碼,是一個比較實用的工具 API。 

@dsl.pipeline(
  name='pipeline to run jobs',
  description='shows how to run pipeline jobs.'
)
def sample_pipeline(learning_rate='0.01',
    dropout='0.9',
    model_version='1',
    commit='f097575656f927d86d99dd64931042e1a9003cb2'):
  """A pipeline for end to end machine learning workflow."""
  data=["user-susan:/training"]
  gpus=1
# 1. prepare data
  prepare_data = arena.standalone_job_op(
    name="prepare-data",
    image="byrnedo/alpine-curl",
    data=data,
    command="mkdir -p /training/dataset/mnist && \
  cd /training/dataset/mnist && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")
  # 2. downalod source code and train the models
  train = arena.standalone_job_op(
    name="train",
    image="tensorflow/tensorflow:1.11.0-gpu-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    gpus=gpus,
    data=data,
    command='''
    echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \
    --max_steps 500 --data_dir /training/dataset/mnist \
    --log_dir /training/output/mnist  --learning_rate %s \
    --dropout %s''' % (prepare_data.output, learning_rate, dropout),
    metrics=["Train-accuracy:PERCENTAGE"])
  # 3. export the model
  export_model = arena.standalone_job_op(
    name="export-model",
    image="tensorflow/tensorflow:1.11.0-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    data=data,
    command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))           

Kubeflow Pipelines 會将上面的代碼轉化成一個有向無環圖 (DAG), 其中的每一個節點就是 Component (元件),而 Component (元件)之間的連線代表它們之間的依賴關系。從 Pipelines UI 可以看到 DAG 圖:

解鎖雲原生 AI 技能 - 開發你的機器學習工作流準備工作開發 Pipeline送出 Pipeline檢視運作結果總結

首先具體了解一下資料準備的部分,這裡我們提供了 

arena.standalone_job_op

 的 Python API,  需要指定該步驟的

名稱

: name; 

需要使用的容器鏡像

: image; 

要使用的資料以及其對應到容器内部的挂載目錄

: data。

這裡的 data 是一個數組格式, 如 data=["user-susan:/training"],表示可以挂載到多個資料。 其中 

user-susan

 是之前建立的 Persistent Volume Claim, 而 

/training

 為容器内部的挂載目錄。

prepare_data = arena.standalone_job_op(
    name="prepare-data",
    image="byrnedo/alpine-curl",
    data=data,
    command="mkdir -p /training/dataset/mnist && \
  cd /training/dataset/mnist && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-images-idx3-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/t10k-labels-idx1-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-images-idx3-ubyte.gz && \
  curl -O https://code.aliyun.com/xiaozhou/tensorflow-sample-code/raw/master/data/train-labels-idx1-ubyte.gz")           

而上述步驟實際上是從指定位址利用 curl 下載下傳資料到分布式存儲對應的目錄 

/training/dataset/mnist

,請注意這裡的 

/training

 為分布式存儲的根目錄,類似大家熟悉的根 mount 點;而 

/training/dataset/mnist

 是子目錄。其實後面的步驟可以通過使用同樣的根 mount 點,讀到資料,進行運算。

第二步是利用下載下傳到分布式存儲的資料,并通過 git 指定固定 commit id 下載下傳代碼,并進行模型訓練。

train = arena.standalone_job_op(
    name="train",
    image="tensorflow/tensorflow:1.11.0-gpu-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    gpus=gpus,
    data=data,
    command='''
    echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py \
    --max_steps 500 --data_dir /training/dataset/mnist \
    --log_dir /training/output/mnist  --learning_rate %s \
    --dropout %s''' % (prepare_data.output, learning_rate, dropout),
    metrics=["Train-accuracy:PERCENTAGE"])           

可以看到這個步驟比資料準備要相對複雜一點,除了和第一步驟中的 name, image,  data 和 command 一樣需要指定之外,在模型訓練步驟中,還需要指定:

  • 擷取代碼的方式: 從可重制實驗的角度來看,對于運作試驗代碼的追本溯源,是非常重要的一環。可以在 API 調用時指定 

    sync_source

     的 git 代碼源,同時通過設定 

    env

     中 

    GIT_SYNC_REV

     指定訓練代碼的 commit id;
  • gpu:  預設為 0,就是不使用 GPU;如果為大于 0 的整數值,就代表該步驟需要這個數量的 GPU 數;
  • metrics:  同樣是從可重制和可比較的實驗目的出發,使用者可以将需要的一系列名額導出,并且通過 Pipelines UI 進行直覺的顯示和比較。具體使用方法分為兩步:1. 在調用 API 時以數組的形式指定要收集名額的 metrics name 和名額的展示格式 PERCENTAGE 或者是 RAW,比如 

    metrics=["Train-accuracy:PERCENTAGE"]

    。 2. 由于 Pipelines 預設會從 stdout 日志中收集名額,你需要在真正運作的模型代碼中輸出 {metrics name}={value} 或者 {metrics name}:{value}, 可以參考具體 樣例代碼
解鎖雲原生 AI 技能 - 開發你的機器學習工作流準備工作開發 Pipeline送出 Pipeline檢視運作結果總結

值得注意的是:

在本步驟中指定了和 

prepare_data

 相同的 

data

 參數 ["user-susan:/training"],就可以在訓練代碼中讀到對應的資料,比如 

--data_dir /training/dataset/mnist

另外由于該步驟依賴于 

prepare_data

,可以在方法中通過指定 

prepare_data.output

 表示兩個步驟的依賴關系。

最後 

export_model

 是基于 

train

 訓練産生的 checkpoint,生成訓練模型:

export_model = arena.standalone_job_op(
    name="export-model",
    image="tensorflow/tensorflow:1.11.0-py3",
    sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
    env=["GIT_SYNC_REV=%s" % (commit)],
    data=data,
    command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))           

export_model

 和第二步 

train

 類似,甚至要更為簡單,它隻是從 git 同步模型導出代碼并且利用共享目錄 

/training/output/mnist

 中的 checkpoint 執行模型導出。

整個工作流程看起來還是很直覺的, 下面就可以定義一個 Python 方法将整個流程貫穿在一起:

@dsl.pipeline(
  name='pipeline to run jobs',
  description='shows how to run pipeline jobs.'
)
def sample_pipeline(learning_rate='0.01',
    dropout='0.9',
    model_version='1',
    commit='f097575656f927d86d99dd64931042e1a9003cb2'):           
@dsl.pipeline 是表示工作流的裝飾器,這個裝飾器中需要定義兩個屬性,分别是 

name

 和  

description

入口方法 

sample_pipeline

 中定義了 4 個參數: 

learning_rate

dropout

model_version

 和 

commit

, 分别可以在上面的 

train

export_model

 階段使用。這裡的參數的值實際上是   dsl.PipelineParam  類型,定義成 dsl.PipelineParam 的目的在于可以通過 Kubeflow Pipelines 的原生 UI 将其轉換成輸入表單,表單的關鍵字是參數名稱,而預設值為參數的值。值得注意的是,這裡的 dsl.PipelineParam 對應值實際上隻能是字元串和數字型;而數組和 map,以及自定義類型都是無法通過轉型進行變換的。

實際上,這些參數都可以在使用者送出工作流時進行覆寫,以下就是送出工作流對應的 UI:

解鎖雲原生 AI 技能 - 開發你的機器學習工作流準備工作開發 Pipeline送出 Pipeline檢視運作結果總結

送出 Pipeline

您可以在自己的 Kubernetes 内将前面開發工作流的 Python DSL 送出到 Kubeflow Pipelines 服務中, 實際送出代碼很簡單:

KFP_SERVICE="ml-pipeline.kubeflow.svc.cluster.local:8888"
  import kfp.compiler as compiler
  compiler.Compiler().compile(sample_pipeline, __file__ + '.tar.gz')
  client = kfp.Client(host=KFP_SERVICE)
  try:
    experiment_id = client.get_experiment(experiment_name=EXPERIMENT_NAME).id
  except:
    experiment_id = client.create_experiment(EXPERIMENT_NAME).id
  run = client.run_pipeline(experiment_id, RUN_ID, __file__ + '.tar.gz',
                            params={'learning_rate':learning_rate,
                                     'dropout':dropout,
                                    'model_version':model_version,
                                    'commit':commit})           
利用 

compiler.compile

 将 Python 代碼編譯成執行引擎 (Argo) 識别的 DAG 配置檔案;

通過 Kubeflow Pipeline 的用戶端建立或者找到已有的實驗,并且送出之前編譯出的 DAG 配置檔案。

在叢集内準備一個 python3 的環境,并且安裝 Kubeflow Pipelines SDK:

# kubectl create job pipeline-client --namespace kubeflow --image python:3 -- sleep infinity
# kubectl  exec -it -n kubeflow $(kubectl get po -l job-name=pipeline-client -n kubeflow | grep -v NAME| awk '{print $1}') bash           

登入到 Python3 的環境後,執行如下指令,連續送出兩個不同參數的任務:

# pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp/0.1.14/kfp.tar.gz --upgrade
# pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp-arena/kfp-arena-0.4.tar.gz --upgrade
# curl -O https://raw.githubusercontent.com/cheyang/pipelines/update_standalone_sample/samples/arena-samples/standalonejob/standalone_pipeline.py
# python3 standalone_pipeline.py --learning_rate 0.0001 --dropout 0.8 --model_version 2
# python3 standalone_pipeline.py --learning_rate 0.0005 --dropout 0.8 --model_version 3           

檢視運作結果

登入到 Kubeflow Pipelines 的 UI: [

https://]()

{pipeline位址}/pipeline/#/experiments, 比如:

https://11.124.285.171/pipeline/#/experiments           
解鎖雲原生 AI 技能 - 開發你的機器學習工作流準備工作開發 Pipeline送出 Pipeline檢視運作結果總結

點選 

Compare runs

 按鈕,可以比較兩個實驗的輸入、花費的時間和精度等一系列名額。讓實驗可追溯是讓實驗可重制的第一步,而利用 Kubeflow Pipelines 本身的實驗管理能力則是開啟實驗可重制的第一步。

解鎖雲原生 AI 技能 - 開發你的機器學習工作流準備工作開發 Pipeline送出 Pipeline檢視運作結果總結

總結

實作一個可以運作的 Kubeflow Pipeline 需要的步驟是:

  1. 建構 Pipeline (流水線)中需要的最小執行單元 Component (元件),如果是利用原生定義的 

    dsl.container_ops

    , 需要建構兩部分代碼:
  • 建構運作時代碼:通常是為每個步驟建構容器鏡像,作為 Pipelines 和真正執行業務邏輯代碼之間的擴充卡。它所做的事情為擷取 Pipelines 上下文的輸入參數,調用業務邏輯代碼,并且将需要傳遞到下個步驟的輸出按照 Pipelines 的規則放到容器内的指定位置,由底層工作流元件負責傳遞。 這樣産生的結果是運作時代碼與業務邏輯代碼會耦合在一起。可以參考  Kubeflow Pipelines 的例子
  • 建構用戶端代碼:這個步驟通常是長成下面的樣子, 熟悉 Kubernetes 的朋友會發現這個步驟實際上就是在編寫 Pod Spec:
container_op = dsl.ContainerOp(
        name=name,
        image='<train-image>',
        arguments=[
            '--input_dir', input_dir,
            '--output_dir', output_dir,
            '--model_name', model_name,
            '--model_version', model_version,
            '--epochs', epochs
        ],
        file_outputs={'output': '/output.txt'}
    )
container_op.add_volume(k8s_client.V1Volume(
            host_path=k8s_client.V1HostPathVolumeSource(
                path=persistent_volume_path),
            name=persistent_volume_name))
container_op.add_volume_mount(k8s_client.V1VolumeMount(
            mount_path=persistent_volume_path,
            name=persistent_volume_name))           

利用原生定義的 

dsl.container_ops

 的好處在于靈活,由于開放了和 Pipelines 的互動接口,使用者可以在 container_ops 這個層面做許多事情。但是它的問題在于:

  • 複用度低。每個 Component 都需要建構鏡像和開發運作時代碼;
  • 複雜度高。使用者需要了解 Kubernetes 的概念,比如 resource limit,  PVC,  node selector 等一系列概念;
  • 支援分布式訓練困難。由于 

    container_op

     為單容器操作,如果需要支援分布式訓練就需要在 container_ops 中送出和管理類似 TFJob 的任務。這裡會帶來複雜度和安全性的雙重挑戰,複雜度比較好了解,安全性是說送出 TFJob 這類任務的權限會需要開放額外的權限給 Pipeline 的開發者。

另一種方式是使用 

arena_op

 這種可以重用的 Component API,它使用通用運作時代碼,可以免去重複建構運作時代碼的工作;同時利用通用一套的 

arena_op

 API 簡化使用者的使用;也支援 Parameter Server 和 MPI 等場景。建議您使用這種方式編譯 Pipelines。

  1. 将建構好的 Component (元件)拼接成 Pipeline (流水線);
  2. 将 Pipeline (流水線)編譯成 Argo 的執行引擎 (Argo) 識别的 DAG 配置檔案, 并送出 DAG 配置檔案到 Kubeflow Pipelines,  利用 Kubeflow Pipelines 自身的 UI 檢視流程結果。