天天看點

模式比對在SQL中應用

很多場景中都會應用到模式比對如:使用者異常行為實時監測、銀行卡異地監控、下單未支付等等;FlinkSQL中使用MATCH_RECOGNIZE子句進行複雜事件處理,我們先看個FlinkSQL中如何識别

  1. 在FlinkSQL client中建立一個測試表Ticket 其schema 如下
Ticket
     |-- symbol: String                           # 股票的代号
     |-- price: Long                              # 股票的價格
     |-- tax: Long                                # 股票應納稅額
     |-- rowtime: TimeIndicatorTypeInfo(rowtime)  # 更改這些值的時間點      
Flink SQL> CREATE TABLE Ticket (
>             symbol string,
>             price int,
>             tax int,
>             rowtime TIMESTAMP(3),
>             WATERMARK FOR rowtime AS rowtime --模式比對必須要有水位線
>          ) WITH (
>           'connector' = 'filesystem',
>           'format' = 'csv',
>           'path' = 'file:///mnt/ps/SAS/BigData/file/ticket.csv'
> );
[INFO] Execute statement succeed.

Flink SQL>       
  1. 為了簡化,我們隻考慮單個股票 

    ACME

     的傳入資料。其中的行是連續追加的。查詢資料如下
symbol                          price                            tax                        rowtime
ACME                             12                              1        2021-09-01 09:00:00.000
ACME                             17                              2        2021-09-01 09:00:01.000
ACME                             19                              1        2021-09-01 09:00:02.000
ACME                             21                              3        2021-09-01 09:00:03.000
ACME                             25                              2        2021-09-01 09:00:04.000
ACME                             18                              1        2021-09-01 09:00:05.000
ACME                             15                              1        2021-09-01 09:00:06.000
ACME                             14                              2        2021-09-01 09:00:07.000
ACME                             24                              2        2021-09-01 09:00:08.000
ACME                             25                              2        2021-09-01 09:00:09.000
ACME                             19                              1        2021-09-01 09:00:10.000      
  1. 現在的任務是找出一個單一股票價格不斷下降的時期
SELECT 
    *
FROM Ticket
    MATCH_RECOGNIZE (   --隻能用于追加表
        PARTITION BY symbol --按symbol分組,相同資料會在一個節點進行計算
        ORDER BY rowtime    --同一組下按事件時間進行排序
        MEASURES    --定義輸出
            START_ROW.rowtime AS start_tstamp,
            LAST(PRICE_DOWN.rowtime) AS bottom_tstamp,
            LAST(PRICE_UP.rowtime) AS end_tstamp
        ONE ROW PER MATCH   --比對成功輸出一條
        AFTER MATCH SKIP TO LAST PRICE_UP   --從比對成功的事件序列中最後一個對應價格上升的事件開始比對下一次
        PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)    --定義3個事件:開始行 價格下降 價格回升(+号代表一個或多個資料)
        DEFINE  --定義事件的具體含義
            PRICE_DOWN AS   --上一條價格下降事件的價格為空并且下降事件的價格小于開始行的價格或者下降事件的價格小于上一條的價格
                (LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR
                    PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1),
            PRICE_UP AS --價格回升事件的價格大于上一條價格下降價格
                PRICE_UP.price > LAST(PRICE_DOWN.price, 1)
    ) MR;
--結果如下 從2021-09-01 09:00:04.000開始下降,直到2021-09-01 09:00:08.00回漲
symbol      start_tstamp                  bottom_tstamp                     end_tstamp
ACME        2021-09-01 09:00:04.000        2021-09-01 09:00:07.000        2021-09-01 09:00:08.000      
  • 離線SQL如何去實作呢
  1. 先生成示例資料源
with tb1 as (
    select 
        symbol, 
        price,
        tax, 
        rowtime
    from values('ACME',12,1,'2021-09-01 09:00:00'),
               ('ACME',17,2,'2021-09-01 09:00:01'),
               ('ACME',19,1,'2021-09-01 09:00:02'),
               ('ACME',21,3,'2021-09-01 09:00:03'),
               ('ACME',25,2,'2021-09-01 09:00:04'),
               ('ACME',18,1,'2021-09-01 09:00:05'),
               ('ACME',15,1,'2021-09-01 09:00:06'),
               ('ACME',14,2,'2021-09-01 09:00:07'),
               ('ACME',24,2,'2021-09-01 09:00:08'),
               ('ACME',25,2,'2021-09-01 09:00:09'),
               ('ACME',19,1,'2021-09-01 09:00:10')
               t(symbol,price,tax,rowtime)
)      
  1. 通過計算上一條和下一條的股票價格差判斷是否連續下降
tb2 as (
    select 
        symbol, 
        price,
        tax, 
        rowtime,
        price-lag(price,1,price) over(partition by symbol order by rowtime) lag_price_diff,
        lead(price,1,price) over(partition by symbol order by rowtime)-price lead_price_diff,
        lead(rowtime,1,rowtime) over(partition by symbol order by rowtime) as lead_rowtime
    from tb1
)
--結果展示如下
symbol  price   tax rowtime lag_price_diff  lead_price_diff
ACME        12      1       2021-09-01 09:00:00 0       5
ACME        17      2       2021-09-01 09:00:01 5       2
ACME        19      1       2021-09-01 09:00:02 2       2
ACME        21      3       2021-09-01 09:00:03 2       4
ACME        25      2       2021-09-01 09:00:04 4       -7
ACME        18      1       2021-09-01 09:00:05 -7  -3
ACME        15      1       2021-09-01 09:00:06 -3  -1
ACME        14      2       2021-09-01 09:00:07 -1  10
ACME        24      2       2021-09-01 09:00:08 10  1
ACME        25      2       2021-09-01 09:00:09 1       -6
ACME        19      1       2021-09-01 09:00:10 -6  0
      
  1. 內插補點為負值即價格下降,根據此進行劃分标簽
tb3 as (
    select 
        symbol, 
        price,
        tax, 
        rowtime,
        lag_price_diff,
        lead_price_diff,
        lead_rowtime,
        sum(if(lag_price_diff>0,1,0)) over(partition by symbol order by rowtime) flag
    from tb2
    where lag_price_diff<0 or lead_price_diff < 0
)
--結果如下
symbol  price   tax rowtime lag_price_diff  lead_price_diff lead_rowtime    flag
ACME    25  2   2021-09-01 09:00:04 4     -7    2021-09-01 09:00:05 1
ACME    18  1   2021-09-01 09:00:05 -7  -3  2021-09-01 09:00:06 1
ACME    15  1   2021-09-01 09:00:06 -3  -1  2021-09-01 09:00:07 1
ACME    14  2   2021-09-01 09:00:07 -1  10  2021-09-01 09:00:08 1
ACME    25  2   2021-09-01 09:00:09 1     -6    2021-09-01 09:00:10 2
ACME    19  1   2021-09-01 09:00:10 -6  0     2021-09-01 09:00:10   2      
  1. 标簽flag為2的不符合連續下降,隻有一條內插補點位置,不是連續下降,需要進行過濾
tb4 as (
    select 
        symbol, 
        price,
        tax, 
        rowtime,
        lead_rowtime,
        flag,
        sum(if(lag_price_diff<0,1,0)) over(partition by symbol,flag) ct
    from tb3
)
--結果如下
symbol  price   tax rowtime lead_rowtime    flag    ct
ACME    25  2   2021-09-01 09:00:04 2021-09-01 09:00:05 1   3
ACME    18  1   2021-09-01 09:00:05 2021-09-01 09:00:06 1   3
ACME    15  1   2021-09-01 09:00:06 2021-09-01 09:00:07 1   3
ACME    14  2   2021-09-01 09:00:07 2021-09-01 09:00:08 1   3
ACME    25  2   2021-09-01 09:00:09 2021-09-01 09:00:10 2   1
ACME    19  1   2021-09-01 09:00:10 2021-09-01 09:00:10 2   1      
  1. 根據真實情況即連續的定義對資料進行過濾,統計結果
select 
    symbol,
    --flag,
    min(rowtime) start_tstamp,
    max(rowtime) bottom_tstamp,
    max(lead_rowtime) end_tstamp
from tb4
where ct > 1
group by symbol,flag;
--結果如下
symbol  start_tstamp                bottom_tstamp               end_tstamp
ACME        2021-09-01 09:00:04 2021-09-01 09:00:07 2021-09-01 09:00:08
      
  • 下面我們在看一個電商中的場景,使用者浏覽商品後會進行下單,下單後有可能會進行支付,我們需要分析某日某商品進行浏覽、收藏、下單、支付的使用者
  1. 先生成簡單的示例資料
with tb1 as (
    select 
        user_id,
        shop_id,
        user_behav,
        op_time,
        substr(op_time,1,10) dt
    from values('1001','A1','浏覽','2021-09-01 17:03:01'),
               ('1001','A1','收藏','2021-09-01 17:04:12'),
               ('1001','A2','浏覽','2021-09-01 17:02:02'),
               ('1001','A2','收藏','2021-09-01 17:03:42'),
               ('1001','A2','下單','2021-09-01 17:06:25'),
               ('1002','A1','浏覽','2021-09-01 17:00:32'),
               ('1002','A1','收藏','2021-09-01 17:03:12'),
               ('1002','A1','浏覽','2021-09-01 17:03:45'),
               ('1002','A1','下單','2021-09-01 17:05:41'),
               ('1002','A1','支付','2021-09-01 17:06:26'),
               ('1003','A1','浏覽','2021-09-01 17:08:13'),
               ('1003','A1','浏覽','2021-09-01 17:09:14')
               t(user_id,shop_id,user_behav,op_time)       
)      
  1. 我們隻看A1店鋪的資料,使用collect_list或者wm_concat(Maxcomputer内置函數,Hive中是concat_wm)進行彙總使用者的行為
tb2 as (
    select 
        dt,
        user_id,
        -- collect_list(user_behav)
        wm_concat(",",user_behav) behavs
    from tb1
    where shop_id = 'A1'
    group by dt,user_id
)
--展示結果如下
dt  user_id behavs
2021-09-01  1001    浏覽,收藏
2021-09-01  1002    浏覽,收藏,浏覽,下單,支付
2021-09-01  1003    浏覽,浏覽      
  1. 比對規則使用like,這裡需要下單之後的行為為支付
select 
    dt,
    user_id,
    behavs
from tb2
where behavs like '%浏覽%收藏%下單_支付%';
--結果如下
dt  user_id behavs
2021-09-01  1002    浏覽,收藏,浏覽,下單,支付      

使用離線SQL分析分析比對,主要是按次元把所有行為路徑進行彙總拼接,然後使用字元比對或者複雜的使用正則比對,實際業務分析過程中,如有類似需求,可以參考上述方式。如有更好的方式,歡迎探讨。

拜了個拜