由于 Continuous Views 是随着時間連續地和增量地更新的,是以在更新 Continuous Views 的結果時,PipelineDB有能力考慮目前時間。包含WHERE子句和與目前時間相關的時間元件的查詢稱為sliding-window 查詢。滑動WHERE子句篩選或接受的事件集會随着時間不斷變化。
sliding WHERE子句 兩個重要組成部分:
clock_timestamp ( ):一個内置函數,它總是傳回目前的時間戳。
arrival_timestamp:所有傳入事件的一個特殊屬性,包含PipelineDB接收它們的時間,就像描述在Arrival Ordering的一樣。
但是,沒有必要顯式地添加引用這些值的WHERE子句。PipelineDB在内部執行此操作,并且隻需要在Continuous Views 的定義中指定sw存儲參數。
盡管sliding windows 是SQL資料庫的新概念,但是PipelineDB沒有使用任何新的或專有的視窗文法。相反,PipelineDB 使用标準PostgreSQL 9.5文法。
PipelineDB允許使用者在定義滑動視窗連續視圖時手動構造滑動視窗WHERE子句。
這個連續視圖上的SELECT結果隻包含最後一分鐘内看到的特定使用者。也就是說,重複的SELECT将包含不同的行,即使連續視圖沒有顯式更新。
每次計算clock_timestamp() - interval '1 minute' 時,它将傳回與過去1分鐘相對應的時間戳。如果給定事件的arrival_timestamp 在過去大于1分鐘,則添加arrival_timestamp 和 > 意味着該謂詞将計算為true。由于每次讀取新事件時都會評估謂詞,是以這有效地給了我們一個1分鐘寬的 sliding window 。
實驗:
CREATE FOREIGN TABLE stream (x integer, y integer) SERVER pipelinedb;
CREATE VIEW v_sw_test_01 WITH (sw = '1 minute') AS SELECT user_id::integer FROM stream;
寫一個python腳本,向stream中插入資料。
import psycopg2
conn = psycopg2.connect("host=localhost user=mytest")
for i in xrange(51,100):
SQL = "INSERT INTO stream VALUES (%s,2);" % i
cur = conn.cursor()
cur.execute(SQL)
conn.commit()
time.sleep(23)
邊執行腳本,邊檢視v_sw_test_01視圖中的資料,發現目前存在的資料,隻是最近一分鐘插入的資料。
mytest=# select * from recent_users;
x
----
77
78
79
(3 rows)
過一會兒
mytest=# select * from recent_users;
x
----
78
79
(2 rows)
再過一會兒
mytest=# select * from recent_users;
x
----
78
79
80
(3 rows)
實際上,在内部,PipelineDB将将此查詢重寫為以下内容:
CREATE VIEW v_sw_test_01 AS
SELECT x::integer FROM stream
WHERE (arrival_timestamp > clock_timestamp() - interval '1 minute');
============================================================
Sliding Aggregates
Sliding-window 查詢還與聚合函數一起工作。滑動聚合通過盡可能多地聚集它們的輸入而工作,但不丢失需要知道如何随着時間的推移從視窗中移除資訊的粒度。這種部分聚合對使用者來說是透明的,隻有完全聚集的結果才會在滑動視窗聚集中可見。
CREATE VIEW v_count_sw_test_ WITH (sw = '1 minute') AS SELECT COUNT(*) FROM stream;
每次在這個 Continuous Views 上運作SELECT時,它傳回的計數将隻是最後一分鐘内看到的事件的計數。例如,如果事件停止進入,則每次在 Continuous Views 上運作SELECT時,計數都會減少。
CREATE VIEW sensor_temps WITH (sw = '5 minutes') AS SELECT sensor::integer, AVG(temp::numeric) FROM sensor_stream GROUP BY sensor;
5分鐘平均溫度是多少
CREATE VIEW uniques WITH (sw = '30 days') AS SELECT COUNT(DISTINCT user::integer) FROM user_stream;
在過去的30天裡,我們看到了多少個不重複的使用者
============================================================
Temporal Invalidation
顯然, continuous views 中的滑動視窗行在一定時間之後變得無效,因為它們已經太老而不能包含在 continuous views 的結果中。這樣的行必須是垃圾收集,這可能以兩種方式發生:
Background invalidation:類似于PostgreSQL的自動清空器的背景程序定期運作并從sliding-window continuous views中實體删除任何過期的行。
Read-time invalidation:當使用SELECT讀取 continuous view 時,在生成結果的同時動态地丢棄任何太舊而不能包括在結果中的資料。這確定即使仍然存在無效行,它們實際上也不包含在任何查詢結果中。
============================================================
step_factor
在内部,盡可能聚合支援 sliding-window 查詢的物化表。但是,不能将行聚合到與查詢的最終輸出相同的粒度級别,因為當資料超出視窗時,必須從聚合結果中删除資料。
例如,按小時聚合的 sliding-window 查詢實際上可能具有磁盤上的分鐘級聚合資料,是以隻有最後60分鐘包含在傳回給讀取器的最終聚合結果中。這些用于 sliding-window 查詢的内部更細粒度的聚合級别稱為“步驟”。“覆寫”視圖被放置在這些步驟聚合之上,以便在讀取時執行最終聚合。
步驟聚合是決定 sliding-window 查詢讀取性能的重要因素,因為每個最終滑動視窗聚合組在内部由多個步驟組成。每個滑動視窗聚合組将具有的步驟數量可以通過step_factor參數進行調整:
step_factor
一個介于1和50之間的整數,将 sliding-window 步長的大小指定為由sw給出的視窗大小的百分比。如果使用較小的step_factor,則在資料離開視窗時提供更大的粒度,代價是磁盤上的物化表更大。更大的step_factor将減少磁盤上的物化表大小,但代價是減少視窗外粒度。
CREATE VIEW v_sw_test_03 WITH (sw = '1 hour', step_factor = 50) AS SELECT COUNT(*) FROM stream;