作者:伯箫,阿裡雲進階開發工程師。現在在阿裡雲表格存儲團隊,負責管控系統的開發,對NOSQL類資料庫系統有一些了解。
前言
從EMR-3.21.0 版本開始将提供Spark Streaming SQL的預覽版功能,支援使用SQL來開發流式分析作業。結果資料可以實時寫入Tablestore。
本文以LogHub為資料源,收集ECS上的日志資料,通過Spark Streaming SQL進行聚合後,将流計算結果資料實時寫入Tablestore,展示一個簡單的日志監控場景。
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnLyUmZygTZjNGO5IWO0ADN0EmZ1ADO1ETOiJmY2UWZkdTMkZjZmRWYz8CXt92Yu4GZjlGbh5SZslmZxl3Lc9CX6MHc0RHaiojIsJye.png)
場景設計
假設有一個商品表Goods,商品資訊開放給使用者浏覽,使用者浏覽完以後會産生以下格式的日志資料:
"RequestId":"c85df119-f6db-449f-89bb-6773d2468f89",
"Time":2019-07-30 12:05:28,
"GoodsName":"shoes",
"OperationType":"query"
}
我們需要将原始日志資料,根據GoodsName、OperationType和時間,聚合成一分鐘一個點的監控資料。用于監控各商品的通路情況。
最終需要的監控資料格式如下:
過程如下圖所示,比較簡單。
技術選型
本文主要來看一下結果資料存放資料庫的選型。對于本文的監控場景,結果資料的量級,取決于商品數量。真實情況下,可能還要增加商品的規格、顔色等監控名額,結果資料量會比較大。此外,對于時間粒度比較小的監控資料,一般都隻需要保留最近的,時間比較久的曆史資料需要删除。
将Tablestore與傳統關系型資料庫MySQL進行對比,Tablestore有以下優勢:
-
支援海量資料,無縫擴充
表格存儲通過資料分片和負載均衡技術,實作了無縫擴充。
-
支援資料自動過期
資料生命周期(Time To Live,簡稱 TTL)是資料表的一個屬性,即資料的存活時間,機關為秒。表格存儲會在背景對超過存活時間的資料進行清理,以減少使用者的資料存儲空間,降低存儲成本。監控場景下比較适用,不需要手動去删除資料。
預備工作
建立EMR叢集
開通EMR之前,先要對雲賬号進行實名認證,然後建立預設的EMR角色并授予 AliyunEMRDefaultRole和AliyunEmrEcsDefaultRole這兩個角色。
EMR暫時還不支援在官網控制台上配置寫入Tablestore的任務,需要登入到MER叢集的機器上去操作,是以開通EMR叢集的時候,請選擇自定義購買,并在最後一步打開挂載公網、遠端登入、密碼方式登入,自己設定一個密碼,如下圖。
開通完以後,進入到ECS控制台,會看到有一台Master節點機器有彈性IP。後面步驟中,操作EMR Spark Streaming需要使用這個IP位址遠端登入到機器上。
開通ECS和日志服務
詳細步驟請參考官方文檔。
其中ilogtail配置如下
日志收集到LogHub以後,可以通過官網控制台檢視。本文示例中收集到的資料如下:
建立Tablestore結果表
詳細開通步驟請參考官方文檔。最終建立出來的表Schema如下:
Count列,作為屬性列,不需要定義在主鍵中。
資料處理流程
1、下載下傳支援資料源需要的jar包
下載下傳位址
link2、進入streaming-sql執行環境
下載下傳完jar包以下,在包所在目錄執行以下指令進入互動式開發環境。
注意:jar包要上傳到叢集機器上,不支援遠端引用oss檔案。
3、建立LogHub資料源表
建立LogHub資料源表之前,需要手動開通日志服務的project和logStore,并将日志資料收集好,具體參考【預備工作】一節中,開通LogHub的官方文檔。
然後再到EMR叢集機器的互動式執行環境中建立LogHub資料源表,示例如下:
USE helloemr;
CREATE TABLE loghub_source(GoodsName string,OperationType string,RequestId string,__time__ timestamp)
USING loghub
OPTIONS (
sls.project = '{your project name}',
sls.store = '{store name}',
access.key.id = '{access-key}',
access.key.secret = '{access-key-secret}',
endpoint = 'http://{your project name}.cn-hangzhou-intranet.log.aliyuncs.com');
上面的sql語句,建立一個database為helloemr,tablename為loghub_source的表,shema中有GoodsName、OperationType、Time、RequestId、__time__五個字段,其中__time__字段是在配置ilogtail的時候解析日志中的Time字段得到的,等同于Time字段。
需要注意的是,Endpoint請使用内網域名,公網域名速度上會慢很多。
4、建立Tablestore結果表
同樣的,建立Tablestore結果表之前,也要先到官網控制台建到tablestore的執行個體和表。
建立EMR的Tablestore結果表,示例如下
USE helloemr;
CREATE TABLE tablestore_sink
USING tablestore
OPTIONS(
endpoint="https://sparkStreaming.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="{access-key}",
access.key.secret="{access-key-secret}",
table.name="tablestore_sink",
instance.name="sparkStreaming",
catalog='{"columns":{"GoodsName":{"col":"GoodsName","type":"string"},"OperationType":{"col":"OperationType","type":"string"},"Time":{"col":"Time","type":"long"},"Count":{"col":"Count","type":"long"}}}');
以上sql代碼,建立了一個表名為tablestore_sink的表,執行個體名是sparkStreaming。注意:Endpoint請使用vpc域名。
建立成功以後,使用desc tablestore_sink; 檢視表結構如下。
5、結果資料寫入到Tablestore
通過GoodsName、OperationType聚合一分鐘内的請求次數。這裡要用到Spark Streaming的滾動視窗函數,取window.start作為聚合後的時間。
SET streaming.query.name=loghub_source;
SET spark.sql.streaming.checkpointLocation.loghub_source=/home/helloemr;
SET spark.sql.streaming.query.trigger.loghub_source=ProcessingTime;
SET spark.sql.streaming.query.trigger.intervalMs.loghub_source=10000;
INSERT INTO tablestore_sink
SELECT GoodsName,OperationType,count(*) as Count,to_unix_timestamp(window.start, 'yyyy-MM-dd HH:mm:ss') as Time from loghub_source
where delay(__time__)<"2 minute"
GROUP BY TUMBLING (__time__, interval 1 minute),GoodsName,OperationType;
示例SQL中隻填了幾個必填參數,具體可以參考作業模闆。其中checkpointLocation代表本次流式查詢作業的checkpoint路徑,需要設定一個絕對路徑值。
最終,Tablestore中的結果資料如下:
性能調優
Spark Streaming 是基于Spark的流式處理引擎,其基本原理是把輸入資料以某一時間間隔批量的處理,當批處理間隔縮短到秒級時,便可以用于處理實時資料流。它本質上是微批處理。
使用上一節的示例代碼,實際測試下來,一次作業需要耗費1秒左右的時間。在實際執行個體中,由于源表、目标資料表、資料量大小以及Sql的複雜程度不同,耗費的時間也會不同。
資料讀取
本文使用LogHub為資料源,Shard數量多,Spark Streaming的作業并發度也會多,但需要設定合理的Shard數,具體請參考日志服務分區設定。
同時,建立LogHub資料源表的時候,請使用内網Endpoint。
Spark Streaming作業調優
設定合理的批處理時間
trigger.intervalMs代表批次間隔,機關毫秒,預設為0L。運作任務的時候,當間隔時間比一次任務的運作時間短的時候,任務會列印WARN日志。一般這個值的大小如果能夠使得Streaming作業剛好處理好上一個的批處理的資料,那麼這個就是最優值。
增加作業資源
EMR官網控制台裡面有的監控大盤,可以看到作業占用的資源情況,可以根據實際情況調整配置設定給作業的資源大小。
資料寫入
對于表格存儲(Tablestore)來說,合理的主鍵設計,是提高寫入性能的關鍵因素。具體可以參考表格存儲最佳實踐。一個設計良好的主鍵,需要避免通路壓力集中在一個小範圍的連續的分片鍵上,也就是說避免熱點分片。設計良好的表結構,整張表的通路壓力能夠均勻的分散在各個分片上,這樣才能充分利用後端伺服器的能力。
如果是建立的Tablestore表,而且資料寫入量比較大,最好聯系下@表格存儲技術支援,對表進行預分區。可以提高初始狀态下的寫入性能。