pipelinedb 使用與總結
pipelinedb 介紹
- 它是基于Postgresql資料庫, 可以使用資料庫的庫函數、表達式、存儲過程等功能,而且還支援proxy等分表分庫插件。它可以與任何已經使用PostgreSQL的庫一起工作。
- pipelinedb是為了在流資料上連續進行sql查詢而建構的,這些連續的查詢的輸出存儲在正常表中。是以連續查詢可以被認為是非常高吞吐量、增量更新的物化視圖。
- pipelinedb應用程式的設計目的是在減少流資料集的基數的SQL查詢中勝出。例如:總結和聚合;在滑動時間視窗執行計算;文本搜尋過濾;通過減少輸入流的基數,pipelinedb可以極大地減少需要持久化到磁盤上的資訊量,因為隻有連續查詢的輸出才會被存儲。原始資料一旦被需要讀取的連續查詢所讀取,就會被丢棄。
- 通過pipelinedb傳輸的大部分資料可以被認為是虛拟資料。這種資料虛拟化的概念是流水線操作的核心,也是允許它使用相對較小的硬體足迹高效地處理大量資料的原因。
- pipelinedb的目的是消除許多常見資料處理的ETL階段的必要性。原始資料可以直接流到pipelinedb中,在那裡它可以通過連續不斷的查詢不斷地進行精煉和提煉。這使得在将其精煉的輸出加載到資料庫之前,不需要周期性地處理細粒度的資料,當然,前提是這種處理可以由SQL查詢來定義的話。
pipelinedb 安裝(rpm)
pipelinedb download
-
建立新使用者pipe(非root即可)
rpm -ivh --prefix=~/pipe/pipelinedb pipelinedb-<version>.rpm # (此處安裝目錄使用簡寫,使用者可以根據自己需要和習慣自定義,稱~/pipe/pipelinedb:安裝目錄)
-
初始化資料庫指定目錄
-D 資料庫執行個體目錄 -l 日志檔案目錄 可以在安裝目錄下建立data、log檔案夾,後續安裝指令指定即可
pipeline-init -D <data directory> pipeline-ctl -D <data directory> -l pipelinedb.log start (首次初始化資料庫需要"-D <data directory>"制定執行個體化目錄)
-
資料庫服務狀态操作啟動/關閉
pg_ctl start/stop pipeline-ctl -D <data directory> start/stop (首次初始化資料庫需要"-D <data directory>"制定執行個體化目錄)
-
連接配接pipelinedb資料庫
兩種方式,後一種與Postgresql一樣
- pipeline pipeline
-
設定初始密碼
Configuration
修改配置檔案兩個處與Postgresql修改配置檔案一樣
修改後重新開機即生效
-
pipelinedb.conf
-
對應位置修改為:listen_addresses = '*'
-
-
pg_hba.conf
-
TYPE DATABASE USER ADDRESS METHOD
-
增加:host all all 0.0.0.0/0 truth
-
pipelinedb使用
-
Streams
允許用戶端通過連續的視圖來推送資料。資料流的行,或者說簡單的事件,看起來就像一個普通表的行, 而将資料寫入流的接口與寫入表格的接口是相同的。 也就是說,事件隻存在于流中,直到它們被從該流中讀取的所有連續視圖所消耗。 即便如此,使用者仍然無法從流中進行選擇。流隻作為連續視圖的輸入。
CREATE STREAM stream_name ( [ { column_name data_type [ COLLATE collation ] | LIKE parent_stream } [, ... ] ] ) CREATE STREAM stream (x integer, y integer);
ALTER STREAM stream ADD COLUMN z integer;
- Prepared INSERT
INSERT INTO stream (x, y, z) VALUES (0, 1, 2), (3, 4, 5), (6, 7, 8); # 使用Prepared INSERT 可以實作如上方式的多列插入
PREPARE write_to_stream AS INSERT INTO stream (x, y, z) VALUES ($1, $2, $3); EXECUTE write_to_stream(0, 1, 2); EXECUTE write_to_stream(3, 4, 5); EXECUTE write_to_stream(6, 7, 8);
-
pipelinedb資料庫基本抽象稱為連續視圖
一個連續的視圖很像一個正常視圖,除了它從流和表的組合中選擇作為輸入, 并且随着新資料被寫入到這些輸入中,它會實時地更新。 一旦一個流行被連續的視圖讀取,必須讀取它,它就會被丢棄,它沒有存儲在任何地方。 對于連續視圖來說,唯一持久的資料是通過從該視圖運作SELECT來傳回的任何資料。 是以,可以将連續視圖看作是一個非常高吞吐量、實時的物化視圖。
CREATE CONTINUOUS VIEW name AS query # 建立視圖
DROP CONTINUOUS VIEW name # 删除視圖
SELECT truncate_continuous_view('name'); # 删除所有連續視圖的資料而不删除連續視圖本身
SELECT * FROM pipeline_views(); # 檢視視圖
SELECT activate('continuous_view_or_transform'); SELECT deactivate('continuous_view_or_transform'); # 用于檢測視圖或者轉換(觸發器)是否是活躍狀态,函數隻接受一個連續視圖或轉換名稱
-
Example
-
在這個例子中,将計算一些關于一天的Wikipedia頁面視圖資料的基本統計資訊。 資料集中的每個記錄都包含每個Wikipedia頁面的小時頁面視圖統計資訊 資料集可以下載下傳下來看一下
FROM pipelinedb docspsql -h localhost -p 5432 -d pipeline -c " CREATE STREAM wiki_stream(hour timestamp,project text,title text,view_count bigint,size bigint); CREATE CONTINUOUS VIEW wiki_stats AS SELECT hour, project, count(*) AS total_pages, sum(view_count) AS total_views, min(view_count) AS min_views, max(view_count) AS max_views, avg(view_count) AS avg_views, percentile_cont(0.99) WITHIN GROUP (ORDER BY view_count) AS p99_views, sum(size) AS total_bytes_served FROM wiki_stream GROUP BY hour, project;" curl -sL http://pipelinedb.com/data/wiki-pagecounts | gunzip | \ psql -h localhost -p 5432 -d pipeline -c " COPY wiki_stream (hour, project, title, view_count, size) FROM STDIN" psql -h localhost -p 5432 -d pipeline -c " SELECT * FROM wiki_stats ORDER BY total_views DESC";
-
-
Transform
view和transform的差別在于,view的計算結果會儲存在資料庫中, 而transform不會儲存,隻能定義觸發器來将結果輸出到其他地方 可以在存儲過程中調用view,但是 transform由于結果已經被重定向了,是以無法被調用 此處沒有過多的介紹,後面找到更合适的應用執行個體,進行補寫
-
滑動視窗
- 滑動視窗的使用是重點
- 由于連續的視圖不斷地更新,是以資料庫系統可以在更新連續視圖的結果時考慮目前的時間。包含與目前時間相關的時間元件的WHERE子句的查詢稱為滑窗查詢。滑動WHERE子句過濾或接受的一系列事件會随着時間的推移而變化。
- clock_timestamp()
-
一個内置函數,它總是傳回目前時間戳
-
- arrival_timestamp
-
所有傳入事件的特殊屬性,其中包含資料庫系統接收到的時間,如到達順序所述
-
- 沒有必要顯式地添加引用這些值的WHERE子句。資料庫在内部執行這個操作,隻需要在一個連續視圖定義中指定sw存儲參數。
-
Example
- 查詢使用者在最後一分鐘看到的内容
CREATE CONTINUOUS VIEW recent_users WITH (sw = '1 minute') AS SELECT user_id::integer FROM stream;
- pipelinedb将以上sql重寫為
CREATE CONTINUOUS VIEW recent_users AS SELECT user_id::integer FROM stream WHERE (arrival_timestamp > clock_timestamp() - interval '1 minute')
-
Sliding Aggregates
- 滑窗查詢也适用于聚合函數。滑動聚合通過盡可能多地聚合它們的輸入,但是不丢失需要知道如何随着時間的推移從視窗中删除資訊所需的粒度。這種局部聚合對使用者來說是透明的,隻有在滑動視窗聚合中才能看到完全聚合的結果
-
Example
- 傳感器5分鐘移動的平均溫度是多少?
CREATE CONTINUOUS VIEW sensor_temps WITH (sw = '5 minutes') AS SELECT sensor::integer, AVG(temp::numeric) FROM sensor_stream GROUP BY sensor;
- 在過去的30天裡我們看到了多少唯一的使用者?
CREATE CONTINUOUS VIEW uniques WITH (sw = '30 days') AS SELECT COUNT(DISTINCT user::integer) FROM user_stream;
- 在過去的5分鐘裡,伺服器的第99個預反應延遲是什麼?
CREATE CONTINUOUS VIEW latency WITH (sw = '5 minutes') AS SELECT server_id::integer, percentile_cont(0.99) WITHIN GROUP (ORDER BY latency::numeric) FROM server_stream GROUP BY server_id;
使用程式連接配接pipelinedb
方式與連接配接Postgresql一樣
Java需要相應的驅動程式jar
Python使用psycopg2較友善
-
Python
conn = psycopg2.connect("dbname='pipeline' user='pipe' password='pipeline' host='ip' port=5432") pipeline = conn.cursor() # stream(page_views)需要提前建立,或者加句建立stream的sql程式在執行建立視圖前執行下即可 query = """ CREATE CONTINUOUS VIEW view AS SELECT url::text, count(*) AS total_count, count(DISTINCT cookie::text) AS uniques, percentile_cont(0.99) WITHIN GROUP (ORDER BY latency::integer) AS p99_latency FROM page_views GROUP BY url """ pipeline.execute(query) conn.commit() # Now let's simulate some page views for n in range(10000): # 10 unique urls url = '/some/url/%d' % (n % 10) # 1000 unique cookies cookie = '%032d' % (n % 1000) # latency uniformly distributed between 1 and 100 latency = random.randint(1, 100) pipeline.execute(""" INSERT INTO page_views (url, cookie, latency) VALUES ('%s', '%s', %d) """ % (url, cookie, latency)) # The output of a continuous view can be queried like any other table or view pipeline.execute('SELECT * FROM view ORDER BY url') rows = pipeline.fetchall() for row in rows: print(row) pipeline.execute('DROP CONTINUOUS VIEW v') conn.commit()
-
executemany() 使用
示例目的是練習executemany()的使用,stream以及視圖的建立就不多表述 第一個for循環目的:構造内層為以aa、bb、cc為鍵的字典,namedict型如: {1:{'aa':10,'bb':50,'cc':100},2:{'aa':10,'bb':50,'cc':100},3:{'aa':10,'bb':50,'cc':100} tup型如:({'aa':10,'bb':50,'cc':100},{'aa':10,'bb':50,'cc':100},{'aa':10,'bb':50,'cc':100})
import psycopg2 namedict = {} aa = 'aa' bb = 'bb' cc = 'cc' # if條件句可以忽略,隻為指派所寫 for n in range(100000): rows1 = {} x = n % 10 rows1[aa]=100000 - n rows1[bb]=x if 0<n< 500: rows1[cc]=n*n-200 else: if n<10000: rows1[cc]=n*20-500 else: rows1[cc]=n*3-200 namedict[n] = rows1 for i in range(10): print(namedict[i]) print(namedict[i]['aa'],namedict[i]['bb'],namedict[i]['cc']) tup = tuple(namedict.values()) pipeline.executemany("""INSERT INTO test_stream (key,value,x) VALUES (%(aa)s,%(bb)s,%(cc)s)""", tup) pipeline.execute('SELECT * FROM view ORDER BY url') rows = pipeline.fetchall() for row in rows: print(row) pipeline.execute('DROP CONTINUOUS VIEW view') conn.commit()
-
-
Java
需要先将驅動程式jar檔案引入
import java.util.Properties; import java.sql.*; public class example { static final String HOST = "ip**"; static final String DATABASE = "pipeline"; static final String USER = "pipe"; static final String PASSWORD = "pipeline"; public static void main(String[] args) throws SQLException { String url = "jdbc:postgresql://" + HOST + ":5432/" + DATABASE; ResultSet rs; // Properties props = new Properties(); // props.setProperty("user", USER); Connection conn = DriverManager.getConnection(url, USER, PASSWORD); Statement stmt = conn.createStatement(); stmt.executeUpdate("CREATE CONTINUOUS VIEW view AS SELECT x::integer, COUNT(*) FROM stream GROUP BY x"); for (int i = 0; i < 100000; i++) { int x = i % 10; stmt.addBatch("INSERT INTO stream (x) VALUES (" + Integer.toString(x) + ")"); } stmt.executeBatch(); rs = stmt.executeQuery("SELECT * FROM view"); while (rs.next()) { int id = rs.getInt("x"); int count = rs.getInt("count"); System.out.println(id + " = " + count); } // stmt.executeUpdate("DROP CONTINUOUS VIEW v"); conn.close(); } }
pipelinedb docx