Pattern matching comes up in many scenarios, such as real-time detection of abnormal user behavior, flagging bank-card transactions from unusual locations, orders placed but never paid, and so on. In Flink SQL, complex event processing is done with the MATCH_RECOGNIZE clause. Let's first look at how such a pattern is recognized in Flink SQL.
- In the Flink SQL client, create a test table Ticket with the following schema:
Ticket
|-- symbol: String # symbol of the stock
|-- price: Long # price of the stock
|-- tax: Long # tax liability of the stock
|-- rowtime: TimeIndicatorTypeInfo(rowtime) # point in time when these values changed
Flink SQL> CREATE TABLE Ticket (
> symbol string,
> price int,
> tax int,
> rowtime TIMESTAMP(3),
> WATERMARK FOR rowtime AS rowtime --pattern matching requires a watermark
> ) WITH (
> 'connector' = 'filesystem',
> 'format' = 'csv',
> 'path' = 'file:///mnt/ps/SAS/BigData/file/ticket.csv'
> );
[INFO] Execute statement succeed.
Flink SQL>
- To keep things simple, we only consider the incoming data for a single stock, ACME; its rows are continuously appended. The data to query is shown below (a matching ticket.csv is sketched after the table).
symbol price tax rowtime
ACME 12 1 2021-09-01 09:00:00.000
ACME 17 2 2021-09-01 09:00:01.000
ACME 19 1 2021-09-01 09:00:02.000
ACME 21 3 2021-09-01 09:00:03.000
ACME 25 2 2021-09-01 09:00:04.000
ACME 18 1 2021-09-01 09:00:05.000
ACME 15 1 2021-09-01 09:00:06.000
ACME 14 2 2021-09-01 09:00:07.000
ACME 24 2 2021-09-01 09:00:08.000
ACME 25 2 2021-09-01 09:00:09.000
ACME 19 1 2021-09-01 09:00:10.000
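- For reference, a ticket.csv backing the DDL above might look like the following (a sketch; the actual file is not shown in this post, and the column order is assumed to follow the DDL):
ACME,12,1,2021-09-01 09:00:00.000
ACME,17,2,2021-09-01 09:00:01.000
ACME,19,1,2021-09-01 09:00:02.000
ACME,21,3,2021-09-01 09:00:03.000
ACME,25,2,2021-09-01 09:00:04.000
ACME,18,1,2021-09-01 09:00:05.000
ACME,15,1,2021-09-01 09:00:06.000
ACME,14,2,2021-09-01 09:00:07.000
ACME,24,2,2021-09-01 09:00:08.000
ACME,25,2,2021-09-01 09:00:09.000
ACME,19,1,2021-09-01 09:00:10.000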
- The task now is to find the periods during which the price of a single stock was continuously decreasing.
SELECT
*
FROM Ticket
MATCH_RECOGNIZE ( --can only be applied to append-only tables
PARTITION BY symbol --group by symbol; rows with the same symbol are processed on the same node
ORDER BY rowtime --within each group, order by event time
MEASURES --define the output
START_ROW.rowtime AS start_tstamp,
LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
LAST(PRICE_UP.rowtime) AS end_tstamp
ONE ROW PER MATCH --emit one row per successful match
AFTER MATCH SKIP TO LAST PRICE_UP --resume the next match at the last PRICE_UP event of the matched sequence
PATTERN (START_ROW PRICE_DOWN+ PRICE_UP) --define 3 pattern variables: a start row, price drops, a price rebound (+ means one or more rows)
DEFINE --define what each pattern variable means
PRICE_DOWN AS --either there is no previous PRICE_DOWN row and this row's price is below the start row's, or this row's price is below the previous PRICE_DOWN row's
(LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
PRICE_UP AS --the rebound row's price is greater than the last PRICE_DOWN price
PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
) MR;
--Result: the price starts falling at 2021-09-01 09:00:04.000 and recovers at 2021-09-01 09:00:08.000
symbol start_tstamp bottom_tstamp end_tstamp
ACME 2021-09-01 09:00:04.000 2021-09-01 09:00:07.000 2021-09-01 09:00:08.000
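- If the prices themselves are also wanted in the output, MEASURES can project them as well. A minimal sketch of an extended MEASURES clause (the rest of the query is unchanged; the extra column names are illustrative):
MEASURES
START_ROW.rowtime AS start_tstamp,
START_ROW.price AS start_price, --price at the start of the decline
LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
LAST(PRICE_DOWN.price) AS bottom_price, --lowest price of the matched run
LAST(PRICE_UP.rowtime) AS end_tstamp,
LAST(PRICE_UP.price) AS end_price --price after the rebound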
- How can the same thing be done in offline (batch) SQL?
- First, generate the sample data source
with tb1 as (
select
symbol,
price,
tax,
rowtime
from values('ACME',12,1,'2021-09-01 09:00:00'),
('ACME',17,2,'2021-09-01 09:00:01'),
('ACME',19,1,'2021-09-01 09:00:02'),
('ACME',21,3,'2021-09-01 09:00:03'),
('ACME',25,2,'2021-09-01 09:00:04'),
('ACME',18,1,'2021-09-01 09:00:05'),
('ACME',15,1,'2021-09-01 09:00:06'),
('ACME',14,2,'2021-09-01 09:00:07'),
('ACME',24,2,'2021-09-01 09:00:08'),
('ACME',25,2,'2021-09-01 09:00:09'),
('ACME',19,1,'2021-09-01 09:00:10')
t(symbol,price,tax,rowtime)
)
- By computing each row's price difference against the previous and next rows (lag/lead with the row's own price as the default, so the boundary differences are 0), we can tell where the price is falling; lead_rowtime records the next row's event time
,tb2 as (
select
symbol,
price,
tax,
rowtime,
price-lag(price,1,price) over(partition by symbol order by rowtime) lag_price_diff,
lead(price,1,price) over(partition by symbol order by rowtime)-price lead_price_diff,
lead(rowtime,1,rowtime) over(partition by symbol order by rowtime) as lead_rowtime
from tb1
)
--The result is as follows (the lead_rowtime column is omitted here)
symbol price tax rowtime lag_price_diff lead_price_diff
ACME 12 1 2021-09-01 09:00:00 0 5
ACME 17 2 2021-09-01 09:00:01 5 2
ACME 19 1 2021-09-01 09:00:02 2 2
ACME 21 3 2021-09-01 09:00:03 2 4
ACME 25 2 2021-09-01 09:00:04 4 -7
ACME 18 1 2021-09-01 09:00:05 -7 -3
ACME 15 1 2021-09-01 09:00:06 -3 -1
ACME 14 2 2021-09-01 09:00:07 -1 10
ACME 24 2 2021-09-01 09:00:08 10 1
ACME 25 2 2021-09-01 09:00:09 1 -6
ACME 19 1 2021-09-01 09:00:10 -6 0
- A negative difference means the price dropped. Keep only the rows that take part in a decline (the price either dropped from the previous row or drops in the next row), then label them: flag is a running count of price increases, so all rows belonging to the same declining run share the same flag value
,tb3 as (
select
symbol,
price,
tax,
rowtime,
lag_price_diff,
lead_price_diff,
lead_rowtime,
sum(if(lag_price_diff>0,1,0)) over(partition by symbol order by rowtime) flag
from tb2
where lag_price_diff<0 or lead_price_diff < 0
)
--The result is as follows
symbol price tax rowtime lag_price_diff lead_price_diff lead_rowtime flag
ACME 25 2 2021-09-01 09:00:04 4 -7 2021-09-01 09:00:05 1
ACME 18 1 2021-09-01 09:00:05 -7 -3 2021-09-01 09:00:06 1
ACME 15 1 2021-09-01 09:00:06 -3 -1 2021-09-01 09:00:07 1
ACME 14 2 2021-09-01 09:00:07 -1 10 2021-09-01 09:00:08 1
ACME 25 2 2021-09-01 09:00:09 1 -6 2021-09-01 09:00:10 2
ACME 19 1 2021-09-01 09:00:10 -6 0 2021-09-01 09:00:10 2
- The group with flag = 2 is not a continuous decline: it contains only a single dropping row, so it needs to be filtered out. Count the dropping rows per (symbol, flag) group first
,tb4 as (
select
symbol,
price,
tax,
rowtime,
lead_rowtime,
flag,
sum(if(lag_price_diff<0,1,0)) over(partition by symbol,flag) ct
from tb3
)
--The result is as follows
symbol price tax rowtime lead_rowtime flag ct
ACME 25 2 2021-09-01 09:00:04 2021-09-01 09:00:05 1 3
ACME 18 1 2021-09-01 09:00:05 2021-09-01 09:00:06 1 3
ACME 15 1 2021-09-01 09:00:06 2021-09-01 09:00:07 1 3
ACME 14 2 2021-09-01 09:00:07 2021-09-01 09:00:08 1 3
ACME 25 2 2021-09-01 09:00:09 2021-09-01 09:00:10 2 1
ACME 19 1 2021-09-01 09:00:10 2021-09-01 09:00:10 2 1
- Finally, filter the data according to the business definition of "continuous" (here: more than one dropping row) and aggregate the result
select
symbol,
--flag,
min(rowtime) start_tstamp,
max(rowtime) bottom_tstamp,
max(lead_rowtime) end_tstamp
from tb4
where ct > 1
group by symbol,flag;
--The result is as follows, matching the MATCH_RECOGNIZE output above
symbol start_tstamp bottom_tstamp end_tstamp
ACME 2021-09-01 09:00:04 2021-09-01 09:00:07 2021-09-01 09:00:08
- Next, let's look at an e-commerce scenario. After browsing a product a user may place an order, and after ordering may go on to pay. The task is to find, for a given day and a given product, the users who browsed, favorited, ordered and then paid.
- First, generate some simple sample data
with tb1 as (
select
user_id,
shop_id,
user_behav,
op_time,
substr(op_time,1,10) dt
from values('1001','A1','browse','2021-09-01 17:03:01'),
('1001','A1','favorite','2021-09-01 17:04:12'),
('1001','A2','browse','2021-09-01 17:02:02'),
('1001','A2','favorite','2021-09-01 17:03:42'),
('1001','A2','order','2021-09-01 17:06:25'),
('1002','A1','browse','2021-09-01 17:00:32'),
('1002','A1','favorite','2021-09-01 17:03:12'),
('1002','A1','browse','2021-09-01 17:03:45'),
('1002','A1','order','2021-09-01 17:05:41'),
('1002','A1','pay','2021-09-01 17:06:26'),
('1003','A1','browse','2021-09-01 17:08:13'),
('1003','A1','browse','2021-09-01 17:09:14')
t(user_id,shop_id,user_behav,op_time)
)
- We only look at shop A1's data, and aggregate each user's behaviors with collect_list or wm_concat (a MaxCompute built-in; in Hive, use concat_ws together with collect_list, as sketched after the result table below)
,tb2 as (
select
dt,
user_id,
-- collect_list(user_behav)
wm_concat(",",user_behav) behavs
from tb1
where shop_id = 'A1'
group by dt,user_id
)
--The result is as follows
dt user_id behavs
2021-09-01 1001 browse,favorite
2021-09-01 1002 browse,favorite,browse,order,pay
2021-09-01 1003 browse,browse
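- For reference, the same aggregation in Hive (which has no wm_concat) could be written roughly as below. Note that collect_list does not guarantee event-time order, so if strict ordering of the behaviors matters, the rows should be sorted before collecting (a sketch under that caveat):
tb2 as (
select
dt,
user_id,
concat_ws(',', collect_list(user_behav)) behavs --Hive counterpart of wm_concat
from tb1
where shop_id = 'A1'
group by dt,user_id
)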
- The matching rule uses LIKE; here the behavior right after order has to be pay, so the single-character wildcard _ is used to match only the separator between them
select
dt,
user_id,
behavs
from tb2
where behavs like '%browse%favorite%order_pay%';
--The result is as follows
dt user_id behavs
2021-09-01 1002 browse,favorite,browse,order,pay
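- For more complex paths, the LIKE filter can be replaced with a regular expression via RLIKE (available in both Hive and MaxCompute); a sketch equivalent to the query above:
select
dt,
user_id,
behavs
from tb2
where behavs rlike '.*browse.*favorite.*order,pay.*'; --the ',' pins pay immediately after order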
To do this kind of pattern matching with offline SQL, the main idea is to aggregate and concatenate every behavior path along the chosen dimensions, and then match the result with string patterns or, for more complicated cases, regular expressions such as the RLIKE sketch above. If you run into a similar requirement in your own analysis, the approach above can serve as a reference; if you have a better way, feel free to share it.
Bye for now!