天天看點

彙量科技在Spark上 建構推薦算法Pipeline的實踐

内容簡要:

一、關于彙量科技

二、一個典型的推薦算法實驗流程

三、問題和挑戰

四、在Spark上建構推薦算法Pipeline

一、關于彙量科技(Mobvista)

彙量科技是:

• 一站式的移動廣告服務和資料統計分析服務;

• 日均千億次的線上廣告個性化比對;

• 全流量 DNN 模型排序;

• 建構了一站式機器學習平台MindAlpha。

推薦算法的資料部分較為複雜,而語音圖像資料集是一個靜态的資料集。

推薦算法試驗流程解析:

1)  資料準備

對資料進行處理、結合;

2)  特征樣本生成

将資料進行特征拼接、Label拼接;

3)  特征存儲

存儲特征樣本,包含線上學習樣本和離線學習樣本;

4)  模型訓練

對樣本進行模型訓練,包括模型疊代、參數調優;

5)  線上評估

進行線上評估,如果發現問題傳回之前的流程。例如參數優化問題,則傳回模型調優,資料問題則傳回第一步資料準備,從頭開始。

推薦算法與廣告搜尋類似,首先解決資料的問題,然後解決算法的問題。

推薦算法主要存在以下四個問題:

1.資料源多,特征處理流程繁瑣

比如有實時特征、離線特征等。

2.離線學習、線上學習轉換

線上學習與離線學習之間做無縫的轉換。

3.模型訓練與資料處理的銜接

資料源有很多不同類型的存儲,算法需要适配,是以需要模型訓練與資料處理的銜接。

4.上下遊依賴多且頻繁變動

線上算法有很多流量并行的實驗,整個上下遊的依賴比較繁瑣,這方面出錯的造成的問題比較嚴重。

四、在Spark上建構推薦算法 Pipeline

(一)Why Spark?

l  選擇Spark的原因:

1)資料處理功能強大;

2)支援多種離線、實時資料源;

3)DataFrame的抽象;

4)支援多種排程系統和硬體;

5)PySpark。

(二)特征工程資料流

在解決特征資料流的問題上,最開始沒有建構排版,業務算法工程師們一般是自己做資料拼接。這種拼接沒有規範,直接拷貝代碼輸出資料源,在Spark裡面進行 Join,每個實驗形成一個資料集。

這種特征資料流存在許多問題:

• 多個實驗各自産出資料;

• 99% 的列是重複的;

• 極大計算和存儲浪費;

• 一小時的資料無法在一小時内處理完。

根據上面的問題,我們總結出特征資料流的解決方案與存儲方案。

1.特征資料流的解決方案:

•統一列存儲;

将同樣的資料進行歸置,避免多次計算。

•基礎共用特征列;

特征列共用,包括标簽。

•各實驗的特征列;

每個實驗添加自己的特征列,共用作業直接産出,其他作業隻産出自己實驗所需資料。

•模型訓練通過列名配置讀取DataFrame;

實驗資料産出後做模拟訓練,通過列名配置,從資料源裡讀取DataFrame。

2. 存儲方案選型

l  Kudu 實時更新

Online Learning

l 天級 ORC 備份

離線實驗

目前選用的是Kudu,含行級别和列級别,可以實時插入,近似于流式地寫入Kudu。寫完可以根據使用者設定的排程周期,當時間視窗累計完成,觸發一個訓練任務,這就是Online Learning準實時學習的方案。

Kudu更新之後是天級ORC備份,将Kudu一天的資料備份到 OSS上。這裡有兩個目的,第一是備份,第二是緩Kudu本身的壓力。

Kudu由本地磁盤搭建,本地磁盤存在成本大與性能瓶頸的問題。将資料存放到雲存儲上,可以借用雲存儲的服務能力做離線實驗,實驗時可以選擇Kudu資料,也可以直接在OSS讀曆史資料。

(三)打通大資料和算法架構

1.Parameter Server on PySpark

l 為什麼重新實作一套 Parameter Server?

l Python First API

n 與 Spark 排程內建

l 與 DataFrame 內建

1)Python First API

Python First API在做大規模離散深度學習訓練時,需要解決Embedding參數稀疏的問題。

Embedding定義了幾種Python接口,再向下可以分裝成Embedding操作的API。對稀疏參數更新的問題,傳統解決方法是放在Front Server節點上計算。

做Online Learning的時候,特征的個數是未知的,希望直接用原始的字元串或者用哈希值的方式做特征索引,特征的大小無法提前固定, Tenderflow架構不支援。

我們支援之後做内部轉換,将每段哈希值映射到一個連續範圍内,使得分片在比較小的範圍内,它的索引最大不會超過分片總長度,使得可以直接用于各種接口。

2.與 Spark 排程內建

對于Parameter Server來說由兩個部分,一個是Server節點,一個是Work節點。Spark從2.4開始有Rdd.barrier接口,接口的定義是所有的Excutor程序要麼不起要麼全起。

Spark存在猜測執行的機制,開啟Rdd.barrier模式後,不會重複執行已有的程序,将原本大資料處理的方式換成機器學習的方式。

Rdd.barrier可以用于起Parameter Server作業,但要起Server與Worker兩個節點。通過上述操作,完成了排程內建。

3. 與DataFrame 內建

學習架構完成後開始訓練,傳統方法是使用UDF讀取資料,Spark在2.3版本後提供了Pandas UDF,以Batch的形式将資料直接傳到Python,然後轉成Pandas的Data Frame。

由上圖可看到,訓練的代碼均為UDF,執行訓練直接調UDF,将Spark Columns傳進去即可。  

下面舉例一個更複雜更全面的Demo,功能是在離線實時預估一緻性監控On Spark Streaming。

實作監控的主要步驟:

1)事件的回流;

2)特征的回流;

3)做Window的GroupBy。

(四)解決資料上下遊排程關系

完成資料、算法與訓練後,最後要解決資料上下遊排程關系。

在算法場景下的資料上下遊排程與傳統大資料排程有一定差別,存在以下兩個問題:

問題一,最開始解決特征工程資料流時,共享部分拆成一個作業,由實驗自己産出。每個實驗自己産出額外需要的部分,此時會發現,其他實驗需要依賴共用處于最上遊的特征作業。

問題二,假設最初沒有實驗,線上是Baseline基線模型。算法工程師做實驗時,例如加一個特征,就會基于Baseline的資料,再加入工程師自己的資料,兩個資料之間自然就有上下遊關系。後續基于這個Baseline又有其他實驗,這樣就有許多分支,再往後如果有一個實驗效果不錯,則成為新的Baseline,後續新實驗就要基于新的Baseline重新進行實驗,下圖為分支管理方式。

實驗依賴關系自動化

l  每個時刻,總是有一個Baseline (基線) 模型以及若幹小流量模型(Master + 多個Branch);

l  小流量模型可能更新為基線模型(Merge);

l  此時各個實驗資料依賴上遊需要調整(Rebase);

l  基于Airflow;

l  通過一個配置檔案自動生成Airflow DAG;

l  支援Fork、Merge、Rebase等操作。

基于上下遊關系,我們将它們做成上下遊的管理機制,每一個人做實驗的時候,會去更新配置檔案,描述實驗的上遊是誰。然後配置檔案會自動重新整理,重新整理出來之後生成新的配置檔案,就是上下遊排程關系。

(五)未來方向

• PS on PySpark 2021 Q1開源

• Server/Worker異構資源配置設定(#SPARK-27495)

• 在 PS on PySpark上支援GNN