MATCH_RECOGNIZE用于從輸入流中識别符合指定規則的事件,并按照指定的方式輸出。
-
SELECT [ ALL | DISTINCT ]
-
{ * | projectItem [, projectItem ]* }
-
FROM tableExpression
-
[MATCH_RECOGNIZE (
-
[PARTITION BY {partitionItem [, partitionItem]*}]
-
[ORDER BY {orderItem [, orderItem]*}]
-
[MEASURES {measureItem AS col [, measureItem AS col]*}]
-
[ONE ROW PER MATCH|ALL ROWS PER MATCH|ONE ROW PER MATCH WITH TIMEOUT ROWS|ALL ROWS PER MATCH WITH TIMEOUT ROWS]
-
[AFTER MATCH SKIP]
-
PATTERN (patternVariable[quantifier] [ patternVariable[quantifier]]*) WITHIN intervalExpression
-
DEFINE {patternVariable AS patternDefinationExpression [, patternVariable AS patternDefinationExpression]*}
-
)];
參數說明
-
指定分區的列,可選項。PARTITION BY
-
可以指定多列,但是必須以event time列或者process time列作為排序的首列,可選項。ORDER BY
-
定義如何根據比對成功的輸入事件構造輸出事件。MEASURES
-
對于每一次成功的比對,隻會産生一個輸出事件。ONE ROW PER MATCH
-
除了比對成功的時候産生輸出外,逾時的時候也會産生輸出。逾時時間由ONE ROW PER MATCH WITH TIMEOUT ROWS
語句中的PATTERN
語句定義。WITHIN
-
對于每一次成功的比對,對應于每一個輸入事件,都會産生一個輸出事件。ALL ROW PER MATCH
-
ALL ROW PER MATCH WITH TIMEOUT ROWS
PATTERN
WITHIN
-
為可選項,預設為[ONE ROW PER MATCH|ALL ROWS PER MATCH|ONE ROW PER MATCH WITH TIMEOUT ROWS|ALL ROWS PER MATCH WITH TIMEOUT ROWS]
。ONE ROW PER MATCH
-
比對成功之後,從比對成功的事件序列中的第一個事件的下一個事件開始進行下一次比對。AFTER MATCH SKIP TO NEXT ROW
-
比對成功之後,從比對成功的事件序列中的最後一個事件的下一個事件開始進行下一次比對。AFTER MATCH SKIP PAST LAST ROW
-
比對成功之後,從比對成功的事件序列中第一個對應于patternItem的事件開始下一次比對。AFTER MATCH SKIP TO FIRST patternItem
-
比對成功之後,從比對成功的事件序列中最後一個對應于patternItem的事件開始下一次比對。AFTER MATCH SKIP TO LAST patternItem
-
定義待識别的事件序列需要滿足的規則,需要定義在()中,由一系列自定義的patternVariable構成。PATTERN
說明:
- patternVariable之間若以空格間隔,表示符合這兩種patternVariable的事件中間不存在其他事件。
- patternVariable之間若以
間隔,表示符合這兩種patternVariable的事件之間可以存在其它事件。->
Quantifier
quantifier
用于指定符合patternVariable定義的事件的出現次數。
參數 | 參數意義 |
---|---|
* | 0次或多次 |
+ | 1次或多次 |
? | 0次或1次 |
{n} | n次 |
{n,} | 大于等于n次 |
{n, m} | 大于等于n次,小于等于m次 |
{,m} | 小于等于m次 |
預設為貪婪比對。比如對于
pattern: A -> B+
,輸入:
a b1, b2, b3
,輸出為:
a b1, a b1 b2, a b1 b2 b3
。可以在quantifier符号後面加
?
來表示非貪婪比對。
- *?
- +?
- {n}?
- {n,}?
- {n, m}?
- {,m}?
此時對于上面例子中的pattern及輸入,産生的輸出為:
a b1, a b2, a b1 b2, a b3, a b2 b3, a b1 b2 b3
注意:
- WITHIN 定義符合規則的事件序列的最大時間跨度。
靜态視窗
格式:
示例:
INTERVAL ‘string’ timeUnit [ TO timeUnit ]
INTERVAL ‘10’ SECOND, INTERVAL ‘45’ DAY, INTERVAL ‘10:20’ MINUTE TO SECOND, INTERVAL ‘10:20.10’ MINUTE TO SECOND, INTERVAL ‘10:20’ HOUR TO MINUTE, INTERVAL ‘1-5’ YEAR TO MONTH
動态視窗
格式:
示例:
INTERVAL intervalExpression
INTERVAL A.windowTime + 10
,其中A為pattern定義中第一個patternVariable。
在intervalExpression的定義中,可以使用pattern定義中出現過的patternVariable。目前隻能使用第一個patternVariable。intervalExpression中可以使用UDF,intervalExpression的結果必須為long,機關為millisecond, 表示視窗的大小。
- DEFINE 定義在PATTERN中出現的patternVariable的具體含義,若某個patternVariable在DEFINE中沒有定義,則認為對于每一個事件,該patternVariable都成立。
在MEASURES和DEFINE語句中,可以使用如下函數。
函數 | 函數意義 |
---|---|
Row Pattern Column References | 形式為: patternVariable.col。表示通路patternVariable所對應的事件的指定的列。 |
PREV | 隻能用在DEFINE語句中,一般與 合用。用于通路指定的pattern所對應的事件之前偏移指定的offset所對應的事件的指定的列。 示例:對于 ,PREV(A.price)表示目前事件的前一個事件的price列的值。注意,DOWN.price等價于PREV(DOWN.price, 0)。 PREV(DOWN.price)等價于PREV(DOWN.price, 1)。 |
FIRST、LAST | 一般與 合用,用于通路指定的pattern所對應的事件序列中的指定偏移位置的事件。 示例:FIRST(A.price, 3)表示pattern A所對應的事件序列中的第3個事件。LAST(A.price, 3)表示pattern A所對應的事件序列中的倒數第3個事件。 |
輸出列:
輸出列 | |
---|---|
ONE ROW PER MATCH | 包括 partition by中指定的列及measures中定義的列。 對于partition by中已經指定的列,在measures中無需重複定義。 |
ONE ROW PER MATCH WITH TIMEOUT ROWS | 除比對成功的時候産生輸出外,逾時的時候也會産生輸出,逾時時間由PATTERN語句中的WITHIN語句定義。 |
注意:
- 定義pattern的時候,最好也定義WITHIN,否則可能會造成state越來越大。
- order by中定義的首列必須為event time列或者process time列。
-
SELECT *
-
FROM Ticker MATCH_RECOGNIZE (
-
PARTITION BY symbol
-
ORDER BY tstamp
-
MEASURES STRT.tstamp AS start_tstamp,
-
LAST(DOWN.tstamp) AS bottom_tstamp,
-
LAST(UP.tstamp) AS end_tstamp
-
ONE ROW PER MATCH
-
AFTER MATCH SKIP TO NEXT ROW
-
PATTERN (STRT DOWN+ UP+) WITHIN INTERVAL '10' SECOND
-
DEFINE
-
DOWN AS DOWN.price < PREV(DOWN.price),
-
UP AS UP.price > PREV(UP.price)
-
) MR
-
ORDER BY MR.symbol, MR.start_tstamp;
測試資料
timestamp(TIMESTAMP) | card_id(VARCHAR) | location(VARCHAR) | action(VARCHAR) |
---|---|---|---|
2018-04-13 12:00:00 | 1 | WW | Tom |
2018-04-13 12:05:00 | WW1 | ||
2018-04-13 12:10:00 | WW2 | ||
2018-04-13 12:20:00 |
測試案例
-
CREATE TABLE datahub_stream (
-
`timestamp` TIMESTAMP,
-
card_id VARCHAR,
-
location VARCHAR,
-
`action` VARCHAR,
-
WATERMARK wf FOR `timestamp` AS withOffset(`timestamp`, 1000)
-
) WITH (
-
type = 'datahub'
-
...
-
);
-
CREATE TABLE rds_out (
-
start_timestamp TIMESTAMP,
-
end_timestamp TIMESTAMP,
-
card_id VARCHAR,
-
event VARCHAR
-
) WITH (
-
type= 'rds'
-
...
-
);
-
--案例描述
-
-- 當相同的card_id在十分鐘内,從兩個不同的location發生刷卡現象,就會觸發報警機制,以便于監測信用卡盜刷等現象
-
-- 定義計算邏輯
-
insert into rds_out
-
select
-
`start_timestamp`,
-
`end_timestamp`,
-
card_id, `event`
-
from datahub_stream
-
MATCH_RECOGNIZE (
-
PARTITION BY card_id -- 按card_id分區,将相同卡号的資料分到同一個計算節點上。
-
ORDER BY `timestamp` -- 在視窗内,對事件時間進行排序。
-
MEASURES --定義如何根據比對成功的輸入事件構造輸出事件。
-
e2.`action` as `event`,
-
e1.`timestamp` as `start_timestamp`, --第一次的事件時間為start_timestamp。
-
LAST(e2.`timestamp`) as `end_timestamp`--最新的事件時間為end_timestamp。
-
ONE ROW PER MATCH --比對成功輸出一條。
-
AFTER MATCH SKIP TO NEXT ROW--比對後跳轉到下一行。
-
PATTERN (e1 e2+) WITHIN INTERVAL '10' MINUTE -- 定義兩個事件,e1和e2。
-
DEFINE --定義在PATTERN中出現的patternVariable的具體含義。
-
e1 as e1.action = 'Tom', --事件一的action标記為Tom。
-
e2 as e2.action = 'Tom' and e2.location <> e1.location --事件二的action标記為Tom,且事件一和事件二的location不一緻。
-
);
測試結果
start_timestamp(TIMESTAMP) | end_timestamp(TIMESTAMP) | event(VARCHAR) | |
---|---|---|---|
2018-04-13 20:00:00.0 | 2018-04-13 20:05:00.0 | ||
2018-04-13 20:10:00.0 |
本文轉自實時計算——
複雜事件處理(CEP)語句