天天看點

Postgresql - PipelineDB - Sliding Windows

由于 Continuous Views 是随着時間連續地和增量地更新的,是以在更新 Continuous Views 的結果時,PipelineDB有能力考慮目前時間。包含WHERE子句和與目前時間相關的時間元件的查詢稱為sliding-window 查詢。滑動WHERE子句篩選或接受的事件集會随着時間不斷變化。

sliding WHERE子句 兩個重要組成部分:

clock_timestamp ( ):一個内置函數,它總是傳回目前的時間戳。

arrival_timestamp:所有傳入事件的一個特殊屬性,包含PipelineDB接收它們的時間,就像描述在Arrival Ordering的一樣。

但是,沒有必要顯式地添加引用這些值的WHERE子句。PipelineDB在内部執行此操作,并且隻需要在Continuous Views 的定義中指定sw存儲參數。

盡管sliding windows 是SQL資料庫的新概念,但是PipelineDB沒有使用任何新的或專有的視窗文法。相反,PipelineDB 使用标準PostgreSQL 9.5文法。

PipelineDB允許使用者在定義滑動視窗連續視圖時手動構造滑動視窗WHERE子句。

這個連續視圖上的SELECT結果隻包含最後一分鐘内看到的特定使用者。也就是說,重複的SELECT将包含不同的行,即使連續視圖沒有顯式更新。

每次計算clock_timestamp() - interval '1 minute' 時,它将傳回與過去1分鐘相對應的時間戳。如果給定事件的arrival_timestamp 在過去大于1分鐘,則添加arrival_timestamp 和 > 意味着該謂詞将計算為true。由于每次讀取新事件時都會評估謂詞,是以這有效地給了我們一個1分鐘寬的 sliding window 。

實驗:

CREATE FOREIGN TABLE stream (x integer, y integer) SERVER pipelinedb;

CREATE VIEW v_sw_test_01 WITH (sw = '1 minute') AS SELECT user_id::integer FROM stream;

寫一個python腳本,向stream中插入資料。

import psycopg2

conn = psycopg2.connect("host=localhost user=mytest")

for i in xrange(51,100):

SQL = "INSERT INTO stream VALUES (%s,2);" % i

cur = conn.cursor()

cur.execute(SQL)

conn.commit()

time.sleep(23)

邊執行腳本,邊檢視v_sw_test_01視圖中的資料,發現目前存在的資料,隻是最近一分鐘插入的資料。

mytest=# select * from recent_users;

x

----

77

78

79

(3 rows)

過一會兒

mytest=# select * from recent_users;

x

----

78

79

(2 rows)

再過一會兒

mytest=# select * from recent_users;

x

----

78

79

80

(3 rows)

實際上,在内部,PipelineDB将将此查詢重寫為以下内容:

CREATE VIEW v_sw_test_01 AS

SELECT x::integer FROM stream

WHERE (arrival_timestamp > clock_timestamp() - interval '1 minute');

============================================================

Sliding Aggregates

Sliding-window 查詢還與聚合函數一起工作。滑動聚合通過盡可能多地聚集它們的輸入而工作,但不丢失需要知道如何随着時間的推移從視窗中移除資訊的粒度。這種部分聚合對使用者來說是透明的,隻有完全聚集的結果才會在滑動視窗聚集中可見。

CREATE VIEW v_count_sw_test_ WITH (sw = '1 minute') AS SELECT COUNT(*) FROM stream;

每次在這個 Continuous Views 上運作SELECT時,它傳回的計數将隻是最後一分鐘内看到的事件的計數。例如,如果事件停止進入,則每次在 Continuous Views 上運作SELECT時,計數都會減少。

CREATE VIEW sensor_temps WITH (sw = '5 minutes') AS SELECT sensor::integer, AVG(temp::numeric) FROM sensor_stream GROUP BY sensor;

5分鐘平均溫度是多少

CREATE VIEW uniques WITH (sw = '30 days') AS SELECT COUNT(DISTINCT user::integer) FROM user_stream;

在過去的30天裡,我們看到了多少個不重複的使用者

============================================================

Temporal Invalidation

顯然, continuous views 中的滑動視窗行在一定時間之後變得無效,因為它們已經太老而不能包含在 continuous views 的結果中。這樣的行必須是垃圾收集,這可能以兩種方式發生:

Background invalidation:類似于PostgreSQL的自動清空器的背景程序定期運作并從sliding-window continuous views中實體删除任何過期的行。

Read-time invalidation:當使用SELECT讀取 continuous view 時,在生成結果的同時動态地丢棄任何太舊而不能包括在結果中的資料。這確定即使仍然存在無效行,它們實際上也不包含在任何查詢結果中。

============================================================

step_factor

在内部,盡可能聚合支援 sliding-window 查詢的物化表。但是,不能将行聚合到與查詢的最終輸出相同的粒度級别,因為當資料超出視窗時,必須從聚合結果中删除資料。

例如,按小時聚合的 sliding-window 查詢實際上可能具有磁盤上的分鐘級聚合資料,是以隻有最後60分鐘包含在傳回給讀取器的最終聚合結果中。這些用于 sliding-window 查詢的内部更細粒度的聚合級别稱為“步驟”。“覆寫”視圖被放置在這些步驟聚合之上,以便在讀取時執行最終聚合。

步驟聚合是決定 sliding-window 查詢讀取性能的重要因素,因為每個最終滑動視窗聚合組在内部由多個步驟組成。每個滑動視窗聚合組将具有的步驟數量可以通過step_factor參數進行調整:

step_factor

一個介于1和50之間的整數,将 sliding-window 步長的大小指定為由sw給出的視窗大小的百分比。如果使用較小的step_factor,則在資料離開視窗時提供更大的粒度,代價是磁盤上的物化表更大。更大的step_factor将減少磁盤上的物化表大小,但代價是減少視窗外粒度。

CREATE VIEW v_sw_test_03 WITH (sw = '1 hour', step_factor = 50) AS SELECT COUNT(*) FROM stream;