天天看點

Alink漫談(十) :線性回歸實作 之 資料預處理

Alink 是阿裡巴巴基于實時計算引擎 Flink 研發的新一代機器學習算法平台,是業界首個同時支援批式算法、流式算法的機器學習平台。本文和下文将介紹線性回歸在Alink中是如何實作的,希望可以作為大家看線性回歸代碼的Roadmap。

Alink漫談(十) :線性回歸實作 之 資料預處理

目錄

    • 0x00 摘要
    • 0x01 概念
      • 1.1 線性回歸
      • 1.2 優化模型
      • 1.3 損失函數&目标函數
      • 1.4 最小二乘法
    • 0x02 示例代碼
    • 0x03 整體概述
    • 0x04 基礎功能
      • 4.1 損失函數
        • 4.1.1 導數和偏導數
        • 4.1.2 方向導數
        • 4.1.3 Hessian矩陣
        • 4.1.4 平方損失函數 in Alink
      • 4.2 目标函數
        • 4.2.1 梯度
        • 4.2.2 梯度下降法
        • 4.2.3 目标函數 in Alink
        • 4.2.4 一進制目标函數 in Alink
        • 4.2.4.1 依據一組采樣點計算梯度
        • 4.2.4.2 根據一個采樣點更新梯度
      • 4.3 優化函數
    • 0x05 資料準備
      • 5.1 擷取label資訊
      • 5.2 把輸入轉換成三元組
      • 5.3 擷取統計變量
      • 5.4 對輸入資料做标準化和插值
    • 0xFF 參考

因為Alink的公開資料太少,是以以下均為自行揣測,肯定會有疏漏錯誤,希望大家指出,我會随時更新。

本系列目前已有十篇,歡迎大家指點

線性回歸是利用數理統計中回歸分析,來确定兩種或兩種以上變量間互相依賴的定量關系的一種統計分析方法,運用十分廣泛。其表達形式為y = w'x+e,e為誤差服從均值為0的正态分布。

線上性回歸中,目标值與特征之間存在着線性相關的關系。即假設這個方程是一個線性方程,一個多元一次方程。

基本形式:給定由 d 個屬性描述的示例 ,線性模型試圖學得一個通過屬性的線性組合來進行預測的函數,即:

\[f(x)=w_1x_1 +w_2x_2 ... +w_dx_d+b

\]

其中w為參數,也稱為權重,可以了解為x1,x2...和 xd 對f(x)的影響度。

一般形式為:

\[f(x)=w^Tx+b

假如我們依據這個公式來預測 f(x),公式中的x是我們已知的,然而w,b的取值卻不知道,隻要我們把w,b的取值求解出來,模型就得以确定。我們就可以依據這個公式來做預測了。

那麼如何依據訓練資料求解 w 和 b 的最優取值呢?關鍵是衡量 f 和 y 之間的差别。這就牽扯到另外一個概念:損失函數(Loss Function)。

假如有一個模型 f(x),如何判斷這個模型是否優秀?這種定性的判斷可以通過一個成為經驗誤差風險的數值來進行衡量,也就是模型 f 在所有訓練樣本上所犯錯誤的總和 E(x)。

我們通過在訓練集上最小化經驗損失來訓練模型。換言之,通過調節 f 的參數 w,使得經驗誤差風險 E(x) 不斷下降,最終達到最小值的時候,我們就獲得了一個 “最優” 的模型。

但是如果按照上面的定義,E(x) 是一組示性函數的和,是以是不連續不可導的函數,不易優化。為了解決這個問題,人們提出了“損失函數”的概念。損失函數就是和誤差函數有一定關系(比如是誤差函數的上界),但是具有更好的數學性質(比如連續,可導,凸性等),比較容易進行優化。是以我們就可以對損失函數來優化。

損失函數如果連續可導,是以我們可以用梯度下降法等一階算法,也可以用牛頓法,拟牛頓法等二階算法。當優化算法收斂後,我們就得到一個不錯的模型。如果損失函數是一個凸函數,我們就可以得到最優模型。

典型的優化方法:

一階算法 二階算法
确定性算法 梯度下降法 投影次梯度下降 近端梯度下降 Frank-Wolfe算法 Nesterov加速算法 坐标下降法 對偶坐标上升法 牛頓法,拟牛頓法
随機算法 随機梯度下降法 随機坐标下降法 随機對偶坐标上升法 随機方差減小梯度法 随機拟牛頓法

是以我們可以知道,優化LinearRegression模型 f 的手段一定是:确定損失函數,用 x,y 作為輸入訓練以求得損失函數最小值,進而确定 f 的參數 w。過程大緻如下:

  1. 處理輸入,把 x, y 轉換成算法需要的格式。
  2. 找一個合适的預測函數,一般表示為 h 函數,該函數就是我們需要找的分類函數,它用來預測輸入資料的判斷結果。
  3. 構造一個Cost函數(損失函數),該函數表示預測的輸出(h)與訓練資料類别(y)之間的偏差,可以是二者之間的差(h-y)或者是其他的形式。綜合考慮所有訓練資料的 “損失”,将Cost求和或者求平均,記為J(θ)函數,表示所有訓練資料預測值與實際類别的偏差。
  4. 顯然,損失函數 J(θ) 函數的值越小表示預測函數越準确(即h函數越準确),是以這一步需要做的是找到 J(θ) 函數的最小值。注意,損失函數是關于 θ 的函數!也就是說,對于損失函數來講,θ不再是函數的參數,而是損失函數的自變量!
  5. 準備模型中繼資料,建立模型。

先概括說明:

  • 損失函數:計算的是一個樣本的誤差;
  • 代價函數:是整個訓練集上所有樣本誤差的平均,經常和損失函數混用;
  • 目标函數:代價函數 + 正則化項;

再詳細闡釋:

假設我們用 f(X) 來拟合真實值Y。這個輸出的f(X)與真實值Y可能是相同的,也可能是不同的,為了表示我們拟合的好壞,我們就用一個函數來度量拟合的程度。這個函數就稱為損失函數(loss function),或者叫代價函數(cost function)。

損失函數用來衡量算法的運作情況,估量模型的預測值與真實值的不一緻程度,是一個非負實值函數,通常使用 L(Y,f(x)) 來表示。損失函數越小,模型的魯棒性就越好。損失函數是經驗風險函數的核心部分。

目标函數是一個相關但更廣的概念,對于目标函數來說在有限制條件下的最小化就是損失函數(loss function)。

因為f(x)可能會過度學習曆史資料,導緻它在真正預測時效果會很不好,這種情況稱為過拟合(over-fitting)。這樣得到的函數會過于複雜。是以我們不僅要讓經驗風險最小化,還要讓結構風險最小化。這個時候就定義了一個函數 J(x),這個函數專門用來度量模型的複雜度,在機器學習中也叫正則化(regularization)。常用的有 L1, L2範數。

L1 正則的本質是為模型增加了“模型參數服從零均值拉普拉斯分布”這一先驗知識。

L2 正則的本質是為模型增加了“模型參數服從零均值正态分布”這一先驗知識。

L1 正則化增加了所有權重 w 參數的絕對值之和逼迫更多 w 為零,也就是變稀疏( L2 因為其導數也趨 0, 奔向零的速度不如 L1 給力了)。L1 正則化的引入就是為了完成特征自動選擇的光榮使命,它會學習地去掉無用的特征,也就是把這些特征對應的權重置為 0。

L2 正則化中增加所有權重 w 參數的平方之和,逼迫所有 w 盡可能趨向零但不為零(L2 的導數趨于零)。因為在未加入 L2 正則化發生過拟合時,拟合函數需要顧忌每一個點,最終形成的拟合函數波動很大,在某些很小的區間裡,函數值的變化很劇烈,也就是某些 w 值非常大。為此,L2 正則化的加入就懲罰了權重變大的趨勢。

到這一步我們就可以說我們最終的優化函數是:min(L(Y, f(x) + J(x)) ,即最優化經驗風險和結構風險,而這個函數就被稱為目标函數。

在回歸問題中,通過目标函數來求解最優解,常用的是平方誤差(最小二乘線性回歸)代價函數。損失函數則是平方損失函數。

均方誤差是回歸任務中最常用的性能度量,是以可以使均方誤差最小。基于均方誤差最小化來進行模型求解的方法稱為“最小二乘法”。線上性回歸中,最小二乘法就是找到一條直線,使所有樣本到直線的 "歐式距離和" 最小。于是線性回歸中損失函數就是平方損失函數。

有了這些基礎概念,下面我們就開始動手分析Alink的代碼。

首先,我們給出線性回歸的示例。

public class LinearRegressionExample {
    static Row[] vecrows = new Row[] {
            Row.of("$3$0:1.0 1:7.0 2:9.0", "1.0 7.0 9.0", 1.0, 7.0, 9.0, 16.8),
            Row.of("$3$0:1.0 1:3.0 2:3.0", "1.0 3.0 3.0", 1.0, 3.0, 3.0, 6.7),
            Row.of("$3$0:1.0 1:2.0 2:4.0", "1.0 2.0 4.0", 1.0, 2.0, 4.0, 6.9),
            Row.of("$3$0:1.0 1:3.0 2:4.0", "1.0 3.0 4.0", 1.0, 3.0, 4.0, 8.0)
    };
    static String[] veccolNames = new String[] {"svec", "vec", "f0", "f1", "f2", "label"};
    static BatchOperator vecdata = new MemSourceBatchOp(Arrays.asList(vecrows), veccolNames);
    static StreamOperator svecdata = new MemSourceStreamOp(Arrays.asList(vecrows), veccolNames);

    public static void main(String[] args) throws Exception {
        String[] xVars = new String[] {"f0", "f1", "f2"};
        String yVar = "label";
        String vec = "vec";
        String svec = "svec";
        LinearRegression linear = new LinearRegression()
                .setLabelCol(yVar)  // 這裡把變量都設定好了,後續會用到
                .setFeatureCols(xVars)
                .setPredictionCol("linpred");

        Pipeline pl = new Pipeline().add(linear);
        PipelineModel model = pl.fit(vecdata);

        BatchOperator result = model.transform(vecdata).select(
                new String[] {"label", "linpred"});

        List<Row> data = result.collect();
    }
}
           

輸出是

svec|vec|f0|f1|f2|label|linpred
----|---|--|--|--|-----|-------
$3$0:1.0 1:7.0 2:9.0|1.0 7.0 9.0|1.0000|7.0000|9.0000|16.8000|16.8148
$3$0:1.0 1:3.0 2:4.0|1.0 3.0 4.0|1.0000|3.0000|4.0000|8.0000|7.8521
$3$0:1.0 1:3.0 2:3.0|1.0 3.0 3.0|1.0000|3.0000|3.0000|6.7000|6.7739
$3$0:1.0 1:2.0 2:4.0|1.0 2.0 4.0|1.0000|2.0000|4.0000|6.9000|6.959
           

根據前文我們可以知道,在回歸問題中,通過優化目标函數來求解最優解,常用的是平方誤差(最小二乘線性回歸)代價函數。損失函數則是平方損失函數。

對應到Alink,優化函數或者優化器是拟牛頓法的L-BFGS算法,目标函數是UnaryLossObjFunc,損失函數是SquareLossFunc。線性回歸訓練總體邏輯是LinearRegTrainBatchOp。是以我們下面一一論述。

LinearRegression 訓練 用到LinearRegTrainBatchOp,而LinearRegTrainBatchOp的基類是BaseLinearModelTrainBatchOp。是以我們來看BaseLinearModelTrainBatchOp。

public class LinearRegression extends Trainer <LinearRegression, LinearRegressionModel> implements LinearRegTrainParams <LinearRegression>, LinearRegPredictParams <LinearRegression> {
   @Override
   protected BatchOperator train(BatchOperator in) {
      return new LinearRegTrainBatchOp(this.getParams()).linkFrom(in);
   }
}
           

BaseLinearModelTrainBatchOp.linkFrom 代碼如下,注釋中給出了清晰的邏輯 :

大體是:

  • 擷取算法參數,label資訊;
  • 準備,轉換資料到 Tuple3 format <weight, label, feature vector>;
  • 獲得統計資訊,比如向量大小,均值和方差;
  • 對訓練資料做标準化和插值;
  • 使用L-BFGS算法,通過對損失函數求最小值進而對模型優化;
  • 準備模型中繼資料;
  • 建立模型;
public T linkFrom(BatchOperator<?>... inputs) {
    BatchOperator<?> in = checkAndGetFirst(inputs);
    // Get parameters of this algorithm.
    Params params = getParams();
    // Get type of processing: regression or not
    boolean isRegProc = getIsRegProc(params, linearModelType, modelName);
    // Get label info : including label values and label type.
    Tuple2<DataSet<Object>, TypeInformation> labelInfo = getLabelInfo(in, params, isRegProc);
    // Transform data to Tuple3 format.//weight, label, feature vector.
    DataSet<Tuple3<Double, Double, Vector>> initData = transform(in, params, labelInfo.f0, isRegProc);
    // Get statistics variables : including vector size, mean and variance of train data.
    Tuple2<DataSet<Integer>, DataSet<DenseVector[]>>
        statInfo = getStatInfo(initData, params.get(LinearTrainParams.STANDARDIZATION));
    // Do standardization and interception to train data.
    DataSet<Tuple3<Double, Double, Vector>> trainData = preProcess(initData, params, statInfo.f1);
    // Solve the optimization problem.
    DataSet<Tuple2<DenseVector, double[]>> coefVectorSet = optimize(params, statInfo.f0,
        trainData, linearModelType, MLEnvironmentFactory.get(getMLEnvironmentId()));
    // Prepare the meta info of linear model.
    DataSet<Params> meta = labelInfo.f0
        .mapPartition(new CreateMeta(modelName, linearModelType, isRegProc, params))
        .setParallelism(1);
    // Build linear model rows, the format to be output.
    DataSet<Row> modelRows;
    String[] featureColTypes = getFeatureTypes(in, params.get(LinearTrainParams.FEATURE_COLS));
    modelRows = coefVectorSet
        .mapPartition(new BuildModelFromCoefs(labelInfo.f1,
            params.get(LinearTrainParams.FEATURE_COLS),
            params.get(LinearTrainParams.STANDARDIZATION),
            params.get(LinearTrainParams.WITH_INTERCEPT), featureColTypes))
        .withBroadcastSet(meta, META)
        .withBroadcastSet(statInfo.f1, MEAN_VAR)
        .setParallelism(1);
    // Convert the model rows to table.
    this.setOutput(modelRows, new LinearModelDataConverter(labelInfo.f1).getModelSchema());
    return (T)this;
}
           

我們後續還會對此邏輯進行細化。

我們首先介紹下相關基礎功能和相關概念,比如損失函數,目标函數,梯度等。

損失函數涉及到若幹概念。

導數也是函數,是函數的變化率與位置的關系。導數代表了在自變量變化趨于無窮小的時候,函數值的變化與自變量的變化的比值。幾何意義是這個點的切線。實體意義是該時刻的(瞬時)變化率。

導數反映的是函數y=f(x)在某一點處沿x軸正方向的變化率。直覺地看,也就是在x軸上某一點處,如果f’(x)>0,說明f(x)的函數值在x點沿x軸正方向是趨于增加的;如果f’(x)<0,說明f(x)的函數值在x點沿x軸正方向是趨于減少的。

一進制導數表征的是:一進制函數 f(x)與自變量 x 在某點附近變化的比率(變化率,斜率)。

如果是多元函數呢?則為偏導數。偏導數是多元函數“退化”成一進制函數時的導數,這裡“退化”的意思是固定其他變量的值,隻保留一個變量,依次保留每個變量,則N元函數有N個偏導數。偏導數為函數在每個位置處沿着自變量坐标軸方向上的導數(切線斜率)。二進制函數的偏導數表征的是:函數 F(x,y) 與自變量 x(或y) 在某點附近變化的比率(變化率)。

導數和偏導數的定義中,均是沿坐标軸正方向讨論函數的變化率。那麼當我們讨論函數沿任意方向的變化率時,也就引出了方向導數的定義,即:某一點在某一趨近方向上的導數值。

方向導數就是偏導數合成向量與方向向量的内積。方向導數的本質是一個數值,簡單來說其定義為:一個函數沿指定方向的變化率。

在一進制函數求解的問題中,我們可以很愉快的使用牛頓法求駐點。但在機器學習的優化問題中,我們要優化的都是多元函數,x往往不是一個實數,而是一個向量,是以将牛頓求根法利用到機器學習中時,x 是一個向量, y 也是一個向量,對 x 求導以後得到的是一個矩陣,就是Hessian矩陣。

在數學中,海森矩陣(Hessian matrix 或 Hessian)是一個自變量為向量的實值函數的二階偏導數組成的方塊矩陣。多元函數的二階導數就是一個海森矩陣。

前面提到,線性回歸中損失函數就是平方損失函數。我們來看看實作。後續實作将調用此類的 loss 和 derivative,具體遇到時候再講。

UnaryLossFunc是接口,代表一進制損失函數。它定義的每個函數都有兩個輸入 (eta and y),Alink把這兩個輸入的差作為損失函數的一進制變量。基本API是求損失,求導數,求二階導數。

public interface UnaryLossFunc extends Serializable {
	// Loss function.
	double loss(double eta, double y);
	// The derivative of loss function.
	double derivative(double eta, double y);
	// The second derivative of the loss function.
	double secondDerivative(double eta, double y);
}
           

平方損失函數具體實作如下:

public class SquareLossFunc implements UnaryLossFunc {

   @Override
   public double loss(double eta, double y) {
      return 0.5 * (eta - y) * (eta - y);
   }

   @Override
   public double derivative(double eta, double y) {
      return eta - y;
   }

   @Override
   public double secondDerivative(double eta, double y) {
      return 1;
   }
}
           

這裡涉及的概念是梯度,梯度下降法。

對于模型優化,我們要選擇最優的 θ,使得 f(x) 最接近真實值。這個問題就轉化為求解最優的 θ,使損失函數 J(θ) 取最小值。那麼如何解決這個轉化後的問題呢?這又牽扯到一個概念:梯度下降(Radient Descent)。

是以我們首先要溫習下梯度。

  • 向量的定義是有方向(direction)有大小(magnitude)的量。
  • 梯度其實是一個向量,即有方向有大小;其定義為:一個多元函數對于其自變量分别求偏導數,這些偏導數所組成的向量就是函數的梯度。
  • 梯度即函數在某一點最大的方向導數,函數沿梯度方向函數有最大的變化率。
  • 梯度的第一層含義就是“方向導數的最大值”。
  • 目前位置的梯度方向,為函數在該位置處方向導數最大的方向,也是函數值上升最快的方向,反方向為下降最快的方向;
  • 梯度的幾何含義就是:沿向量所在直線的方向變化率最大。

梯度下降法是一個一階最優化算法,它的核心思想是:要想最快找到一個函數的局部極小值,必須沿函數目前點對應“梯度”(或者近似梯度)的反方向(下降)進行規定步長“疊代”搜尋。沿梯度(斜率)的反方向移動,這就是“梯度下降法”。

既然在變量空間的某一點處,函數沿梯度方向具有最大的變化率,那麼在優化目标函數的時候,自然是沿着負梯度方向去減小函數值,以此達到我們的優化目标。

梯度下降中的下降,意思是讓函數的未知數随着梯度的方向運動。什麼是梯度的方向呢?把這一點帶入到梯度函數中,結果為正,那我們就把這一點的值變小一些,同時就是讓梯度變小些;當這一點帶入梯度函數中的結果為負的時候,就給這一點的值增大一些。

如何沿着負梯度方向減小函數值呢?既然梯度是偏導數的集合,同時梯度和偏導數都是向量,那麼參考向量運算法則,我們在每個變量軸上減小對應變量值即可。

梯度下降就是讓梯度中所有偏導函數都下降到最低點的過程.(劃重點:下降)。都下降到最低點了,那每個未知數(或者叫次元)的最優解就得到了,是以他是解決函數最優化問題的算法。

“最小二乘法”和“梯度下降法”,前者用于“搜尋最小誤差”,後者用于“用最快的速度搜尋”,二者常常配合使用。對最小二乘法的參數調優就轉變為了求這個二進制函數的極值問題,也就是說可以應用“梯度下降法”了。

在最小二乘函數中,已擁有的條件是一些樣本點和樣本點的結果,就是矩陣X和每一條X樣本的lable值y。X是矩陣,y是向量。是以我們要知道,梯度下降中求偏導數的未知數不是x和y,而是x的參數w。

目标函數的基類是OptimObjFunc,其提供API 比如計算梯度,損失,hessian矩陣,以及依據采樣點更新梯度和hessian矩陣。 其幾個派生類如下,從注釋中可以看到使用範圍。

我們可以看到正則化(regularization) L1, L2範數,這是相比損失函數增加的子產品。

public abstract class OptimObjFunc implements Serializable {
    protected final double l1;
    protected final double l2; // 正則化(regularization) L1, L2範數。
    protected Params params;   
  .....
}

// Unary loss object function.
public class UnaryLossObjFunc extends OptimObjFunc

// The OptimObjFunc for multilayer perceptron.
public class AnnObjFunc extends OptimObjFunc
  
// Accelerated failure time Regression object function.
public class AftRegObjFunc extends OptimObjFunc

// Softmax object function.
public class SoftmaxObjFunc extends OptimObjFunc 
           

對于線性模型,BaseLinearModelTrainBatchOp 中會根據模型類型來生成目标函數,可以看到在生成目标函數同時,也相應設定了不同的損失函數,其中 SquareLossFunc 就是我們之前提到的。

public static OptimObjFunc getObjFunction(LinearModelType modelType, Params params) {
    OptimObjFunc objFunc;
    // For different model type, we must set corresponding loss object function.
    switch (modelType) {
        case LinearReg:
            // 我們這裡!
            objFunc = new UnaryLossObjFunc(new SquareLossFunc(), params);
            break;
        case SVR:
            double svrTau = params.get(LinearSvrTrainParams.TAU);
            objFunc = new UnaryLossObjFunc(new SvrLossFunc(svrTau), params);
            break;
        case LR:
            objFunc = new UnaryLossObjFunc(new LogLossFunc(), params);
            break;
        case SVM:
            objFunc = new UnaryLossObjFunc(new SmoothHingeLossFunc(), params);
            break;
        case Perceptron:
            objFunc = new UnaryLossObjFunc(new PerceptronLossFunc(), params);
            break;
        case AFT:
            objFunc = new AftRegObjFunc(params);
            break;
        default:
            throw new RuntimeException("Not implemented yet!");
    }
    return objFunc;
}
           

一進制目标函數就是我們線性回歸用到的目标函數,其隻有一個新增變量 :unaryLossFunc。就是一進制損失函數。

/**
 * Unary loss object function.
 */
public class UnaryLossObjFunc extends OptimObjFunc {
    private UnaryLossFunc unaryLossFunc;
}
           

一進制目标函數提供了很多功能,我們這裡用到主要是:

  • calcGradient :根據一組采樣點計算梯度,這是從基類OptimObjFunc內建的。
  • updateGradient :根據一個采樣點更新梯度;
  • calcSearchValues :為線性搜尋計算損失;

對于本文,這裡更新的是損失函數的梯度。

再次啰嗦下,損失函數用來度量拟合的程度,進而評估模型拟合的好壞,記為 J(θ)。注意,損失函數是關于 θ 的函數!也就是說,對于損失函數來講,θ不再是函數的參數,而是損失函數的自變量!

當我們計算損失時,是将每個樣本中的特征 xi 和對應的目标變量真實值 yi 帶入損失函數,此時,損失函數中就隻剩下 θ 是未知的。

損失函數的梯度即對 θi 求偏導,由于損失函數是關于 θ 的函數,是以,θ 的取值不同,得出來的的梯度向量也是不同的。借用“下山”的比喻來解釋,θ 的不同取值,相當于處于山上的不同位置,每一個位置都會計算出一個梯度向量▽J(θ)。

這裡的 l1, l2 就是之前提到的正則化(regularization) L1, L2範數。

/**
 * Calculate gradient by a set of samples.
 *
 * @param labelVectors train data.
 * @param coefVector   coefficient of current time.
 * @param grad         gradient.
 * @return weight sum
 */
public double calcGradient(Iterable<Tuple3<Double, Double, Vector>> labelVectors,
                           DenseVector coefVector, DenseVector grad) {
    double weightSum = 0.0;
    for (int i = 0; i < grad.size(); i++) {
        grad.set(i, 0.0);
    }
  
// 對輸入的樣本集合labelVectors逐個計算梯度  
    for (Tuple3<Double, Double, Vector> labelVector : labelVectors) {
        if (labelVector.f2 instanceof SparseVector) {
           ((SparseVector)(labelVector.f2)).setSize(coefVector.size());
        }
      
// 以這個樣本為例 
labelVector = {Tuple3@9895} "(1.0,16.8,1.0 1.0 1.4657097546055162 1.4770978917519928)"
 f0 = {Double@9903} 1.0
 f1 = {Double@9904} 16.8
 f2 = {DenseVector@9905} "1.0 1.0 1.4657097546055162 1.4770978917519928"
  
        weightSum += labelVector.f0; // labelVector.f0是權重
        updateGradient(labelVector, coefVector, grad);
    }
    if (weightSum > 0.0) {
        grad.scaleEqual(1.0 / weightSum);
    }
// l2正則化    
    if (0.0 != this.l2) {
        grad.plusScaleEqual(coefVector, this.l2 * 2);
    }
// l1正則化   
    if (0.0 != this.l1) {
        double[] coefArray = coefVector.getData();
        for (int i = 0; i < coefVector.size(); i++) {
            grad.add(i, Math.signum(coefArray[i]) * this.l1);
        }
    }
    return weightSum;
}
           

這裡 labelVector.f0是權重,labelVector.f1是 y,labelVector.f2是 x-vec 四維向量,coefVector是w系數向量。

  • getEta是點積,即 x向量 與 目前w系數的點積,就是目前計算的 y。
  • labelVector.f0 * unaryLossFunc.derivative(eta, labelVector.f1); 就是調用SquareLossFunc.derivative 函數來計算一階導數。
  • updateGrad.plusScaleEqual(labelVector.f2, div); 就是在原有梯度基礎上更新梯度
public class UnaryLossObjFunc extends OptimObjFunc {
    /**
     * Update gradient by one sample.
     *
     * @param labelVector a sample of train data.
     * @param coefVector  coefficient of current time.
     * @param updateGrad  gradient need to update.
     */
    @Override
    protected void updateGradient(Tuple3<Double, Double, Vector> labelVector, DenseVector coefVector, DenseVector updateGrad) {
        // 點積,就是目前計算出來的y
        double eta = getEta(labelVector, coefVector); 
        // 一階導數。labelVector.f0是權重
        double div = labelVector.f0 * unaryLossFunc.derivative(eta, labelVector.f1); 
        // 點乘之後還需要相加。labelVector.f2 就是x—vec,比如 1.0 1.0 1.4657097546055162 1.4770978917519928
        updateGrad.plusScaleEqual(labelVector.f2, div);
    }
  
    private double getEta(Tuple3<Double, Double, Vector> labelVector, DenseVector coefVector) {
        // 點積,表示第 i 次疊代中節點上的第 k 個特征向量與特征權重分量的點乘。coefVector中第 c 項表示為第 i 次疊代中特征權重向量在第 c 列節點上的分量
        return MatVecOp.dot(labelVector.f2, coefVector);
    }
}

/**
* Plus with another vector scaled by "alpha".
*/
public void plusScaleEqual(Vector other, double alpha) {
	if (other instanceof DenseVector) {
		BLAS.axpy(alpha, (DenseVector) other, this);
	} else {
		BLAS.axpy(alpha, (SparseVector) other, this);
	}
}
           

Alink中提供給了一系列并行優化函數,比如GD, SGD, LBFGS, OWLQN, NEWTON method。

其基類是Optimizer。

public abstract class Optimizer {
    protected final DataSet<?> objFuncSet; // 具體目标函數,計算梯度和損失
    protected final DataSet<Tuple3<Double, Double, Vector>> trainData; //訓練資料
    protected final Params params; //參數
    protected DataSet<Integer> coefDim; //dimension of features.
    protected DataSet<DenseVector> coefVec = null; //最終系數w
    .......
}
           

線性回歸主要用到了LBFGS算法。

public class Lbfgs extends Optimizer 
           

具體調用如下

public static DataSet<Tuple2<DenseVector, double[]>> optimize(.....) {
    // Loss object function
    DataSet<OptimObjFunc> objFunc = session.getExecutionEnvironment()
        .fromElements(getObjFunction(modelType, params));

    if (params.contains(LinearTrainParams.OPTIM_METHOD)) {
        LinearTrainParams.OptimMethod method = params.get(LinearTrainParams.OPTIM_METHOD);
        return OptimizerFactory.create(objFunc, trainData, coefficientDim, params, method).optimize();
    } else if (params.get(HasL1.L_1) > 0) {
        return new Owlqn(objFunc, trainData, coefficientDim, params).optimize();
    } else {
        // 我們的程式将運作到這裡
        return new Lbfgs(objFunc, trainData, coefficientDim, params).optimize();
    }
}
           

機器學習基本優化套路是:

準備資料 ----> 優化函數 ----> 目标函數 ----> 損失函數
           

對應我們這裡是

BaseLinearModelTrainBatchOp.linkFrom(整體邏輯) -----> Lbfgs(繼承Optimizer) ----> UnaryLossObjFunc(繼承OptimObjFunc) ----> SquareLossFunc(繼承UnaryLossFunc)
           

看完完底層功能,我們再次回到線性回歸總體流程。

總結 BaseLinearModelTrainBatchOp.linkFrom 的基本流程如下:(發現某些媒體對于清單排版支援不好,是以加上序号)。

首先再給出輸入一個例子:

Row.of("$3$0:1.0 1:7.0 2:9.0", "1.0 7.0 9.0", 1.0, 7.0, 9.0, 16.8),

這裡後面 4 項對應列名是

"f0", "f1", "f2", "label"

  • 1)擷取到label的資訊,包括label數值和種類。 labelInfo = getLabelInfo() 這裡有一個 distinct 操作,是以會去重。最後得到label的可能取值範圍 :0,1,類型是 Double。
  • 2)用transform函數把輸入轉換成三元組Tuple3<weight, label, feature vector>。具體說,會把輸入中的三個特征"f0", "f1", "f2" 轉換為一個向量 vec, 我們以後稱之為x-vec。重點就在于特征變成了一個向量。是以這個三元組可以認為是 <權重, y-value, x-vec>。
  • 3)用statInfo = getStatInfo() 擷取統計變量,包括vector size, mean和variance。這裡流程比較複雜。
    • 3.1)用trainData.map{return value.f2;}來擷取訓練資料中的 x-vec。
    • 3.2)調用StatisticsHelper.summary來對 x-vec 做處理
      • 3.2.1)調用 summarizer
        • 3.2.1.1)調用 mapPartition(new VectorSummarizerPartition(bCov))
          • 3.2.1.1.1)調用VectorSummarizerPartition.mapPartition,其周遊清單,清單中的每一個變量 sv 是 x-vec。srt = srt.visit(sv),會根據每一個新輸入重新計算count,sum,squareSum,normL1..,這樣就得到了本partiton中輸入每列的這些統計數值。
        • 3.2.1.2)調用 reduce(VectorSummarizerUtil.merge(value1, value2)) 來歸并每一個partition的結果。
      • 3.2.2)調用map(BaseVectorSummarizer summarizer),其實調用到DenseVectorSummarizer,就是生成一個DenseVectorSummary向量,裡面是count,sum,squareSum,normL1,min,max,numNonZero。
    • 3.3)調用 coefficientDim = summary.map
    • 3.4)調用 meanVar = coefficientDim.map,最後得到 Tuple2.of(coefficientDim, meanVar)
  • 4)preProcess(initData, params, statInfo.f1) 用3) 計算的結果 對輸入資料做标準化和插值 standardization and interception。上面得到的 meanVar 将會作為參數傳入。這裡是對 x-vec 做标準化。比如原始輸入Row是"(1.0,16.8,1.0 7.0 9.0)",其中 x-vec 是"1.0 7.0 9.0",進行标準化之後,x-vec 變成了 4 項 :{ 第1項是固定值 "1.0 ", 是以4 項 是 "1.0 1.0 1.4657097546055162 1.4770978917519928" },是以轉換後的Row是"(1.0,16.8,1.0 1.0 1.4657097546055162 1.4770978917519928)"。即weight 是1.0,y-value是16.8,後續4個是x-vec。
  • 以上完成了對資料的處理。
  • 5)調用 optimize(params, statInfo.f0, trainData, linearModelType) 通過對損失函數求最小值進而對模型優化。(使用L-BFGS算法,會單獨拿出來講解)
  • 6)調用 mapPartition(new CreateMeta()) 來準備模型中繼資料。
  • 7)調用 mapPartition(new BuildModelFromCoefs) 來建立模型。

可以看到,資料準備占據了很大部分,下面我們看看資料準備的幾個步驟。

此處代碼對應上面基本流程的 1)

因為之前有一個distinct操作,是以會去重。最後得到label的可能取值範圍 :0,1,類型是 Double。

private Tuple2<DataSet<Object>, TypeInformation> getLabelInfo(BatchOperator in,
                                                                  Params params,
                                                                  boolean isRegProc) {
        String labelName = params.get(LinearTrainParams.LABEL_COL);
        // Prepare label values
        DataSet<Object> labelValues;
        TypeInformation<?> labelType = null;
        if (isRegProc) {
            // 因為是回歸,是以是這裡
            labelType = Types.DOUBLE;
            labelValues = MLEnvironmentFactory.get(in.getMLEnvironmentId())
                .getExecutionEnvironment().fromElements(new Object());
        } else {
          .....
        }
        return Tuple2.of(labelValues, labelType);
}
           

此處代碼對應上面基本流程的 2) 。

用transform函數把輸入轉換成三元組Tuple3<weight, label, feature vector>。具體說,會把輸入中的三個特征"f0", "f1", "f2" 轉換為一個向量 vec, 我們以後稱之為x-vec。重點就在于特征變成了一個向量。是以這個三元組可以認為是 <權重, y-value, x-vec>。

private DataSet<Tuple3<Double, Double, Vector>> transform(BatchOperator in,
                                                          Params params,
                                                          DataSet<Object> labelValues,
                                                          boolean isRegProc) {
    ......
    // 擷取Schema
    TableSchema dataSchema = in.getSchema();
    // 擷取各種index 
    int labelIdx = TableUtil.findColIndexWithAssertAndHint(dataSchema.getFieldNames(), labelName);
    ......
    int weightIdx = weightColName != null ? TableUtil.findColIndexWithAssertAndHint(in.getColNames(), weightColName) : -1;
    int vecIdx = vectorColName != null ? TableUtil.findColIndexWithAssertAndHint(in.getColNames(), vectorColName) : -1;
    // 用transform函數把輸入轉換成三元組Tuple3<weight, label, feature vector>
    return in.getDataSet().map(new Transform(isRegProc, weightIdx, vecIdx, featureIndices, labelIdx)).withBroadcastSet(labelValues, LABEL_VALUES);
}
           

這裡對應的變量列印出來為

params = {Params@2745} "Params {featureCols=["f0","f1","f2"], labelCol="label", predictionCol="linpred"}"
labelValues = {DataSource@2845} 
isRegProc = true
featureColNames = {String[3]@2864} 
 0 = "f0"
 1 = "f1"
 2 = "f2"
labelName = "label"
weightColName = null
vectorColName = null
dataSchema = {TableSchema@2866} "root\n |-- svec: STRING\n |-- vec: STRING\n |-- f0: DOUBLE\n |-- f1: DOUBLE\n |-- f2: DOUBLE\n |-- label: DOUBLE\n"
featureIndices = {int[3]@2878} 
 0 = 2
 1 = 3
 2 = 4
labelIdx = 5
weightIdx = -1
vecIdx = -1
           

具體在runtime時候,會進入到Transform.map函數。我們可以看到,會把輸入中的三個特征"f0", "f1", "f2",轉換為一個向量 vec, 我們以後稱之為x-vec。

private static class Transform extends RichMapFunction<Row, Tuple3<Double, Double, Vector>> {
    @Override
    public Tuple3<Double, Double, Vector> map(Row row) throws Exception {
        // 擷取權重
        Double weight = weightIdx != -1 ? ((Number)row.getField(weightIdx)).doubleValue() : 1.0;
        // 擷取label
        Double val = FeatureLabelUtil.getLabelValue(row, this.isRegProc,
            labelIdx, this.positiveLableValueString);
        if (featureIndices != null) {
            // 擷取x-vec
            DenseVector vec = new DenseVector(featureIndices.length);
            for (int i = 0; i < featureIndices.length; ++i) {
                vec.set(i, ((Number)row.getField(featureIndices[i])).doubleValue());
            }
            // 建構三元組
            return Tuple3.of(weight, val, vec);
        } else {
            Vector vec = VectorUtil.getVector(row.getField(vecIdx));
            return Tuple3.of(weight, val, vec);
        }
    }
}
           

如果對應原始輸入

Row.of("$3$0:1.0 1:7.0 2:9.0", "1.0 7.0 9.0", 1.0, 7.0, 9.0, 16.8),

,則程式中各種變量為:

row = {Row@9723} "$3$0:1.0 1:7.0 2:9.0,1.0 7.0 9.0,1.0,7.0,9.0,16.8"
weight = {Double@9724} 1.0
val = {Double@9725} 16.8
vec = {DenseVector@9729} "1.0 7.0 9.0"
vecIdx = -1
featureIndices = {int[3]@9726} 
 0 = 2
 1 = 3
 2 = 4
           

用getStatInfo() 對輸入資料做标準化和插值 standardization and interception。

此處代碼對應上面基本流程的 3)

  1. 用statInfo = getStatInfo() 擷取統計變量,包括vector size, mean和variance。這裡流程比較複雜。
private Tuple2<DataSet<Integer>, DataSet<DenseVector[]>> getStatInfo(
    DataSet<Tuple3<Double, Double, Vector>> trainData, final boolean standardization) {
    if (standardization) {
        DataSet<BaseVectorSummary> summary = StatisticsHelper.summary(trainData.map(
            new MapFunction<Tuple3<Double, Double, Vector>, Vector>() {
                @Override
                public Vector map(Tuple3<Double, Double, Vector> value) throws Exception {
                    return value.f2; //擷取訓練資料中的 x-vec
                }
            }).withForwardedFields());

        DataSet<Integer> coefficientDim = summary.map(new MapFunction<BaseVectorSummary, Integer>() {
            public Integer map(BaseVectorSummary value) throws Exception {
                return value.vectorSize(); // 擷取dimension
            }
        });

        DataSet<DenseVector[]> meanVar = summary.map(new MapFunction<BaseVectorSummary, DenseVector[]>() {
            public DenseVector[] map(BaseVectorSummary value) {
                if (value instanceof SparseVectorSummary) {
                    // 計算min, max
                    DenseVector max = ((SparseVector)value.max()).toDenseVector();
                    DenseVector min = ((SparseVector)value.min()).toDenseVector();
                    for (int i = 0; i < max.size(); ++i) {
                        max.set(i, Math.max(Math.abs(max.get(i)), Math.abs(min.get(i))));
                        min.set(i, 0.0);
                    }
                    return new DenseVector[] {min, max};
                } else {
                    // 計算standardDeviation
                    return new DenseVector[] {(DenseVector)value.mean(),
                        (DenseVector)value.standardDeviation()};
                }
            }
        });
        return Tuple2.of(coefficientDim, meanVar);
    } 
}
           

這裡對應基本流程的 4) 。

對輸入資料做标準化和插值 standardization and interception。上面得到的 meanVar 作為參數傳入。這裡是對 x-vec 做标準化。

比如原始輸入Row是

"(1.0,16.8,1.0 7.0 9.0)"

,其中 x-vec 是

"1.0 7.0 9.0"

,進行标準化之後,x-vec 變成了 4 項,第一項是固定值 "1.0 ", 4 項 是

"1.0 1.0 1.4657097546055162 1.4770978917519928"

,是以轉換後的Row是

"(1.0,16.8,1.0 1.0 1.4657097546055162 1.4770978917519928)"

為什麼第一項是固定值 "1.0 " ?因為按照線性模型

f(x)=w^Tx+b

,我們應該得出一個常數 b,這裡設定 "1.0 ",就是 b 的初始值。

private DataSet<Tuple3<Double, Double, Vector>> preProcess(
    return initData.map(
        new RichMapFunction<Tuple3<Double, Double, Vector>, Tuple3<Double, Double, Vector>>() {
            private DenseVector[] meanVar;

            @Override
            public Tuple3<Double, Double, Vector> map(Tuple3<Double, Double, Vector> value){
// value = {Tuple3@9791} "(1.0,16.8,1.0 7.0 9.0)"
                Vector aVector = value.f2;
// aVector = {DenseVector@9792} "1.0 7.0 9.0"
                if (aVector instanceof DenseVector) {
                    DenseVector bVector;
                    if (standardization) {
                        if (hasInterceptItem) {
                            bVector = new DenseVector(aVector.size() + 1);
                            bVector.set(0, 1.0); // 設定了固定值
                            for (int i = 0; i < aVector.size(); ++i) {
                                // 對輸入資料做标準化和插值
                                bVector.set(i + 1, (aVector.get(i) - meanVar[0].get(i)) / meanVar[1].get(i));
                            }
                        } 
                    } 
// bVector = {DenseVector@9814} "1.0 1.0 1.4657097546055162 1.4770978917519928"
                    return Tuple3.of(value.f0, value.f1, bVector);
                } 
            }
        }).withBroadcastSet(meanVar, MEAN_VAR);
}

// 這裡是對 x-vec 做标準化。比如原始輸入Row是"(1.0,16.8,1.0 7.0 9.0)",其中 x-vec 是"1.0 7.0 9.0",進行标準化之後,x-vec 變成了 4 項,第一項是 "1.0 ",是 "1.0 1.0 1.4657097546055162 1.4770978917519928",是以轉換後的Row是"(1.0,16.8,1.0 1.0 1.4657097546055162 1.4770978917519928)"
           

至此,輸入處理完畢。

比如原始輸入Row是"(1.0,16.8,1.0 7.0 9.0)",其中 x-vec 是"1.0 7.0 9.0"。

進行标準化之後,x-vec 變成了 4 項 :{ 第1項是固定值 "1.0 ", 是以4 項 是 "1.0 1.0 1.4657097546055162 1.4770978917519928" },

轉換後的Row是"(1.0,16.8,1.0 1.0 1.4657097546055162 1.4770978917519928)"。即weight 是1.0,y-value是16.8,後續4個是x-vec。

下面我們可以開始進行優化模型了,敬請期待下文。

終于了解了方向導數與梯度

導數,方向導數,梯度(Gradient)與梯度下降法(Gradient Descent)的介紹(非原創)

梯度向量與梯度下降法

直覺了解梯度,以及偏導數、方向導數和法向量等

梯度(Gradient)與梯度下降法(Gradient Descent)

梯度與梯度下降法

梯度下降算法過程詳細解讀

https://www.zhihu.com/question/25627482/answer/321719657)

Hessian矩陣以及在圖像中的應用

https://blog.csdn.net/weixin_39445556/article/details/84502260)

《分布式機器學習算法、理論與實踐_劉鐵岩》

https://zhuanlan.zhihu.com/p/29672873)

https://www.zhihu.com/question/36425542

https://zhuanlan.zhihu.com/p/32821110)

https://blog.csdn.net/hei653779919/article/details/106409818)

CRF L-BFGS Line Search原理及代碼分析

步長與學習率

https://blog.csdn.net/IMWTJ123/article/details/88709023)

線性回歸、梯度下降(Linear Regression、Gradient Descent)

機器學習系列(三)——目标函數和損失函數