ElasticDL https://github.com/sql-machine-learning/elasticdl
ElasticDL 的首要設計意圖是簡化分布式程式設計。它允許使用者隻提供用 TensorFlow 2.0 API 描述的模型,而不需要使用者寫分布式訓練過程代碼。使用者的模型定義隻要能在本地調通,即可在分布式環境下用大規模資料訓練模型,進而提升研發效率。
同時,ElasticDL 提供的彈性排程的能力在實踐中可以讓叢集的利用高達 90%。當叢集資源不足時,一個訓練作業裡的程序減少;當其他作業結束釋放資源後,程序數量随之增加。這樣的做法比 TensorFlow Distribution Strategy 專注容錯(程序減少的情況下作業不失敗,但不會增加程序數量)更進一步。并且,因為 ElasticDL 作業容忍變化的 worker 數量,是以每個作業的啟動都不必等待叢集有足夠的資源,而是可以見縫插針的盡早開始訓練,進而縮短等待作業啟動的時間,讓研發人員可以盡快看到第一個疊代的結果,萬一分布式訓練有問題,也能盡早發現,進而進一步提升了研發效率。
簡化分布式深度學習程式設計
為了從海量資料中學習規律,我們需要編寫分布式深度學習程式來完成訓練任務。這在工業場景中尤為常見。
可分布式深度學習程式的編寫很難——程式設計者既要了解深度學習,也要了解分布式系統開發。在一個分布式深度學習系統中,需要啟動和監控若幹個 workers。因為既要拆分訓練資料給 workers,還要綜合各個 worker 算出的 gradients 來更新模型,是以涉及通信 (Communication) 和 同步 (Synchronization)。此外,當 worker 數目很多時,作業在執行過程中有 worker 挂掉的機率也會變得很大。如果一個 worker 挂掉,則整個作業重新開機或者恢複到最近的 checkpoint (Fault Recovery),那麼重新開機之後可能又會有 worker 挂掉導緻重新開機,于是作業不斷陷入重新開機和恢複,永遠也無法完成。這進一步要求程式設計者具備設計容錯 (Fault Tolerance) 系統的能力。其實不僅分布式深度學習,其他分布式機器學習程式、分布式離線和線上資料處理程式等各種分布式程式的寫作,都對程式設計者有類似上述要求。
一個常見的解決思路是為特定類型的作業提供分布式程式設計架構,讓使用者隻需要完形填空一樣補上業務邏輯,而分布式計算(包括通信、同步、和容錯)都由架構的代碼來完成。一個典型的例子是離線資料處理程式用 MapReduce 架構來寫。不管是 Google MapReduce 還是 Hadoop MapReduce,使用者基本都隻需填寫 map 和 reduce 兩個函數的實作即可。類似的,線上資料流系統基于 Storm 和 Flink 來寫,使用者隻需提供 bolts 和 nuts 這樣的業務邏輯定義。
在 ElasticDL 之前,螞蟻金服的同僚們使用過多種架構和類似架構的高層 API。這些方案大都基于 TensorFlow 和 Kubernetes。
-
TensorFlow Estimator 作為建構在 TensorFlow 之上的一層 API,允許使用者隻需定義模 型,而訓練過程封裝在一個函數調用裡。利用 Kubeflow 提供的 TF operator,我們可以将該訓練過程以分布式作業的方式啟動在Kubernetes 上。這個方案的局限是:它僅支援 TensorFlow 的 graph mode,不支援 eager execution;而 eager execution 可以大幅簡化調試,尤其友善跟蹤網絡各層輸出。
Keras API 支援 TensorFlow 2.x 和 eager execution。目前 TensorFlow 2.x Keras API 還暫不支援 ParameterServer 分布式政策,對 AllReduce 分布式政策提供了實驗性的支援。
Horovod 對使用者代碼有侵入性,使用者除了必須熟悉 TensorFlow API 之外,還需學習 Horovod API。
以上三個方案的共同局限是,雖然具備一定的容錯能力,不過不支援彈性排程。而且它們都依賴部署 Kubernetes operator,了解 Kubernetes 對 AI 專家來說頗有挑戰。
針對這些局限,我們設計和開發了 ElasticDL 分布式計算架構。使用者定義可以用 TensorFlow 2.x 的 Keras API 來定義模型。并且,分布式執行不要求 Kubernetes 叢集有 任何特殊配置,而是利用每個作業裡的 master 程序來協調訓練資料配置設定、通信、同步和容錯——這也是 ElasticDL 除了容錯,支援彈性排程的原因。
基于 ElasticDL 架構的程式設計
就像 MapReduce 架構中隻需要使用者完形填空兩個函數:map 和 reduce,ElasticDL需要使用者填寫 forward、loss、optimizer、feed 函數。其中 forward 定義深度學習的前向計算過程 (Forward Pass),ElasticDL 會調用 TensorFlow eager execution 的 GradientTape 機制來自動推導對應的後向計算過程 (Backward Pass);loss 函數傳回模 型訓練時使用的損失函數;optimizer 函數傳回模型訓練時使用的優化器;feed 定制化訓練資料到 TensorFlow 模型輸入 (tensors) 的轉換過程。
所有這些函數的程式設計隻需要了解 TensorFlow API,不需要對分布式訓練有任何背景知識。寫完之後,使用者可以在單機上用小資料做調試驗證。如果通過,可以不做任何代碼修改就送出到 Kubernetes 叢集上做分布式的容錯的大規模訓練。
不同于 Kubeflow/TF-operator 給每個叢集部署一個 Kubernetes Operator 的方式, ElasticDL 為每個作業引入一個 master 程序。通過調用 Kubernetes API,master 程序了解叢集情況;同時,作為作業的一部分,master 還了解深度學習作業的特點——包括利用 Python inspection 機制了解上述各個函數的特點,其中調用的 API 函數等。是以, master 有非常充分的資訊來做更優的排程。比如 master 可以請 Kubernetes 把兩個 worker 啟動在同一台實體機上,共用一個 GPU——當一個 worker 讀資料的時候,請另外一個 worker 來做計算,進而始終保持較高的 GPU 使用率。
一個例子
我們用一個 MNIST 手寫數字識别的例子來說明。
def forward():
inputs = tf.keras.Input(shape=(28, 28), name="image")
x = tf.keras.layers.Reshape((28, 28, 1))(inputs)
x = tf.keras.layers.Conv2D(32, kernel_size=(3, 3), activation="relu")(x)
x = tf.keras.layers.Conv2D(64, kernel_size=(3, 3), activation="relu")(x)
x = tf.keras.layers.BatchNormalization()(x)
x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))(x)
x = tf.keras.layers.Dropout(0.25)(x)
x = tf.keras.layers.Flatten()(x)
outputs = tf.keras.layers.Dense(10)(x)
return tf.keras.Model(inputs=inputs, outputs=outputs, name="mnist_model")
除了模型定義之外,使用者還需要指定 feed, loss, optimizer 函數。
def loss(labels, predictions):
labels = tf.reshape(labels, [-1])
return tf.reduce_mean(
input_tensor=tf.nn.sparse_softmax_cross_entropy_with_logits(
logits=predictions, labels=labels
)
)
def optimizer(lr=0.1):
return tf.optimizers.SGD(lr)
def feed(dataset, mode, _):
def _parse_data(record):
if mode == Mode.PREDICTION:
feature_description = {
"image": tf.io.FixedLenFeature([28, 28], tf.float32)
}
else:
feature_description = {
"image": tf.io.FixedLenFeature([28, 28], tf.float32),
"label": tf.io.FixedLenFeature([1], tf.int64),
}
r = tf.io.parse_single_example(record, feature_description)
features = {
"image": tf.math.divide(tf.cast(r["image"], tf.float32), 255.0)
}
if mode == Mode.PREDICTION:
return features
else:
return features, tf.cast(r["label"], tf.int32)
dataset = dataset.map(_parse_data)
if mode == Mode.TRAINING:
dataset = dataset.shuffle(buffer_size=1024)
return dataset
上述每個函數都很容易做單獨測試 (unit test)。而且,利用 TensorFlow 2.x eager execution,上述函數很容易 log 每一層的輸出。基于個特點,ElasticDL worker 在調用 forward 函數的時候,可以列印中間結果,便于調試和複現問題。
ElasticDL 的彈性訓練過程
給定上述模型定義,ElasticDL 的 master 程序按照 asynchronous 或者 synchronous SGD 方法,協調 workers 來做訓練。當使用 asynchronous SGD 方法時,master 會啟動一個高性能的 parameter server,供各個 workers 使用。當使用 synchronous SGD 時,ElasticDL 使用和才雲科技合作研發的一個 Kubernetes-native 的 fault-tolerable AllReduce 實作 FTlib。
Master 負責動态資料劃分
彈性訓練過程的一個容易被忽略的前提是動态資料劃分 (Dynamic data partitioning)。在用 MPI 寫分布式程式的時候,因為作業中程序數量是恒定的,是以經常采用靜态資料 劃分的做法 —— 在訓練之前把訓練資料預先分成 N 個檔案,對應作業中的 N 個 worker 程序。這個做法在彈性排程的時候就失效了 —— 因為彈性排程時,作業中的程序數量是可變的。為此,需要實作動态資料劃分。
ElasticDL 的動态資料劃分是基于索引的。ElasticDL 要求訓練資料是一個或者多個 RecordIO 格式的檔案,或者是 MaxCompute 資料庫系統中的表 (table)。這兩種資料源都允許 master 程序在開始訓練之前,在基本存儲單元 (block) 間快速跳躍着掃描資料,把資料分成小段,稱之為任務 (task)。每個 task 包括的内容如下:
-
1、檔案名或者表名,
2、第一條記錄相對于檔案(或者表)開始處的偏移 (offset),
3、這個 task 裡的總記錄數。
MaxCompute
https://www.alibabacloud.com/zh/product/maxcompute掃描結果是很多 tasks,master 把這些 tasks 放進一個 TODO 隊列裡。這個隊列不一定需要是 master 程序裡的資料結構,可以是放在 etcd 裡的 —— 因為 etcd 是不死的,是以 master 即使被高優先級作業搶占了,這個資訊也不會丢失;可以通過在資源富餘時重新開機 master 程序來恢複作業狀态。
掃描和劃分資料的同時,master 開始請 Kubernetes 啟動 workers,總數不超過使用者指定的數量 N(最大并發度)。每當一個 worker 啟動起來了,master 會收到 Kubernetes 發來的通知;master 在一個 etcd 資料結構裡記錄“活着”的 workers。
掃描和劃分資料結束之後,master 就依次從 TODO 隊列裡取出 task,通過 gRPC 發給某一個活着的 worker,同時 master 把這個 task 挪進 DOING 隊列裡。接收到 task 的 worker 負責打開檔案(或者表),并且從指定的 offset 開始依次讀取記錄,并且更新本地模型。根據使用者選擇的 asynchronous 或者 synchronous 算法,workers 會通過調用 parameter server 或者 AllReduce 來協調更新全局模型。
當一個 worker 處理完了接收到的 task,它通過 gRPC 傳回一個表示成功的标記;master 就把這個 task 從 DOING 隊列挪到 DONE 隊列了。當所有 task 都從 TODO 挪進了 DONE, 則說明一個 epoch 完成了。
如果一個 worker 失敗了(比如被更高優先級作業搶占了),則 master 的 gRPC call 會 timeout;此時,master 把對應的 task 從 DOING 隊列挪回 TODO 隊列了。下一次有 worker 完成 task 時,master 會把這個 task 再發出去。這裡有一個細節:有的 task 可能被某個 worker 使用了一部分,也是以影響到了模型更新;此時 worker 被搶占,那麼這部分已經被處理的資料會因為 task 的下一次分發,被重複使用。不過這個并不影響機器學習訓練要求資料統計一緻性的假設。而且其他動态資料劃分方法造成的資料複用情況可能更嚴重。
Worker 調用 TensorFlow Eager Execution
ElasticDL worker 接收到的一個 task 通常包括多個 minibatches。對于每個 task, worker 打開對應的檔案或者表,随後做如下操作:
-
讀取一個 mini-batch 的訓練資料。
用本地模型 (local model) 作為參數調用使用者定義的 forward 函數以計算 cost。如果 模型很大,則部分參數可能來自于 parameter server。
給定 cost,worker 利用 TensorFlow eager execution 的 GradientTape 機制,進行 backward 計算,得到梯度 (gradient)。
如果是 synchronous SGD,此時 worker 調用 AllReduce 實作 FTlib 來同步 gradients 并且更新模型。如果是 asynchronous SGD,worker 不定時的向 parameter server 上傳 gradients,也不定時地從 parameter server 擷取全局模型參數。
高效訓練的優化
相對于 2019 年秋季 ElasticDL 在 Google Developer Day 上亮相時的狀态,最近幾個月 ElasticDL 項目針對性能優化做了很多工作。當時 ElasticDL 使用 Redis 作為 parameter server。現在有了自己的用 Go 語言寫的 parameter server。相對于 Redis, ElasticDL parameter server 可以做一些深度學習計算,進而減少 worker 和 parameter server 之間通信的次數。
這個變化和其他優化工作一起讓同樣的訓練作業,總體訓練時間下降了約 13 倍。最近一個基于 DeepFM 模型的試驗展示,用兩個 parameter server 程序和四個 workers 程序來訓練,10 個 epochs 的總體時間從 1350 秒(ElasticDL 的 2019年9月版本)下降到 106 秒(2020年2月版本)。這些優化政策包括:
-
在 parameter server 上惰性初始化 (lazy initialize) embedding vectors —— 在使用到 vector 的時候才初始化。
把一個 embedding table 拆分到多個 parameter server 程序裡以均衡存儲與通信負載。
worker 從 PS 請求 embedding vectors 時,先濾除重複的 embedding ID,隻取回不同 ID 的 vectors,進而減少通信量。
worker 向 PS 發送梯度時,先把相同 ID 的梯度進行合并(調用 TensorFlow 的 embedding vector combination 函數),進而減少通信量。
彈性排程提升叢集使用率
ElasticDL 實作的彈性排程和剛性排程 (Gang Scheduling) 是對應的。剛性排程的簡潔不求甚解的描述是:一個作業裡的 n 個程序,運作時如果有一個程序挂了(比如被更高優先級的作業搶占了資源),則整個作業挂掉。等資源足夠再啟動所有的 n 個程序了, 則可以重新開機(或者從最近的 checkpoint 恢複)。
- Gang Scheduling https://en.wikipedia.org/wiki/Gang_scheduling
上文提到的幾種分布式運作 TensorFlow 作業的方式都使用了 Kubeflow 項目提供的 Kubernetes operators,支援在 Kubernetes 上分布式地運作 TensorFlow 作業。因為 TensorFlow runtime 目前支援一定程度的容錯,是以作業執行過程中,如果有一些 workers 挂了,剩下的可以繼續。不過不支援因為日後資源富餘,恢複 workers 數量。XGBoost、MXNet 社群也習慣于複用 Kubeflow 的 Kubernetes operator。用 MPI 寫的程式也可以用 Kubeflow 拉起。
而彈性排程 (Elastic Scheduling) 實作的是訓練作業運作過程中,程序數量的變化不影響作業進行。具體的說,如果一個或者幾個程序被高優先級的作業搶占,剩下的程序不受影響地繼續進行。如果将來資源豐沛了,系統可以加幾個程序,此時作業仍然不受影響地繼續運作。
上文簡述了 ElasticDL 實作彈性排程的機制,包括動态資料配置設定以及由 master 來啟動、監控、和管理 workers,而不依賴 Kubernetes operator。本節展示三個 benchmark 試驗,幫助大家直覺地了解 ElasticDL 對叢集使用率和研發效率的同時提升。
實驗一:多個AI訓練作業并發
考慮兩個 AI 訓練作業需要的資源總和略超過叢集的情況:如果沒有 elastic scheduling,則兩個作業順序執行。第二個作業的發起人需要等很久——使用者體驗不好。并且任何時刻隻有一個作業在運作——叢集資源用不滿。而如果有 elastic scheduling,則兩個作業并發執行,雖然後啟動的作業拿不到期待的全部資源,但是也馬上就開始執行了——使用者體驗好,而且因為作業并發叢集被用滿。
我們做了一個實驗來驗證上述好處。這個實驗可以在 ASI 叢集和開源 Kubernetes 叢集上複現。實驗結果如下圖。
Figure 1: overlap jobs
上圖對應的實驗裡,我們用 gang scheduling 的方式送出了兩個訓練作業,每個作業都需要 13 個 CPU。而 Google Cloud 上租用的實驗叢集總 CPU 數是 24, 不足同時運作兩個作業,是以依次運作它們。可以看到第一個作業在 395 秒時結束。随後叢集花了一點時間排程,然後開始運作第二個作業,直到 795 秒時結束。
下圖對應的實驗裡,我們用 ElasticDL 來執行同樣的兩個訓練作業。第一個作業送出之後的 30 秒,我們送出了第二個作業。第二個作業馬上就開始運作,用滿了叢集剩下的資源,而不需要等到第一個作業結束。在 395 秒時,第一個作業結束。随後,在 580 秒時,第二個作業也結束了。因為彈性排程,使得兩個作業盡量同時運作,是以總結束時間比也上圖要早。
總結
-
使用者等待作業啟動時間幾乎是 0。這對于 AI 工作很重要,因為使用者最關注的是第一個疊代盡快開始—— 如果第一個疊代失敗了,很可能是使用者程式的 bug。另外,深度學習模型往往需要手動調優,學習率、optimizer、activation 等配置如果不合理,往往在前幾個疊代就能發現;是以第一個疊代能立刻開始,對模型調優的工作效率提高有很大幫助。
叢集使用率高。第二個實驗 (elastic scheduling) 執行期間,有一段時間叢集使用率是 100%;其他時間也不低于第一個實驗 (gang scheduling)。
作業完成更快。第二個試驗裡,兩個作業用了約 580 秒;第一個實驗裡需要約 795 秒。
實驗二:AI作業和線上服務混布
運作各種線上服務的生産叢集,通常需要留出餘量資源,以應付突然增長的使用者請求量。我們希望利用這些“餘量”來做 AI 訓練,進而提升叢集使用率。下面實驗驗證:通過用較低優先級運作 ElasticDL 訓練作業,在使用者請求增加的時候,Kubernetes 自動擴容線上服務 (NGINX);此時 ElasticDL 作業自動釋放資源,配合線上服務的擴容。當流量高峰過去之後,Kubernetes 自動縮容 NGINX 服務,此時,ElasticDL 自動利用釋放的資源。
Figure 2: auto react
圖中紫色曲線是 NGINX 服務使用的 CPU 數量,随使用者請求數量變化。綠色曲線是 ElasticDL 訓練作業使用的 CPU 數量,随 NGINX 的資源需求自動變化。藍色曲線是叢集的總體資源使用率——保持在 90% 以上。
實驗三:訓練時更改 worker 數量不影響收斂性
有使用者擔心訓練過程中 worker 的數量發生變化,會導緻不收斂。實際情況下從未發生這類問題。用 ElasticDL 和用 gang scheduling 分别訓練 Wide & Deep model 和 xDeepFM model, 收斂曲線如下:
Figure 3: wide-n-deep training converges
Figure 4: xdeepfm training converges
可以看到,采用 gang scheduling 持續用 4 個或者 8 個 workers,和用 ElasticDL 并且 worker 數量在 4 到 8 之間變化,得到的收斂曲線很難分辨。差别在自然誤差範圍之内。
螞蟻金服從事的金融行業涉及支付、微貸、和保險等業務。和搜尋、廣告、推薦不同,金融業務的流程要複雜得多——包括對使用者信用的預判以及和其他金融機構的關聯——每一個使用者請求對應很多處理步驟;而搜尋、廣告、推薦業務裡針對每個使用者請求的 AI 處理步驟少得多。行業特點導緻螞蟻金服要訓練的模型的類型繁多,呈現更長尾的特點。也對工具提升研發效率提出了高要求。ElasticDL 正是針對這些特點設計的。
同時,對叢集的使用率提升是各行各業都關注的。在很多公司和行業,AI 叢集的使用率通常在 30% 以下。當通過全面實作彈性排程,把叢集使用率提升到 90% 左右時,相當于空手套白狼地把叢集規模擴大了為原來的三倍多。是以節省的硬體投資可能高達數千萬甚至數億元人民币。
ElasticDL 的設計和實作依托了 TensorFlow 2.x 提供的高效率的模型描述 API。也依賴了 TensorFlow eager execution 提供的 GradientTape 機制 —— 使得 ElasticDL 可以在不改變 TensorFlow runtime 的情況下,結合 Kubernetes 實作徹底的彈性排程(程序數可增也可減),進而實作了減少作業啟動 的等待時間,提升叢集使用率,和提升研發效率的效果。
目前 ElasticDL 在阿裡系結合 PAI 平台在推廣。PAI 平台提供的拖拽式程式設計模式進一步降低了端到端機器學習流程的研發門檻。希望接下來 ElasticDL 團隊可以有更多結合業務實踐的分享。