天天看點

海量監控日志基于EMR Spark Streaming SQL進行實時聚合前言場景設計技術選型預備工作資料處理流程性能調優歡迎加入

前言

從EMR-3.21.0 版本開始将提供Spark Streaming SQL的預覽版功能,支援使用SQL來開發流式分析作業。結果資料可以實時寫入Tablestore。

本文以LogHub為資料源,收集ECS上的日志資料,通過Spark Streaming SQL進行聚合後,将流計算結果資料實時寫入Tablestore,展示一個簡單的日志監控場景。

海量監控日志基于EMR Spark Streaming SQL進行實時聚合前言場景設計技術選型預備工作資料處理流程性能調優歡迎加入

場景設計

假設有一個商品表Goods,商品資訊開放給使用者浏覽,使用者浏覽完以後會産生以下格式的日志資料:

{
  "RequestId":"c85df119-f6db-449f-89bb-6773d2468f89",
  "Time":2019-07-30 12:05:28,
  "GoodsName":"shoes",
  "OperationType":"query"
}           

我們需要将原始日志資料,根據GoodsName、OperationType和時間,聚合成一分鐘一個點的監控資料。用于監控各商品的通路情況。

最終需要的監控資料格式如下:

GoodsName OperationType Time OperationCount
shoes query 2019-07-30 12:05 5060
2019-07-30 12:06 8001
2019-07-30 12:07 9607

過程如下圖所示,比較簡單。

海量監控日志基于EMR Spark Streaming SQL進行實時聚合前言場景設計技術選型預備工作資料處理流程性能調優歡迎加入

技術選型

本文主要來看一下結果資料存放資料庫的選型。對于本文的監控場景,結果資料的量級,取決于商品數量。真實情況下,可能還要增加商品的規格、顔色等監控名額,結果資料量會比較大。此外,對于時間粒度比較小的監控資料,一般都隻需要保留最近的,時間比較久的曆史資料需要删除。

将Tablestore與傳統關系型資料庫MySQL進行對比,Tablestore有以下優勢:

  • 支援海量資料,無縫擴充

    表格存儲通過資料分片和負載均衡技術,實作了無縫擴充。

  • 支援資料自動過期

    資料生命周期(Time To Live,簡稱 TTL)是資料表的一個屬性,即資料的存活時間,機關為秒。表格存儲會在背景對超過存活時間的資料進行清理,以減少使用者的資料存儲空間,降低存儲成本。監控場景下比較适用,不需要手動去删除資料。

預備工作

建立EMR叢集

開通EMR之前,先要對雲賬号進行實名認證,然後建立預設的EMR角色并授予 AliyunEMRDefaultRole和AliyunEmrEcsDefaultRole這兩個角色。

EMR暫時還不支援在官網控制台上配置寫入Tablestore的任務,需要登入到MER叢集的機器上去操作,是以開通EMR叢集的時候,請選擇自定義購買,并在最後一步打開挂載公網、遠端登入、密碼方式登入,自己設定一個密碼,如下圖。

海量監控日志基于EMR Spark Streaming SQL進行實時聚合前言場景設計技術選型預備工作資料處理流程性能調優歡迎加入

開通完以後,進入到ECS控制台,會看到有一台Master節點機器有彈性IP。後面步驟中,操作EMR Spark Streaming需要使用這個IP位址遠端登入到機器上。

海量監控日志基于EMR Spark Streaming SQL進行實時聚合前言場景設計技術選型預備工作資料處理流程性能調優歡迎加入

開通ECS和日志服務

詳細步驟請參考官方文檔。

其中ilogtail配置如下

海量監控日志基于EMR Spark Streaming SQL進行實時聚合前言場景設計技術選型預備工作資料處理流程性能調優歡迎加入

日志收集到LogHub以後,可以通過官網控制台檢視。本文示例中收集到的資料如下:

海量監控日志基于EMR Spark Streaming SQL進行實時聚合前言場景設計技術選型預備工作資料處理流程性能調優歡迎加入

建立Tablestore結果表

詳細開通步驟請參考官方文檔。最終建立出來的表Schema如下:

海量監控日志基于EMR Spark Streaming SQL進行實時聚合前言場景設計技術選型預備工作資料處理流程性能調優歡迎加入

Count列,作為屬性列,不需要定義在主鍵中。

資料處理流程

1、下載下傳支援資料源需要的jar包

下載下傳位址

https://github.com/aliyun/aliyun-emapreduce-sdk/blob/master-2.x/jars/datasources/latest/emr-datasources_shaded_2.11-1.7.0.jar

2、進入streaming-sql執行環境

下載下傳完jar包以下,在包所在目錄執行以下指令進入互動式開發環境。

streaming-sql --master yarn-client --jars ./emr-datasources_shaded_2.11-1.7.0.jar --driver-class-path ./emr-datasources_shaded_2.11-1.7.0.jar           

注意:jar包要上傳到叢集機器上,不支援遠端引用oss檔案。

海量監控日志基于EMR Spark Streaming SQL進行實時聚合前言場景設計技術選型預備工作資料處理流程性能調優歡迎加入

3、建立LogHub資料源表

建立LogHub資料源表之前,需要手動開通日志服務的project和logStore,并将日志資料收集好,具體參考【預備工作】一節中,開通LogHub的官方文檔。

然後再到EMR叢集機器的互動式執行環境中建立LogHub資料源表,示例如下:

CREATE DATABASE IF NOT EXISTS helloemr;
USE helloemr;
CREATE TABLE loghub_source(GoodsName string,OperationType string,RequestId string,__time__ timestamp)
USING loghub
OPTIONS (
sls.project = '{your project name}',
sls.store = '{store name}',
access.key.id = '{access-key}',
access.key.secret = '{access-key-secret}',
endpoint = 'http://{your project name}.cn-hangzhou-intranet.log.aliyuncs.com');           

上面的sql語句,建立一個database為helloemr,tablename為loghub_source的表,shema中有GoodsName、OperationType、Time、RequestId、__time__五個字段,其中__time__字段是在配置ilogtail的時候解析日志中的Time字段得到的,等同于Time字段。

需要注意的是,Endpoint請使用内網域名,公網域名速度上會慢很多。

4、建立Tablestore結果表

同樣的,建立Tablestore結果表之前,也要先到官網控制台建到tablestore的執行個體和表。

建立EMR的Tablestore結果表,示例如下

CREATE DATABASE IF NOT EXISTS helloemr;
USE helloemr;
CREATE TABLE tablestore_sink
USING tablestore
OPTIONS(
endpoint="https://sparkStreaming.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id="{access-key}",
access.key.secret="{access-key-secret}",
table.name="tablestore_sink",
instance.name="sparkStreaming",
catalog='{"columns":{"GoodsName":{"col":"GoodsName","type":"string"},"OperationType":{"col":"OperationType","type":"string"},"Time":{"col":"Time","type":"long"},"Count":{"col":"Count","type":"long"}}}');           

以上sql代碼,建立了一個表名為tablestore_sink的表,執行個體名是sparkStreaming。注意:Endpoint請使用vpc域名。

建立成功以後,使用desc tablestore_sink; 檢視表結構如下。

海量監控日志基于EMR Spark Streaming SQL進行實時聚合前言場景設計技術選型預備工作資料處理流程性能調優歡迎加入

5、結果資料寫入到Tablestore

通過GoodsName、OperationType聚合一分鐘内的請求次數。這裡要用到Spark Streaming的滾動視窗函數,取window.start作為聚合後的時間。

USE helloemr;
SET streaming.query.name=loghub_source;
SET spark.sql.streaming.checkpointLocation.loghub_source=/home/helloemr;
SET spark.sql.streaming.query.trigger.loghub_source=ProcessingTime;
SET spark.sql.streaming.query.trigger.intervalMs.loghub_source=10000;
INSERT INTO tablestore_sink
SELECT GoodsName,OperationType,count(*) as Count,to_unix_timestamp(window.start, 'yyyy-MM-dd HH:mm:ss') as Time from loghub_source
where delay(__time__)<"2 minute" 
GROUP BY TUMBLING (__time__, interval 1 minute),GoodsName,OperationType;           

示例SQL中隻填了幾個必填參數,具體可以參考作業模闆。其中checkpointLocation代表本次流式查詢作業的checkpoint路徑,需要設定一個絕對路徑值。

最終,Tablestore中的結果資料如下:

海量監控日志基于EMR Spark Streaming SQL進行實時聚合前言場景設計技術選型預備工作資料處理流程性能調優歡迎加入

性能調優

Spark Streaming 是基于Spark的流式處理引擎,其基本原理是把輸入資料以某一時間間隔批量的處理,當批處理間隔縮短到秒級時,便可以用于處理實時資料流。它本質上是微批處理。

使用上一節的示例代碼,實際測試下來,一次作業需要耗費1秒左右的時間。在實際執行個體中,由于源表、目标資料表、資料量大小以及Sql的複雜程度不同,耗費的時間也會不同。

資料讀取

本文使用LogHub為資料源,Shard數量多,Spark Streaming的作業并發度也會多,但需要設定合理的Shard數,具體請參考日志服務分區設定。

同時,建立LogHub資料源表的時候,請使用内網Endpoint。

Spark Streaming作業調優

設定合理的批處理時間

trigger.intervalMs代表批次間隔,機關毫秒,預設為0L。運作任務的時候,當間隔時間比一次任務的運作時間短的時候,任務會列印WARN日志。一般這個值的大小如果能夠使得Streaming作業剛好處理好上一個的批處理的資料,那麼這個就是最優值。

海量監控日志基于EMR Spark Streaming SQL進行實時聚合前言場景設計技術選型預備工作資料處理流程性能調優歡迎加入

增加作業資源

EMR官網控制台裡面有的監控大盤,可以看到作業占用的資源情況,可以根據實際情況調整配置設定給作業的資源大小。

資料寫入

對于表格存儲(Tablestore)來說,合理的主鍵設計,是提高寫入性能的關鍵因素。具體可以參考表格存儲最佳實踐。一個設計良好的主鍵,需要避免通路壓力集中在一個小範圍的連續的分片鍵上,也就是說避免熱點分片。設計良好的表結構,整張表的通路壓力能夠均勻的分散在各個分片上,這樣才能充分利用後端伺服器的能力。

如果是建立的Tablestore表,而且資料寫入量比較大,最好聯系下@表格存儲技術支援,對表進行預分區。可以提高初始狀态下的寫入性能。

歡迎加入

表格存儲(Tablestore)推出了很多貼近使用者場景的

文章

示例代碼

,歡迎大家加入我們的釘釘公開交流群一起讨論,群号:11789671。

海量監控日志基于EMR Spark Streaming SQL進行實時聚合前言場景設計技術選型預備工作資料處理流程性能調優歡迎加入

繼續閱讀