维表是一张不断变化的表(不更新的表是变化表的一种特例)。
如何查询或JOIN一张不断变化的表? 如果用传统的JOIN语法来表达
JOIN dim_table ON xxx
,会导致多次运行得到的结果不一致。所以在查询或JOIN维表的时候,需要明确指名要查看的是维表的哪个时刻的快照。因此,需要引入 SQL:2011 的Temporal Table语义。
维表DDL
Flink SQL中没有专门为维表设计的DDL语法,使用标准的
CREATE TABLE语法即可,同时需额外增加一行
PERIOD FOR SYSTEM_TIME
的声明。这行声明定义了维表的变化周期,即表明该表是一张会变化的表。
维表DDL示例
-
CREATE TABLE white_list (
-
id varchar,
-
name varchar,
-
age int,
-
PRIMARY KEY (id), -- 用作维表的话,必须有声明的主键
-
PERIOD FOR SYSTEM_TIME
-
) with (
-
type = 'xxx',
-
...
-
)
维表具体声明方式和参数请参见
维表创建。
注意:声明一个维表时,必须指明主键。维表JOIN时,ON的条件必须包含所有主键的等值条件。
维表JOIN
维表是一张不断变化的表,因此在JOIN维表的时候,需指明这条记录关联维表哪个时刻的快照。目前仅支持关联当前时刻的维表(未来会支持关联左表rowtime所对应的维表快照)。
维表JOIN语法
-
SELECT column-names
-
FROM table1 [AS <alias1>]
-
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
-
ON table1.column-name1 = table2.key-name1
- 维表支持
和INNER JOIN
,不支持LEFT JOIN
或RIGHT JOIN
FULL JOIN
- JOIN维表时,在维表后必须加上
FOR SYSTEM_TIME AS OF PROCTIME()
说明:
含义是:JOIN维表当前时刻所看到的每条数据。FOR SYSTEM_TIME AS OF PROCTIME()
- 维表JOIN工作模式是左表的一条记录到达时,在维表中查询并关联上匹配的数据。如果维表插入了一条数据能匹配上之前左表的数据时,JOIN的结果流不会发出更新的数据以弥补之前的未匹配。
- JOIN行为只发生在处理时间(processing time),即使维表中的数据都被删了,之前JOIN流已经发出的关联上的数据也不会被撤回或改变。
例如,事件流JOIN白名单维表的SQL如下。
-
SELECT e.*, w.*
-
FROM event AS e
-
JOIN white_list FOR SYSTEM_TIME AS OF PROCTIME() AS w
-
ON e.id = w.id
注意:
- 维表JOIN的ON条件中一定要有包括维表primary key的等值条件,因为需要根据key来查维表。ON条件中可以有其他条件,例如
,其他条件需要包含所有维表主键的等值条件。
ON event.id = white_list.id AND event.name = white_list.name
- 维表和维表不能做JOIN。DDL声明的字段、主键等需要和真实表里面定义的一致。
维表示例
测试数据
nameinfo:
id(bigint) | name(VARCHAR) | age(bigint) |
---|---|---|
1 | lilei | 22 |
2 | hanmeimei | 20 |
3 | libai | 28 |
phoneNumber:
phoneNumber(bigint) | |
---|---|
dufu | 18867889855 |
baijuyi | 18867889856 |
18867889857 | |
18867889858 |
测试语句
-
CREATE TABLE datahub_input1 (
-
id BIGINT,
-
name VARCHAR,
-
age BIGINT
-
) WITH (
-
type='datahub'
-
);
-
create table phoneNumber(
-
name VARCHAR,
-
phoneNumber bigint,
-
primary key(name),
-
PERIOD FOR SYSTEM_TIME
-
)with(
-
type='rds'
-
);
-
CREATE table result_infor(
-
id bigint,
-
phoneNumber bigint,
-
name VARCHAR
-
)with(
-
type='rds'
-
);
-
INSERT INTO result_infor
-
SELECT
-
t.id
-
,w.phoneNumber
-
,t.name
-
FROM datahub_input1 as t
-
JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w
-
ON t.name = w.name;
测试结果
name(varchar) | ||
---|---|---|
本文转自实时计算——
维表JOIN语句