天天看點

使用Spark SQL進行流式機器學習計算(上)1. 什麼是流式機器學習2. 機器學習模型擷取途徑3. 系統示範4. 小結

作者:餘根茂,阿裡巴巴計算平台事業部EMR團隊的技術專家,參與了Hadoop,Spark,Kafka等開源項目的研發工作。目前主要專注于EMR流式計算産品的研發工作。

今天來和大家聊一下如何使用Spark SQL進行流式資料的機器學習處理。本文主要分為以下幾個章節:

  • 什麼是流式機器學習
  • 機器學習模型擷取途徑
  • 系統示範

1. 什麼是流式機器學習

通常,當我們聽到有人提到實時資料機器學習時,其實他們是讨論:

  • 他們希望有一個模型,這個模型利用最近曆史資訊來進行預測分析。舉一個天氣的例子,如果最近幾天都是晴天,那麼未來幾天極小機率會出現雨雪和低溫天氣
  • 這個模型還需要是可更新的。當資料流經系統時,模型是可以随之進化更新。舉個例子,随着業務規模的擴大,我們希望零售銷售模型仍然保持準确。

第一個例子我們可以将它歸為時序預測。第二個例子中,模型需要更新或者重新訓練,這是一個non-stationarity問題。時序預測和non-stationarity資料分布是兩類不同的問題。本文主要關注第二類問題,對于這類問題,一般的解決方案主要有:

  • 增量式算法:有一些算法支援通過資料逐漸學習。也就是說,每次進來一些新的資料時,模型會被更新。SVM,神經網絡等算法都有增量式版本,此外貝葉斯網絡也可以用作增量學習。
  • 周期重新學習:一個更加直接的方法就是用一批最新資料重新訓練我們的模型。這種方法可以用到的絕大多數的算法上。

2. 機器學習模型擷取途徑

實時機器學習應用分成兩塊,一部分是模型實時訓練,另一部分是資料實時預測分析。現實中,我們可能沒法實作模型的實時訓練,隻能退而求其次地使用已經訓練好模型。這些模型可能會周期性地使用曆史資料訓練更新一次。是以,我們可以根據實際的算法和模型時效性要求,來選擇實時訓練模型還是使用預訓練好的模型。

  • 模型算法支援增量訓練:可以選擇用流式資料實時訓練更新
  • 模型算法不自持增量訓練:可以選擇用離線資料預先訓練好模式

回到主題上,我們要實作使用Spark SQL進行流式機器學習。前面幾篇文章已經簡單介紹了EMR如何使用Spark SQL進行流式ETL處理。既然要進行機器學習,我們很自然地想到Spark MLlib。DataBricks有篇文檔介紹了在Spark Structured Streaming進行機器學習,大家有興趣的可以看下。如果想将Spark MLlib應用到Spark SQL上,我們可以簡單地将MLlib算法包裝成UDF使用。另外一個模型擷取途徑是利用阿裡雲上的一些線上機器學習服務,我們可以将線上機器學習服務使用UDF封裝後使用。

  • 使用UDF封裝現有的Spark MLlib算法
  • 使用UDF封裝阿裡雲線上機器學習服務

限于篇幅,我會分兩篇文章分别介紹這兩個方式,本文将簡單介紹如何利用Spark MLlib進行流式機器學習。

3. 系統示範

本節,我們将示範一下如何利用邏輯回歸算法進行示範。

3.1 系統架構

下面這張圖展示了整個實時監測系統的架構,前端接LogService資料,實時監測分析結果寫入到RDS,最後通過DataV展示出來。

使用Spark SQL進行流式機器學習計算(上)1. 什麼是流式機器學習2. 機器學習模型擷取途徑3. 系統示範4. 小結

3.2 測試資料集

測試資料集使用Spark自帶的sample_libsvm_data.txt,我們要做的是寫一個資料生成器,将資料集的資料不斷地向SLS中發送,模拟流式資料。

算法模型準備

Spark MLlib提供了大量的機器學習算法實作,可以友善的再RDD或者DataFrame API上使用,但是無法直接用在SQL API上,是以我們需要使用UDF來封裝一下。這裡,我們選用邏輯回歸算法,具體的實作就不細說了,可以參考這裡的代碼:

LogisticRegressionUDF.scala

3.4 部署測試

  • CLI
git clone [email protected]:aliyun/aliyun-emapreduce-sdk.git
cd aliyun-emapreduce-sdk
git checkout -b master-2.x origin/master-2.x
mvn clean package -DskipTests

## 編譯完後, assembly/target目錄下會生成emr-datasources_shaded_2.11-1.7.0-SNAPSHOT.jar

spark-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-1.7.0-SNAPSHOT.jar --driver-class-path emr-datasources_shaded_2.11-1.7.0-SNAPSHOT.jar           
  • 建表
spark-sql> USE default;

-- 測試資料源
spark-sql> CREATE TABLE IF NOT EXISTS sls_dataset
USING loghub
OPTIONS (
sls.project = "${logProjectName}",
sls.store = "${logStoreName}",
access.key.id = "${accessKeyId}",
access.key.secret = "${accessKeySecret}",
endpoint = "${endpoint}");

spark-sql> DESC sls_dataset
__logProject__  string  NULL
__logStore__  string  NULL
__shard__ int NULL
__time__  timestamp NULL
__topic__ string  NULL
__source__  string  NULL
label string  NULL
features  string  NULL
__tag__hostname__ string  NULL
__tag__path__ string  NULL
__tag__receive_time__ string  NULL
Time taken: 0.058 seconds, Fetched 11 row(s)

-- 結果資料源
spark-sql> CREATE TABLE IF NOT EXISTS rds_result
USING jdbc2
OPTIONS (
url="${rdsUrl}",
driver="com.mysql.jdbc.Driver",
dbtable="${rdsTableName}",
user="${user}",
password="${password}",
batchsize="100",
isolationLevel="NONE");

spark-sql> DESC rds_result;
acc double  NULL
label double  NULL
time  string  NULL
Time taken: 0.457 seconds, Fetched 3 row(s)           
  • 注冊UDF
CREATE FUNCTION Logistic_Regression AS 'org.apache.spark.sql.aliyun.udfs.ml.LogisticRegressionUDF' USING JAR '${udf_jar_path}';           
  • 送出執行
SET spark.sql.streaming.checkpointLocation.lr_prediction=hdfs:///tmp/spark/lr_prediction;
SET spark.sql.streaming.query.outputMode.lr_prediction=update;
-- 由于DataSource是基于JDBC實作的,是以我們需要設定向RDS表插入資料的SQL
-- 這裡我的RDS表名是`result`
SET streaming.query.lr_prediction.sql=insert into `result`(`time`, `label`, `acc`) values(?, ?, ?);

INSERT INTO 
rds_result 
SELECT 
window.start, 
label, 
sum(if(tb.predict = tb.label, 1, 0)) / count(tb.label) as acc 
FROM(
SELECT 
default.Logistic_Regression("${LR_model_path}", concat_ws(" ", label, features)) as predict, 
label, 
__time__ as time 
FROM sls_dataset) tb 
GROUP BY TUMBLING(tb.time, interval 10 second), tb.label;           

3.5 效果展示

在DataV中配置上面的RDS結果表,使用折線圖檢視label=1的預測準确率,如下:

使用Spark SQL進行流式機器學習計算(上)1. 什麼是流式機器學習2. 機器學習模型擷取途徑3. 系統示範4. 小結

4. 小結

本文簡要介紹了流式機器學習面臨的幾個問題,以及相應的解決方法。并使用Spark SQL結合Spark MLlib示範了一個流式機器學習的案例。下一篇,我會簡要介紹Spark SQL如何結合阿裡雲的線上機器學習服務來進行流式機器學習應用開發。

繼續閱讀