創作人:田雪松
審稿人:李捷
業務背景介紹
當代商業組織面臨的最基本挑戰,是網際網路已經不再是一個替代或可選管道,它已經成為許多企業最主要的、甚至是惟一的銷售平台。網上店面在現實中往往比實體店面還要重要,是以人們就必須要像監視實體店面一樣,監控網上應用。
監控系統通常會以推送(Push)或拉取(Pull)的方式,從服務或應用中擷取監控名額(Metrics),并在監控名額出現異常時,發送警報或恢複服務,以實作網上店面實時可用的目标。
監控系統在提升了應用可用性同時,還在日積月累中形成海量監控資料,而從這些監控資料中挖掘出巨大的商業價值。監控名額會包含一些特定業務場景下的業務資料,借助大資料分析工具對它們進行處理和模組化後,就可以輔助人們作出正确的市場決策。
本章要介紹的案例就是基于 Elasticsearch 對監控資料進行商業挖掘的一次嘗試,我們内部稱這套系統為 Prophet(大預言家)。Prophet 通過機器學習對曆史監控資料進行處理,并建立預測模型,最終可以實作對單次使用者請求處理的全面預測,包括處理結果是否成功、處理執行時長等等。
Prophet預測系統架構
Prophet 雖然隻是監控資料最終的分析處理服務,但支撐 Prophet 實作智能預測的整個系統卻涵蓋了四個主要部分:
- 第一部分是産生海量監控資料的監控系統,它從被監控的業務系統中拉取監控資料,并提供實時告警服務。
- 第二部分則是将監控資料,從監控系統中導入到 Elasticsearch 叢集的資料系統,它還負責對監控資料進行清洗、轉換;
- 第三部分是用于存儲曆史監控資料的 Elasticsearch 叢集,我們借助 Elasticsearch 冷熱模型,延長了監控資料的儲存時間;
- 而最後一個才是最終處理這些資料的 Prophet 服務,它通過 Spark 運算叢集,實作智能預測功能,未來還會添加更多資料分析功能。
四個系統之間的資料流動關系如圖 1-1 所示:
圖1-1 Prophet周邊生态
除了 Elasticsearch 以外,HDFS 也是存儲曆史監控資料的一種不錯選項,并且 HDFS 更容易與 Hadoop 的大資料生态系統內建。
我們之是以采用 Elasticsearch 主要還是被 Elasticsearch 強大的檢索能力所吸引。同時,Elastic Stack 家族中的 Kibana 還提供了強大的資料可視化能力,至于資料分析則可以利用 elasticsearch-hadoop 插件整合 Hadoop 生态。
使用 Elasticsearch 存儲監控資料
在 Prophet 誕生之前,我們的業務系統已經使用 Prometheus 做到了實時監控,同時還通過 Grafana 對監控資料做了可視化處理。
但 Prometheus 監控資料存儲,在其内置 TSDB(Time Serie Database)中,Prometheus TSDB 中并沒有分片和副本等概念,而隻是簡單地将資料儲存在本地硬碟上。這意味着 Prometheus 無法支援資料高可用,同時也意味着 Prometheus TSDB 中的資料不能儲存太久,否則本地硬碟遲早會被積累的資料撐爆。是以 Prometheus 預設隻儲存監控資料 15 天,超過這個時間後監控資料就會被直接删除。
對于一個監控系統來說這并不算什麼大的問題,因為監控系統通常對實時資料更感興趣,它隻要能夠實時快速地反映,被監控系統的異常就足夠了。而曆史資料分析和處理,并不能算是監控系統的職責範疇,應該交由 Elasticsearch 這樣更專業的資料檢索與分析工具來實作。
從另一個角度來說,由于預測系統,必須要基于曆史資料建立預測模型,而如果直接在 Prometheus TSDB 上進行高頻度的模型運算,則有可能會對監控系統本身造成性能上的影響。從系統監控與資料分析的職責角度來看,監控系統的穩定性無疑比資料分析更為重要。是以預測系統使用的曆史資料,應該與 Prometheus 隔離開來,這就要求監控資料得從 Prometheus 中同步到 Elasticsearch 。一旦資料從 Prometheus 進入到 Elasticsearch ,我們就可以利用 Elasticsearch 冷熱架構對資料進行優化以儲存更長時間。
Prometheus 與 Elasticsearch 的資料同步
Prometheus 支援一種稱為遠端讀寫(Remote Read/Write)的功能,可以在 Prometheus 拉取監控資料時将資料寫入到遠端的資料源中,或者從遠端資料源中讀取監控資料做處理。
Prometheus 目前支援的遠端資料源多達幾十種,所有資料源都支援遠端寫入,但并不是所有資料源都同時支援遠端讀取,比如我們這裡要使用的 Elasticsearch 就隻支援遠端寫入。
注:7.10版本是支援的,可以通過 metricbeat 對 Prometheus 進行遠端拉取,參見:[__https://www.elastic.co/guide/en/beats/metricbeat/current/metricbeat-metricset-prometheus-query.html__ ]
Prometheus 為遠端讀取和寫入,分别定義了獨立的通信與編碼協定,第三方存儲元件需要向Prometheus 提供相容該協定的 HTTP 接口位址。但由于衆多第三方存儲元件接收資料的方式并不相同,支援 Prometheus 讀寫協定的接口位址并不一定存在。為此,Prometheus 采用了擴充卡的方式屏蔽不同存儲元件之間的差異,
如圖1-2所示:
圖1-2 Prometheus 遠端讀寫
隻要提供了擴充卡,Prometheus 讀寫資料就可以轉換成第三方存儲元件支援的協定和編碼。Elasticsearch 也需要使用擴充卡,這個擴充卡就是由 Elastic Stack 家族中的 Beat 元件擔任。早期 Prometheus 官方推薦的擴充卡是 Infonova 開發的 Prometheusbeat,而在最新版本中已經轉而推薦 Elastic Stack 官方的 Metricbeat。
無論是 Prometheusbeat 還是 Metricbeat,它們都可以向 Prometheus 開放 HTTP 監聽位址,并且在接收到 Prometheus 監控資料後将它們存儲到 Elasticsearch 中。Metricbeat 采用 Module 的形式開啟面向 Prometheus 的 HTTP 接口,具體配置如示例 1.1 所示:
- module: prometheus
metricsets: ["remote_write"]
host: "localhost"
port: "9201"
示例1.1 Metricbeat 配置
添加示例所示配置内容後重新開機 Metricbeat,它就會在本機 9201端口開始監聽 Prometheus 的寫入資料。與此同時還要将這個監聽位址告訴 Prometheus,這樣它才知道在拉取到資料後向哪裡寫入。具體來說就是在 prometheus.yml中 添加如下内容:
remote_write:
- url: "https://localhost:9201/write"
示例1.2 Prometheus 配置
使用 MetricBeat 元件儲存監控資料,監控名額的标簽(Label)會轉換為 Elasticsearch 的文檔字段。Metricbeat 還會附加一些 Metricbeat 自身相關的資料,是以文檔中的字段會比實際監控名額的标簽要多一些。
除了名額名稱和标簽以外,還包括主機位址、作業系統等資訊,這可能會導緻樣本資料量急劇膨脹。但 Prometheus 并不支援在寫入遠端存儲前過濾樣本,是以隻能通過 Beat 元件處理。
[注:最高效的做法是在 metricbeat 上設定 drop_fields 的 processor,直接過濾,避免無效的網絡傳輸]
比較理想的方法是将 Beat 元件的輸出設定為 Logstash,然後再在 Logstash 中做資料過濾,最後再由 Logstash 轉存到 Elasticsearch 中。
在我們的系統中,除了要過濾以上 Metricbeat 附加的字段以外,還需要要組合、修正一些業務相關的字段。是以為了更好的處理資料,我們在整個資料系統中加入了 Logstash 元件.
最終整個資料系統的大緻結構如圖 1-3 所示:
圖1-3 資料系統
Prometheus 在拉取到監控名額時,将資料同時發送給 Metricbeat 或 Prometheusbeat,它們在這裡職責就是一個監控資料接收的端點。
Metricbeat 或 Prometheusbeat 再将資料發送給 Logstash,而 Logstash 則負責清洗和整理監控資料,并将它們存儲到 Elasticsearch 中。我們将整個監控系統和資料系統部署在 Kubernetes 叢集中,而最終用于機器學習的資料源 Elasticsearch 則被部署為獨立的叢集。
這樣就将監控系統與資料分析系統隔離開來,進而保證了監控系統,不會受到大量資料分析運算的影響而出現故障。實際上圖 1-3 就是對圖 1-1中 所展示的監控系統、資料系統以及 Elasticsearch 叢集的細化,但在實際應用中還有許多細節沒有展現出來,需要讀者根據項目實際情況做适當調整。
內建 Elasticsearch 與 Spark
事實上,新版 Kibana 元件中已經具備了非常強大的機器學習能力,但這項功能還未開放在基礎授權中。此外我們的業務系統還有一些自身的特殊性,是以最終我們決定采用 Spark 編寫代碼實作這套智能預測系統。
一種顯而易見的辦法是利用 Elasticsearch 用戶端接口,先将資料從 Elasticsearch 中讀取出來再傳給 Spark 進行分析處理。但 Spark 資料處理是先将任務分解成子任務,再将它們分發到不同計算節點上并行處理,以此提升海量資料分析處理的速度。而任務分解的同時是資料也要分解,否則最終資料讀取,就會成為所有子任務的瓶頸,這種分解後的資料在 Spark 中稱為分區(Partition)。每個任務在各自的資料分區上運算處理,最終再将各自的處理結果按 Map/Reduce 的思想合并起來。
Elasticsearch 中的分片(Shard)剛好與 Spark 分區相契合,如果能讓任務運算于分片之上,這無疑可以更充分地利用 Elasticsearch 存儲特征。但如果是先從 Elasticsearch 讀取資料再發送給Spark做處理,資料就是經由分片合并後的資料,Spark 在運算時就需要對資料再次進行分區。
Elasticsearch 包含一個開源的 elasticsearch-hadoop 項目,可以很好地解決資料分區問題。elasticsearch-hadoop 可以與包括 Spark 在内的 Hadoop 生态良好內建,Spark 分區資料也可以與 Elasticsearch 分片資料直接對應起來。我們所要做隻是将 elasticsearch-hadoop 的 Spark 依賴添加到項目中,并使用其提供的接口将資料從 Elasticsearch 中讀取進來即可。智能預測系統采用 Java 語言開發,是以使用 Maven 引入如下依賴:
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-${spark.major.version}_${scala.version}</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
示例1.3 引入依賴
如示例 1.3 所示,elasticsearch-hadoop 的 Spark 依賴為 elasticsearch-spark。由于需要用到Spark ML 相關庫,示例也将 spark-sql 和 spark-mllib 依賴也一并引入了進來。elasticsearch-spark 在
org.elasticsearch.spark.rdd.api.java
包中定義了類 JavaEsSpark,它提供了大量用于讀取和寫入 Elasticsearch 的靜态方法。其中,靜态方法
esRDD
用于從 Elasticsearch 中讀取索引,而
saveToEs
則用于向 Elasticsearch 中存儲資料。
示例展示了從 Elasticsearch 中讀取索引資料的代碼片段:
List<Row> jobs = esRDD(sparkContext, metricIndex, queryString).values().map(
map -> {
Object[] values = new Object[featureFields.length+1];
for (int i = 0; i < featureFields.length; i++) {
values[i] = map.get(featureFields[i]);
}
values[featureFields.length] = map.get(labelField);
return RowFactory.create(values);
}
).collect();
StructField[] structFields = new StructField[featureFields.length + 1];
for (int i = 0; i < featureFields.length; i++) {
structFields[i] = new StructField(featureFields[i], DataTypes.StringType, true, Metadata.empty());
}
structFields[featureFields.length] = new StructField(labelField, DataTypes.StringType, true, Metadata.empty());
StructType schema = new StructType(structFields);
Dataset<Row> dataset = sparkSession.createDataFrame(jobs, schema);
示例1.4 使用 Spark 讀入資料
示例中,
sparkContext
是
SparkContext
執行個體,代表了與 Spark 運算環境相關的一些基本資訊。MetricIndex 則是
esRDD
方法要讀取文檔資料的索引名稱,而 queryString 則是檢索文檔的查詢條件。
esRDD
傳回的類型為
JavaPairRDD
,這是類似于一個 Pair 的集合。
每一個 Pair 代表索引中的一個文檔,Pair 的 key 是字元串類型,代表了目前文檔的辨別符;而 value 則是一個
Map
類型,代表了整個文檔資料。通過
JavaPairRDD
的 keys 方法将隻傳回所有文檔的辨別符,而通過 values 方法則會隻傳回所有文檔的 Map 對象。
Map 的鍵為文檔字段名稱,而值則為文檔字段對應的具體數值。示例中的代碼就是通過調用 values 方法隻擷取文檔的 Map 對象,然後再通過 map 和 collect 方法将它們轉換成 Row 的集合。
分類與回歸
機器學習本質上是基于統計學原理,建構數學模型的過程,而建構出來的模型又可以用于資料分析與資料預測。機器學習一般分為監督學習和無監督學習兩大類,它們分别針對有标注(Label)資料和無标注資料建構數學模型。
标注可以認為是對資料在某一次元上的識别,比如讓計算機識别聲音或圖像中的文字,就需要先對它們做标注,然後才可以根據标注對聲音或圖像建構數學模型。具體來說,人們先要提供一組與文字關聯好的音頻或圖檔,計算機再根據這些标注好的資料,找出從聲音頻率、圖檔像素到文字的映射關系。由于不同人對文字的發音和書寫存在着巨大的差異,是以需要通過海量音頻和圖像進行統計分析,找出這種映射關系中最為本質的轉換模型。是以監督學習的本質是學習輸入到輸出的統計學規律,隻不過這些映射關系并不像加減法那樣直覺,需要借助計算機統計與分析才能完成。
無監督學習則是根據無标注資料進行數學模組化的過程,它不存在輸入與輸出,本質上是學習資料中天然的統計規律或潛在結構。比如使用者的消費行為就很難根據他們的年齡、性别、學曆等特征做區分,而且由于涉及隐私這些資料也不一定會在系統中查到。換句話說,即使兩個人的硬性特征完全相同,但在人生經曆、思想認識等方面存在着的差異,依然可能使他們的消費行為不同。是以這時就隻能通過使用者消費行為産生的資料對他們進行聚類,相同聚類的使用者往往有着相類似的消費習慣,聚類結果也就可以應用于商品推薦系統之中了。
本章介紹的智能預測系統是一種通過監督學習,建構預測模型的機器學習系統,本質上就是發現已知條件與未知結果之間的統計規律。我們之是以要建立這樣一個系統,是因為在以往對業務系統日志的統計分析中發現,在某些特定條件下某些使用者請求一定會失敗。這讓我們堅信在已知請求條件與未知運作結果之間,一定存在着某種映射關系,我們希望借由這樣一個系統找到這種神秘的映射關系。
在監控系統的資料中原本就包含了請求執行結果,這個資料就可以作為整個請求與處理相關資料的标注。在監督學習中,作為輸入的資料一般稱為特征(Feature),而輸出的結果則一般稱為标注(Label)。在示例中定義的 featureFields 定義的就是索引上那些可以作為特征的字段,而 labelField 則是索引上可以作為标注的字段。示例中的代碼就是将特征字段的值和标注字段的值從 Map 中取出,并将它們統一放置到一個數組中并轉換為 Spark 運算需要使用的 DataFrame。
正如前面所介紹的那樣,監督學習的本質是學習輸入到輸出的統計學規律。如果從輸入到輸出的産出結果是有限的,那麼這時機器學習生成的模型稱為分類器(Classifier),而它解決的問題就是分類問題。比如我們正在介紹智能預測系統,它通過已知的請求條件預測執行結果,輸入是在請求執行前已知的請求條件,而輸出則是該請求的執行結果。
執行結果隻有兩種結果,要麼成功要麼失敗,是以計算機學習出來的數學模型就是一個分類器。除了分類問題以外,監督學習可以解決的另一種類問題是回歸問題。與分類問題不同,回歸問題輸出的不是有限數量的結果,而是數值連續的結果,它更像我們在數學中學習到正常函數。
同樣是以請求條件作為輸入,如果預測的不是執行結果而是執行時間,那麼這時要解決的就不再是分類問題而是回歸問題。因為請求的執行時間并不是離散的,它可能是任意一個非負的數值。
Spark機器學習支援多種實作分類與回歸問題的算法,包括邏輯回歸、決策樹、随機森林、梯度提升樹等等。我們很難在一個章節中将上述算法完全講解清楚,但所幸應用這些算法做預測也并不需要深入了解算法原理。本章僅以梯度提升樹為例解決分類問題,也就是通過梯度提升樹實作對請求執行結果的預測,讀者可根據本章所列方法實作對請求執行時間的預測。
Spark 在實作機器學習時一般會将資料分成兩組,一組用于建構數學模型,而另一組則用于驗證已建構的模型。
示例中的代碼就是将示例中生成的資料按 8:2 分成兩組,8 成的資料用于生成模型,而 2 成的資料用于驗證模型:
Dataset<Row>[] splits = featurized.randomSplit(new double[]{0.8, 0.2});
GBTClassifier gbt = new GBTClassifier()
.setLabelCol(LABEL_COL)
.setFeaturesCol(FEATURES_COL)
.setMaxIter(10);
GBTClassificationModel model = gbt.fit(splits[0]);
Dataset<Row> predictions = model.transform(splits[1]);
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
.setLabelCol(LABEL_COL)
.setPredictionCol(PREDICTION_COL)
.setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
log.info("Test Error = " + (1.0 - accuracy));
model.write().overwrite().save(path);
示例1.5 生成預測模型
如示例所示,GBTClassifier 就是基于 GBT 算法的分類器,通過它可以生成基于 GBT 算法的分類模型。setLabelCol 和 setFeaturesCol 分别設定了資料的标注和特征,而 setMaxIter 則設定了生成模型時疊代的次數。有了 GBT 分類器後,調用其 fit 方法并将8成的資料 splits[0] 傳入就會觸發模型建立的運算。
模型生成的運算通常比較慢,具體時間長度取決于參與運算的資料量大小,但一般都是分鐘級别的。由于模型一旦生成往往可以複用,是以在資料變化不是特别劇烈的情況下,可以将模型儲存到硬碟。這樣在需要預測時就不用再通過比對資料生成模型,通過硬碟中儲存的現成模型資料就可迅速生成模型。是以在我們的實作中,會每天定時更新一次模型到硬碟,但在執行個體應用模型時,都是直接從硬碟中加載模型以節省運算時間。
示例展示了從硬碟中加載模型,并根據輸入的特征向量進行預測的代碼片段:
GBTClassificationModel model = GBTClassificationModel.load(path);
Dataset<Row> predictions = model.transform(featurized);
predictions.show(false);
示例1.6 實作預測
示例中使用的 tranform 方法在這裡就是最終用于預測的方法,傳入的 featurized 則是經過向量化的已知資料。
當然,以上示例代碼中省略大量的代碼細節,在實際應用中還有許多更為具體的問題要解決。根據我們應用的效果來看,預測結果的準确率可以達到 95% 以上。
回歸類問題的處理過程與本章所介紹内容類似,感興趣的讀者可根據本章内容嘗試實作對請求處理時間的預測。