天天看點

Flink ML 機器學習算法介紹(二)-特征工程

作者:程式你好
Flink ML 機器學習算法介紹(二)-特征工程

Feature Engineering 特征工程

資料與特征工程決定了模型的上限,改進算法隻不過是逼近這個上限而已。

特征工程是機器學習中非常重要的一部分,它是将原始資料轉換為模型可了解的形式。一般來說,特征工程包括特征建構、特征提取和特征選擇三個部分1其中,特征建構是指從原始資料中建構新的特征,例如,将時間戳轉換為小時、星期幾等等。特征提取是指從原始資料中提取有用的特征,例如,從文本中提取關鍵詞。而特征選擇則是指從所有的特征中選擇最有用的一些特征,以便于訓練模型。

特征工程是機器學習中最重要的步驟之一,需要耗費大量時間。目的是準備一個最适合機器學習算法的輸入資料集,并增強機器學習模型的性能。

Flink ML 2.2版本中大幅增加了這方面的算法支援,總共有33個特征工程方面的算法,涵蓋了各個方面的内容。

1、Binarizer

二值化
Flink ML 機器學習算法介紹(二)-特征工程

顧名思義,Binarizer在機器學習中是一種将連續特征值離散化的方法,通過設定門檻值将資料二值化(将特征值設定為0或1)。

Binarizer算法有許多應用,其中一些應用包括将連續的特征值二值化,如在圖像進行中将連續的像素值資料離散化為黑白像素值資料。 另一個應用是将多類标簽轉換為二進制标簽的過程,通過将每個類别轉換為一個二進制向量,其中隻有一個元素為1,其餘元素均為0.

Binarizer可以處理DenseVector、SparseVector或數值類型的連續特征。

輸入

參數名稱 類型 預設值 描述
inputCols Number/Vector null 需要進行二值化的Number/Vectors

輸出

參數名稱 類型 預設值 描述
outputCols Number/Vector null 二值化的Number/Vectors

參數

名稱 預設值 類型 是否必填 描述
inputCols null String[] yes 輸入列名稱
outputCols null String[] yes 輸出列名稱
thresholds null Double[] yes 用于将連續特征二值化的門檻值

示例代碼:

import org.apache.flink.ml.feature.binarizer.Binarizer;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;


import java.util.Arrays;


/** Simple program that creates a Binarizer instance and uses it for feature engineering. */
public class BinarizerExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


        // Generates input data.
        DataStream<Row> inputStream =
                env.fromElements(
                        Row.of(
                                1,
                                Vectors.dense(1, 2),
                                Vectors.sparse(
                                        17, new int[] {0, 3, 9}, new double[] {1.0, 2.0, 7.0})),
                        Row.of(
                                2,
                                Vectors.dense(2, 1),
                                Vectors.sparse(
                                        17, new int[] {0, 2, 14}, new double[] {5.0, 4.0, 1.0})),
                        Row.of(
                                3,
                                Vectors.dense(5, 18),
                                Vectors.sparse(
                                        17, new int[] {0, 11, 12}, new double[] {2.0, 4.0, 4.0})));


        Table inputTable = tEnv.fromDataStream(inputStream).as("f0", "f1", "f2");


        // Creates a Binarizer object and initializes its parameters.
        Binarizer binarizer =
                new Binarizer()
                        .setInputCols("f0", "f1", "f2")
                        .setOutputCols("of0", "of1", "of2")
                        .setThresholds(0.0, 0.0, 0.0);


        // Transforms input data.
        Table outputTable = binarizer.transform(inputTable)[0];


        // Extracts and displays the results.
        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
            Row row = it.next();


            Object[] inputValues = new Object[binarizer.getInputCols().length];
            Object[] outputValues = new Object[binarizer.getInputCols().length];
            for (int i = 0; i < inputValues.length; i++) {
                inputValues[i] = row.getField(binarizer.getInputCols()[i]);
                outputValues[i] = row.getField(binarizer.getOutputCols()[i]);
            }


            System.out.printf(
                    "Input Values: %s\tOutput Values: %s\n",
                    Arrays.toString(inputValues), Arrays.toString(outputValues));
        }
    }
}           

2、Bucketizer

Flink ML 機器學習算法介紹(二)-特征工程

Bucketizer中文意思是“資料分桶”,是來源于 Spark MLlib 中的算法,用于将連續的數值型特征轉換為離散型特征。例如,将人分為年齡段分為老人、中年人、青年人、兒童,将溫度分為高、中、低等幾個檔次等等。

輸入

參數名稱 類型 預設值 描述
inputCols Number null 要進行資料分桶的連續特征值

輸出

參數名稱 類型 預設值 描述
outputCols Double null 離散化特征

參數

名稱 預設值 類型 是否必填 描述
inputCols null String[] yes 輸入列名稱
outputCols null String[] yes 輸出列名稱
handleInvalid "error" Double no 處理無效條目的政策。支援的值:‘error’,‘skip’,‘keep’。
splitsArray null Double[] yes 将連續特征映射到資料桶中的分割點數組

示例代碼:

import org.apache.flink.ml.common.param.HasHandleInvalid;
import org.apache.flink.ml.feature.bucketizer.Bucketizer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;


import java.util.Arrays;


/** Simple program that creates a Bucketizer instance and uses it for feature engineering. */
public class BucketizerExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


        // Generates input data.
        DataStream<Row> inputStream = env.fromElements(Row.of(-0.5, 0.0, 1.0, 0.0));
        Table inputTable = tEnv.fromDataStream(inputStream).as("f1", "f2", "f3", "f4");


        // Creates a Bucketizer object and initializes its parameters.
        Double[][] splitsArray =
                new Double[][] {
                    new Double[] {-0.5, 0.0, 0.5},
                    new Double[] {-1.0, 0.0, 2.0},
                    new Double[] {Double.NEGATIVE_INFINITY, 10.0, Double.POSITIVE_INFINITY},
                    new Double[] {Double.NEGATIVE_INFINITY, 1.5, Double.POSITIVE_INFINITY}
                };
        Bucketizer bucketizer =
                new Bucketizer()
                        .setInputCols("f1", "f2", "f3", "f4")
                        .setOutputCols("o1", "o2", "o3", "o4")
                        .setSplitsArray(splitsArray)
                        .setHandleInvalid(HasHandleInvalid.SKIP_INVALID);


        // Uses the Bucketizer object for feature transformations.
        Table outputTable = bucketizer.transform(inputTable)[0];


        // Extracts and displays the results.
        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
            Row row = it.next();


            double[] inputValues = new double[bucketizer.getInputCols().length];
            double[] outputValues = new double[bucketizer.getInputCols().length];
            for (int i = 0; i < inputValues.length; i++) {
                inputValues[i] = (double) row.getField(bucketizer.getInputCols()[i]);
                outputValues[i] = (double) row.getField(bucketizer.getOutputCols()[i]);
            }


            System.out.printf(
                    "Input Values: %s\tOutput Values: %s\n",
                    Arrays.toString(inputValues), Arrays.toString(outputValues));
        }
    }
}

           

3、CountVectorizer(計數向量化器)

CountVectorizer是屬于常見的特征數值計算類,是一個文本特征提取方法。對于每一個訓練文本,它隻考慮每種詞彙在該訓練文本中出現的頻率。

CountVectorizer可以将文本轉換為向量,以便進行機器學習和自然語言處理。它可以用于提取文本中的特征,例如單詞計數,以便進行分類和聚類。

輸入

參數名稱 類型 預設值 描述
inputCols String[] "input" 字元串數組

輸出

參數名稱 類型 預設值 描述
outputCols SparseVector "output" 标記計數的向量

參數

名稱 預設值 類型 是否必填 描述
inputCols "input" String no 輸入列名稱
outputCols "output" String no 輸出列名稱
minTF 1.0 Double no 過濾以忽略文檔中的罕見單詞。對于每個文檔,忽略出現次數/計數小于給定門檻值的術語。如果這是一個大于等于1的整數,則指定了計數(術語必須在文檔中出現的次數);如果這是[0,1)中的雙倍,則指定了分數(文檔的标記計數)
binary false Boolean no 二進制切換以控制輸出向量值。如果為True,則将所有非零計數(應用minTF過濾器後)設定為1.0。
vocabularySize 2^18 Interger no 詞彙表的最大值大小。CountVectorizer将建構一個詞彙表,該詞彙表僅考慮按語料庫中的術語頻率排序的前vocabulary size個術語。
minDF 1.0 Double no 指定術語必須出現在不同文檔中的最小數量,才能包含在詞彙表中。如果這是大于等于1的整數,則指定術語必須出現在的文檔數;如果這是[0,1)中的雙倍,則指定文檔的分數。
maxDF 2^63 - 1 Double no 指定術語必須出現在不同文檔中的最大數量,才能包含在詞彙表中。出現超過門檻值的術語将被忽略。如果這是大于等于1的整數,則指定術語可能出現在的文檔數的最大值;如果這是[0,1)中的雙倍,則指定術語可能出現在的文檔分數的最大值。

示例代碼:

import org.apache.flink.ml.feature.countvectorizer.CountVectorizer;
import org.apache.flink.ml.feature.countvectorizer.CountVectorizerModel;
import org.apache.flink.ml.linalg.SparseVector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;


import java.util.Arrays;


/**
 * Simple program that trains a {@link CountVectorizer} model and uses it for feature engineering.
 */
public class CountVectorizerExample {


    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


        // Generates input training and prediction data.
        DataStream<Row> dataStream =
                env.fromElements(
                        Row.of((Object) new String[] {"a", "c", "b", "c"}),
                        Row.of((Object) new String[] {"c", "d", "e"}),
                        Row.of((Object) new String[] {"a", "b", "c"}),
                        Row.of((Object) new String[] {"e", "f"}),
                        Row.of((Object) new String[] {"a", "c", "a"}));
        Table inputTable = tEnv.fromDataStream(dataStream).as("input");


        // Creates an CountVectorizer object and initialize its parameters
        CountVectorizer countVectorizer = new CountVectorizer();


        // Trains the CountVectorizer model
        CountVectorizerModel model = countVectorizer.fit(inputTable);


        // Uses the CountVectorizer model for predictions.
        Table outputTable = model.transform(inputTable)[0];


        // Extracts and displays the results.
        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
            Row row = it.next();
            String[] inputValue = (String[]) row.getField(countVectorizer.getInputCol());
            SparseVector outputValue = (SparseVector) row.getField(countVectorizer.getOutputCol());
            System.out.printf(
                    "Input Value: %-15s \tOutput Value: %s\n",
                    Arrays.toString(inputValue), outputValue.toString());
        }
    }
}           

4、DCT

DCT算法是一種特征工程算法,它可以将1D實向量的離散餘弦變換應用于特征提取。在圖像進行中,DCT算法可以用于壓縮圖像,而在音頻進行中,DCT算法可以用于壓縮音頻。在機器學習中,DCT算法可以用于特征提取和資料壓縮。

DCT算法的應用場景包括信号處理、資料壓縮、圖像處理和音頻處理,在信号進行中,DCT算法可以用于降噪和濾波。在資料壓縮中,可以用于壓縮圖像和音頻。在圖像進行中,可以用于圖像壓縮、圖像增強和圖像分析。 在音頻進行中,可以用于音頻壓縮、音頻增強和音頻分析。

輸入

參數名稱 類型 預設值 描述
inputCols Vector "input" Input vector to be cosine transformed.

輸出

參數名稱 類型 預設值 描述
outputCols Vector "output" Cosine transformed output vector.

參數

名稱 預設值 類型 是否必填 描述
inputCols null String[] no 輸入列名稱
outputCols null String[] no 輸出列名稱
inverse false Boolean no 這是一個關于DCT的參數,它訓示是否執行反向DCT(true)或正向DCT(false)

示例代碼:

import org.apache.flink.ml.feature.dct.DCT;
import org.apache.flink.ml.linalg.Vector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;


import java.util.Arrays;
import java.util.List;


/** Simple program that creates a DCT instance and uses it for feature engineering. */
public class DCTExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


        // Generates input data.
        List<Vector> inputData =
                Arrays.asList(
                        Vectors.dense(1.0, 1.0, 1.0, 1.0), Vectors.dense(1.0, 0.0, -1.0, 0.0));
        Table inputTable = tEnv.fromDataStream(env.fromCollection(inputData)).as("input");


        // Creates a DCT object and initializes its parameters.
        DCT dct = new DCT();


        // Uses the DCT object for feature transformations.
        Table outputTable = dct.transform(inputTable)[0];


        // Extracts and displays the results.
        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
            Row row = it.next();


            Vector inputValue = row.getFieldAs(dct.getInputCol());
            Vector outputValue = row.getFieldAs(dct.getOutputCol());


            System.out.printf("Input Value: %s\tOutput Value: %s\n", inputValue, outputValue);
        }
    }
}           

5、ElementwiseProduct

ElementwiseProduct是來源于Spark MLlib中的算法,它執行每個輸入向量與提供的“權重”向量的Hadamard積(Hadamard product是矩陣的一種運算,它是将兩個矩陣中對應位置的元素相乘得到一個新的矩陣)的輸出。ElementwiseProduct函數使用Hadamard乘積将每個輸入向量與給定縮放向量相乘。如果輸入向量的大小不等于縮放向量的大小,則轉換器将抛出IllegalArgumentException異常。

ElementwiseProduct 在機器學習中的應用場景比較廣泛,主要用于特征交叉,将兩個特征向量進行點乘得到新的特征向量。

例如,在推薦系統中,可以使用 ElementwiseProduct 将使用者的曆史行為和物品的屬性進行交叉,得到新的特征向量,進而提高推薦系統的準确性。

輸入

參數名稱 類型 預設值 描述
inputCols Vector "input" 需要進行縮放的特征

輸出

參數名稱 類型 預設值 描述
outputCols Vector "output" 縮放過後的特征

參數

名稱 預設值 類型 是否必填 描述
inputCols null String no 輸入列名稱
outputCols null String no 輸出列名稱
scalingVec null String yes 縮放向量

示例代碼:

import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
import org.apache.flink.ml.linalg.Vector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;


/**
 * Simple program that creates an ElementwiseProduct instance and uses it for feature engineering.
 */
public class ElementwiseProductExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);


        // Generates input data.
        DataStream<Row> inputStream =
                env.fromElements(
                        Row.of(0, Vectors.dense(1.1, 3.2)), Row.of(1, Vectors.dense(2.1, 3.1)));


        Table inputTable = tEnv.fromDataStream(inputStream).as("id", "vec");


        // Creates an ElementwiseProduct object and initializes its parameters.
        ElementwiseProduct elementwiseProduct =
                new ElementwiseProduct()
                        .setInputCol("vec")
                        .setOutputCol("outputVec")
                        .setScalingVec(Vectors.dense(1.1, 1.1));


        // Transforms input data.
        Table outputTable = elementwiseProduct.transform(inputTable)[0];


        // Extracts and displays the results.
        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
            Row row = it.next();
            Vector inputValue = (Vector) row.getField(elementwiseProduct.getInputCol());
            Vector outputValue = (Vector) row.getField(elementwiseProduct.getOutputCol());
            System.out.printf("Input Value: %s \tOutput Value: %s\n", inputValue, outputValue);
        }
    }
}           

繼續閱讀