Feature Engineering 特征工程
資料與特征工程決定了模型的上限,改進算法隻不過是逼近這個上限而已。
特征工程是機器學習中非常重要的一部分,它是将原始資料轉換為模型可了解的形式。一般來說,特征工程包括特征建構、特征提取和特征選擇三個部分1其中,特征建構是指從原始資料中建構新的特征,例如,将時間戳轉換為小時、星期幾等等。特征提取是指從原始資料中提取有用的特征,例如,從文本中提取關鍵詞。而特征選擇則是指從所有的特征中選擇最有用的一些特征,以便于訓練模型。
特征工程是機器學習中最重要的步驟之一,需要耗費大量時間。目的是準備一個最适合機器學習算法的輸入資料集,并增強機器學習模型的性能。
Flink ML 2.2版本中大幅增加了這方面的算法支援,總共有33個特征工程方面的算法,涵蓋了各個方面的内容。
1、Binarizer
二值化
顧名思義,Binarizer在機器學習中是一種将連續特征值離散化的方法,通過設定門檻值将資料二值化(将特征值設定為0或1)。
Binarizer算法有許多應用,其中一些應用包括将連續的特征值二值化,如在圖像進行中将連續的像素值資料離散化為黑白像素值資料。 另一個應用是将多類标簽轉換為二進制标簽的過程,通過将每個類别轉換為一個二進制向量,其中隻有一個元素為1,其餘元素均為0.
Binarizer可以處理DenseVector、SparseVector或數值類型的連續特征。
輸入
參數名稱 | 類型 | 預設值 | 描述 |
inputCols | Number/Vector | null | 需要進行二值化的Number/Vectors |
輸出
參數名稱 | 類型 | 預設值 | 描述 |
outputCols | Number/Vector | null | 二值化的Number/Vectors |
參數
名稱 | 預設值 | 類型 | 是否必填 | 描述 |
inputCols | null | String[] | yes | 輸入列名稱 |
outputCols | null | String[] | yes | 輸出列名稱 |
thresholds | null | Double[] | yes | 用于将連續特征二值化的門檻值 |
示例代碼:
import org.apache.flink.ml.feature.binarizer.Binarizer;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import java.util.Arrays;
/** Simple program that creates a Binarizer instance and uses it for feature engineering. */
public class BinarizerExample {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Generates input data.
DataStream<Row> inputStream =
env.fromElements(
Row.of(
1,
Vectors.dense(1, 2),
Vectors.sparse(
17, new int[] {0, 3, 9}, new double[] {1.0, 2.0, 7.0})),
Row.of(
2,
Vectors.dense(2, 1),
Vectors.sparse(
17, new int[] {0, 2, 14}, new double[] {5.0, 4.0, 1.0})),
Row.of(
3,
Vectors.dense(5, 18),
Vectors.sparse(
17, new int[] {0, 11, 12}, new double[] {2.0, 4.0, 4.0})));
Table inputTable = tEnv.fromDataStream(inputStream).as("f0", "f1", "f2");
// Creates a Binarizer object and initializes its parameters.
Binarizer binarizer =
new Binarizer()
.setInputCols("f0", "f1", "f2")
.setOutputCols("of0", "of1", "of2")
.setThresholds(0.0, 0.0, 0.0);
// Transforms input data.
Table outputTable = binarizer.transform(inputTable)[0];
// Extracts and displays the results.
for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
Row row = it.next();
Object[] inputValues = new Object[binarizer.getInputCols().length];
Object[] outputValues = new Object[binarizer.getInputCols().length];
for (int i = 0; i < inputValues.length; i++) {
inputValues[i] = row.getField(binarizer.getInputCols()[i]);
outputValues[i] = row.getField(binarizer.getOutputCols()[i]);
}
System.out.printf(
"Input Values: %s\tOutput Values: %s\n",
Arrays.toString(inputValues), Arrays.toString(outputValues));
}
}
}
2、Bucketizer
Bucketizer中文意思是“資料分桶”,是來源于 Spark MLlib 中的算法,用于将連續的數值型特征轉換為離散型特征。例如,将人分為年齡段分為老人、中年人、青年人、兒童,将溫度分為高、中、低等幾個檔次等等。
輸入
參數名稱 | 類型 | 預設值 | 描述 |
inputCols | Number | null | 要進行資料分桶的連續特征值 |
輸出
參數名稱 | 類型 | 預設值 | 描述 |
outputCols | Double | null | 離散化特征 |
參數
名稱 | 預設值 | 類型 | 是否必填 | 描述 |
inputCols | null | String[] | yes | 輸入列名稱 |
outputCols | null | String[] | yes | 輸出列名稱 |
handleInvalid | "error" | Double | no | 處理無效條目的政策。支援的值:‘error’,‘skip’,‘keep’。 |
splitsArray | null | Double[] | yes | 将連續特征映射到資料桶中的分割點數組 |
示例代碼:
import org.apache.flink.ml.common.param.HasHandleInvalid;
import org.apache.flink.ml.feature.bucketizer.Bucketizer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import java.util.Arrays;
/** Simple program that creates a Bucketizer instance and uses it for feature engineering. */
public class BucketizerExample {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Generates input data.
DataStream<Row> inputStream = env.fromElements(Row.of(-0.5, 0.0, 1.0, 0.0));
Table inputTable = tEnv.fromDataStream(inputStream).as("f1", "f2", "f3", "f4");
// Creates a Bucketizer object and initializes its parameters.
Double[][] splitsArray =
new Double[][] {
new Double[] {-0.5, 0.0, 0.5},
new Double[] {-1.0, 0.0, 2.0},
new Double[] {Double.NEGATIVE_INFINITY, 10.0, Double.POSITIVE_INFINITY},
new Double[] {Double.NEGATIVE_INFINITY, 1.5, Double.POSITIVE_INFINITY}
};
Bucketizer bucketizer =
new Bucketizer()
.setInputCols("f1", "f2", "f3", "f4")
.setOutputCols("o1", "o2", "o3", "o4")
.setSplitsArray(splitsArray)
.setHandleInvalid(HasHandleInvalid.SKIP_INVALID);
// Uses the Bucketizer object for feature transformations.
Table outputTable = bucketizer.transform(inputTable)[0];
// Extracts and displays the results.
for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
Row row = it.next();
double[] inputValues = new double[bucketizer.getInputCols().length];
double[] outputValues = new double[bucketizer.getInputCols().length];
for (int i = 0; i < inputValues.length; i++) {
inputValues[i] = (double) row.getField(bucketizer.getInputCols()[i]);
outputValues[i] = (double) row.getField(bucketizer.getOutputCols()[i]);
}
System.out.printf(
"Input Values: %s\tOutput Values: %s\n",
Arrays.toString(inputValues), Arrays.toString(outputValues));
}
}
}
3、CountVectorizer(計數向量化器)
CountVectorizer是屬于常見的特征數值計算類,是一個文本特征提取方法。對于每一個訓練文本,它隻考慮每種詞彙在該訓練文本中出現的頻率。
CountVectorizer可以将文本轉換為向量,以便進行機器學習和自然語言處理。它可以用于提取文本中的特征,例如單詞計數,以便進行分類和聚類。
輸入
參數名稱 | 類型 | 預設值 | 描述 |
inputCols | String[] | "input" | 字元串數組 |
輸出
參數名稱 | 類型 | 預設值 | 描述 |
outputCols | SparseVector | "output" | 标記計數的向量 |
參數
名稱 | 預設值 | 類型 | 是否必填 | 描述 |
inputCols | "input" | String | no | 輸入列名稱 |
outputCols | "output" | String | no | 輸出列名稱 |
minTF | 1.0 | Double | no | 過濾以忽略文檔中的罕見單詞。對于每個文檔,忽略出現次數/計數小于給定門檻值的術語。如果這是一個大于等于1的整數,則指定了計數(術語必須在文檔中出現的次數);如果這是[0,1)中的雙倍,則指定了分數(文檔的标記計數) |
binary | false | Boolean | no | 二進制切換以控制輸出向量值。如果為True,則将所有非零計數(應用minTF過濾器後)設定為1.0。 |
vocabularySize | 2^18 | Interger | no | 詞彙表的最大值大小。CountVectorizer将建構一個詞彙表,該詞彙表僅考慮按語料庫中的術語頻率排序的前vocabulary size個術語。 |
minDF | 1.0 | Double | no | 指定術語必須出現在不同文檔中的最小數量,才能包含在詞彙表中。如果這是大于等于1的整數,則指定術語必須出現在的文檔數;如果這是[0,1)中的雙倍,則指定文檔的分數。 |
maxDF | 2^63 - 1 | Double | no | 指定術語必須出現在不同文檔中的最大數量,才能包含在詞彙表中。出現超過門檻值的術語将被忽略。如果這是大于等于1的整數,則指定術語可能出現在的文檔數的最大值;如果這是[0,1)中的雙倍,則指定術語可能出現在的文檔分數的最大值。 |
示例代碼:
import org.apache.flink.ml.feature.countvectorizer.CountVectorizer;
import org.apache.flink.ml.feature.countvectorizer.CountVectorizerModel;
import org.apache.flink.ml.linalg.SparseVector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import java.util.Arrays;
/**
* Simple program that trains a {@link CountVectorizer} model and uses it for feature engineering.
*/
public class CountVectorizerExample {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Generates input training and prediction data.
DataStream<Row> dataStream =
env.fromElements(
Row.of((Object) new String[] {"a", "c", "b", "c"}),
Row.of((Object) new String[] {"c", "d", "e"}),
Row.of((Object) new String[] {"a", "b", "c"}),
Row.of((Object) new String[] {"e", "f"}),
Row.of((Object) new String[] {"a", "c", "a"}));
Table inputTable = tEnv.fromDataStream(dataStream).as("input");
// Creates an CountVectorizer object and initialize its parameters
CountVectorizer countVectorizer = new CountVectorizer();
// Trains the CountVectorizer model
CountVectorizerModel model = countVectorizer.fit(inputTable);
// Uses the CountVectorizer model for predictions.
Table outputTable = model.transform(inputTable)[0];
// Extracts and displays the results.
for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
Row row = it.next();
String[] inputValue = (String[]) row.getField(countVectorizer.getInputCol());
SparseVector outputValue = (SparseVector) row.getField(countVectorizer.getOutputCol());
System.out.printf(
"Input Value: %-15s \tOutput Value: %s\n",
Arrays.toString(inputValue), outputValue.toString());
}
}
}
4、DCT
DCT算法是一種特征工程算法,它可以将1D實向量的離散餘弦變換應用于特征提取。在圖像進行中,DCT算法可以用于壓縮圖像,而在音頻進行中,DCT算法可以用于壓縮音頻。在機器學習中,DCT算法可以用于特征提取和資料壓縮。
DCT算法的應用場景包括信号處理、資料壓縮、圖像處理和音頻處理,在信号進行中,DCT算法可以用于降噪和濾波。在資料壓縮中,可以用于壓縮圖像和音頻。在圖像進行中,可以用于圖像壓縮、圖像增強和圖像分析。 在音頻進行中,可以用于音頻壓縮、音頻增強和音頻分析。
輸入
參數名稱 | 類型 | 預設值 | 描述 |
inputCols | Vector | "input" | Input vector to be cosine transformed. |
輸出
參數名稱 | 類型 | 預設值 | 描述 |
outputCols | Vector | "output" | Cosine transformed output vector. |
參數
名稱 | 預設值 | 類型 | 是否必填 | 描述 |
inputCols | null | String[] | no | 輸入列名稱 |
outputCols | null | String[] | no | 輸出列名稱 |
inverse | false | Boolean | no | 這是一個關于DCT的參數,它訓示是否執行反向DCT(true)或正向DCT(false) |
示例代碼:
import org.apache.flink.ml.feature.dct.DCT;
import org.apache.flink.ml.linalg.Vector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import java.util.Arrays;
import java.util.List;
/** Simple program that creates a DCT instance and uses it for feature engineering. */
public class DCTExample {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Generates input data.
List<Vector> inputData =
Arrays.asList(
Vectors.dense(1.0, 1.0, 1.0, 1.0), Vectors.dense(1.0, 0.0, -1.0, 0.0));
Table inputTable = tEnv.fromDataStream(env.fromCollection(inputData)).as("input");
// Creates a DCT object and initializes its parameters.
DCT dct = new DCT();
// Uses the DCT object for feature transformations.
Table outputTable = dct.transform(inputTable)[0];
// Extracts and displays the results.
for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
Row row = it.next();
Vector inputValue = row.getFieldAs(dct.getInputCol());
Vector outputValue = row.getFieldAs(dct.getOutputCol());
System.out.printf("Input Value: %s\tOutput Value: %s\n", inputValue, outputValue);
}
}
}
5、ElementwiseProduct
ElementwiseProduct是來源于Spark MLlib中的算法,它執行每個輸入向量與提供的“權重”向量的Hadamard積(Hadamard product是矩陣的一種運算,它是将兩個矩陣中對應位置的元素相乘得到一個新的矩陣)的輸出。ElementwiseProduct函數使用Hadamard乘積将每個輸入向量與給定縮放向量相乘。如果輸入向量的大小不等于縮放向量的大小,則轉換器将抛出IllegalArgumentException異常。
ElementwiseProduct 在機器學習中的應用場景比較廣泛,主要用于特征交叉,将兩個特征向量進行點乘得到新的特征向量。
例如,在推薦系統中,可以使用 ElementwiseProduct 将使用者的曆史行為和物品的屬性進行交叉,得到新的特征向量,進而提高推薦系統的準确性。
輸入
參數名稱 | 類型 | 預設值 | 描述 |
inputCols | Vector | "input" | 需要進行縮放的特征 |
輸出
參數名稱 | 類型 | 預設值 | 描述 |
outputCols | Vector | "output" | 縮放過後的特征 |
參數
名稱 | 預設值 | 類型 | 是否必填 | 描述 |
inputCols | null | String | no | 輸入列名稱 |
outputCols | null | String | no | 輸出列名稱 |
scalingVec | null | String | yes | 縮放向量 |
示例代碼:
import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
import org.apache.flink.ml.linalg.Vector;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
/**
* Simple program that creates an ElementwiseProduct instance and uses it for feature engineering.
*/
public class ElementwiseProductExample {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Generates input data.
DataStream<Row> inputStream =
env.fromElements(
Row.of(0, Vectors.dense(1.1, 3.2)), Row.of(1, Vectors.dense(2.1, 3.1)));
Table inputTable = tEnv.fromDataStream(inputStream).as("id", "vec");
// Creates an ElementwiseProduct object and initializes its parameters.
ElementwiseProduct elementwiseProduct =
new ElementwiseProduct()
.setInputCol("vec")
.setOutputCol("outputVec")
.setScalingVec(Vectors.dense(1.1, 1.1));
// Transforms input data.
Table outputTable = elementwiseProduct.transform(inputTable)[0];
// Extracts and displays the results.
for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
Row row = it.next();
Vector inputValue = (Vector) row.getField(elementwiseProduct.getInputCol());
Vector outputValue = (Vector) row.getField(elementwiseProduct.getOutputCol());
System.out.printf("Input Value: %s \tOutput Value: %s\n", inputValue, outputValue);
}
}
}