Tags
PostgreSQL , citus , sharding , optimizer , query planner , query executor , Real-time Executor , Router Executor , Task Tracker Executor , co-locate
https://github.com/digoal/blog/blob/master/201903/20190316_02.md#%E8%83%8C%E6%99%AF Background
A Citus cluster consists of a coordinator instance and multiple worker instances. The data is sharded and replicated on the workers while the coordinator stores metadata about these shards. All queries issued to the cluster are executed via the coordinator. The coordinator partitions the query into smaller query fragments where each query fragment can be run independently on a shard. The coordinator then assigns the query fragments to workers, oversees their execution, merges their results, and returns the final result to the user. The query processing architecture can be described in brief by the diagram below.
Citus is a sharding extension for PostgreSQL. A Citus cluster consists of one coordinator node and several worker (compute) nodes. A table can be sharded (distributed), replicated to every worker (a reference table, identical on all workers), or kept local (a regular table stored on the coordinator). For a sharded table you can specify the number of shard replicas; internally, Citus keeps replicas in sync using PostgreSQL logical replication, its own statement-level multi-write, or 2PC (for reference tables).
http://docs.citusdata.com/en/v8.3/develop/api_udf.html#create-distributed-table Partition table
The pg_dist_partition table stores metadata about which tables in the database are distributed. For each distributed table, it also stores information about the distribution method and detailed information about the distribution column.
repmodel
char
The method used for data replication. The values of this column corresponding to different replication methods are:
* citus statement-based replication: ‘c’
* postgresql streaming replication: ‘s’
* two-phase commit (for reference tables): ‘t’
The coordinator node stores the metadata (table definitions, sharding method, and so on).
Clients connect to the coordinator node and send their SQL requests to it.
After receiving a SQL request, the coordinator first splits it into smaller query fragments, each of which can run independently on one shard. The coordinator sends these fragments to the workers, monitors their execution, receives the results the workers return for the fragments, merges them, and returns the final result to the client.
The architecture is shown below:
Blue marks the functionality added by the citus extension; yellow marks the functionality built into PostgreSQL.
https://github.com/digoal/blog/blob/master/201903/20190316_02.md#citus-distributed-query-planner CITUS Distributed Query Planner
The Citus distributed query planner processes the client's SQL request and produces a distributed execution plan.
For SELECT queries, the planner first creates a plan tree of the input query and transforms it into its commutative and associative form so it can be parallelized. It also applies several optimizations to ensure that the queries are executed in a scalable manner, and that network I/O is minimized.
For a SELECT statement, for example, the Citus distributed planner first builds a plan tree and transforms it into its commutative and associative form so it can be parallelized, while also applying optimizations so the query runs in parallel where possible and network I/O is kept to a minimum.
Next, the planner breaks the query into two parts - the coordinator query which runs on the coordinator and the worker query fragments which run on individual shards on the workers. The planner then assigns these query fragments to the workers such that all their resources are used efficiently. After this step, the distributed query plan is passed on to the distributed executor for execution.
Next, the query is split into two parts: one part runs on the coordinator, the other on the workers, and each part is dispatched accordingly. After this step, execution moves to the distributed executor.
The planning process for key-value lookups on the distribution column or modification queries is slightly different as they hit exactly one shard. Once the planner receives an incoming query, it needs to decide the correct shard to which the query should be routed. To do this, it extracts the distribution column in the incoming row and looks up the metadata to determine the right shard for the query. Then, the planner rewrites the SQL of that command to reference the shard table instead of the original table. This re-written plan is then passed to the distributed executor.
When the query supplies a value for a distributed table's distribution column, the process differs slightly from the above and involves a query-rewrite step: the planner process first reads the distribution method from the metadata tables, uses the distribution column's value to compute which shard the query targets, and then replaces the distributed table's original name in the query with the shard table's name. Finally, the rewritten query is handed to the Citus distributed executor.
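The rewrite step described above can be sketched in a few lines of Python. This is an illustrative model only, not Citus code: the hash function (crc32), the shard count, and the shardid-style table suffix are all assumptions; Citus actually derives the target shard from its metadata and PostgreSQL hash operator classes.

```python
# Toy model of single-shard routing: hash the distribution-column value,
# find the hash range (shard) it falls into, rewrite the table name.
import zlib

SHARD_COUNT = 32
HASH_SPACE = 2 ** 32  # crc32 yields an unsigned 32-bit value


def shard_for(dist_value: str) -> int:
    """Map a distribution-column value to a shard index by hash range."""
    h = zlib.crc32(dist_value.encode())        # stand-in hash function
    return h * SHARD_COUNT // HASH_SPACE       # which hash range it lands in


def rewrite(table: str, dist_value: str) -> str:
    """Replace the original table name with the concrete shard table name."""
    return f"{table}_{102000 + shard_for(dist_value)}"  # shardid-style suffix


# A key-value lookup is rewritten to hit exactly one shard table:
sql = f"SELECT * FROM {rewrite('orders', 'user-42')} WHERE user_id = 'user-42'"
```

The rewritten statement references only one shard, so the executor can push the whole query to the single worker that hosts it.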
https://github.com/digoal/blog/blob/master/201903/20190316_02.md#citus-distributed-query-executor CITUS Distributed Query Executor
Citus has three executors:
Real-time Executor, Router Executor, Task Tracker Executor
Citus’s distributed executors run distributed query plans and handle failures that occur during query execution. The executors connect to the workers, send the assigned tasks to them and oversee their execution. If the executor cannot assign a task to the designated worker or if a task execution fails, then the executor dynamically re-assigns the task to replicas on other workers. The executor processes only the failed query sub-tree, and not the entire query while handling failures.
The Citus distributed executor runs on the coordinator. It opens connections to the workers and sends them the query requests (the query fragments produced by the distributed planner). If a worker fails during execution, it automatically fails over to another healthy shard replica (only if replicas were specified when the distributed table was created) and tracks the state of every shard replica.
Citus has three basic executor types: real time, router, and task tracker. It chooses which to use dynamically, depending on the structure of each query, and can use more than one at once for a single query, assigning different executors to different subqueries/CTEs as needed to support the SQL functionality. This process is recursive: if Citus cannot determine how to run a subquery then it examines sub-subqueries.
Citus has three executor types (real time, router, and task tracker). The executor type is chosen dynamically during execution, so a single client query may use more than one executor type (different fragments may pick different executors); in a complex SQL statement, for example, different subqueries/CTEs may use different executor types.
At a high level, the real-time executor is useful for handling simple key-value lookups and INSERT, UPDATE, and DELETE queries. The task tracker is better suited for larger SELECT queries, and the router executor for accessing data that is co-located in a single worker node.
In short, the real-time executor suits simple SQL, e.g. key-value style queries (those that include the distribution key); the task tracker executor suits complex queries (e.g. long-running SQL); and the router executor suits queries whose data lives on a single worker node (i.e. the query needs no cross-node computation at all — this also covers multi-table JOINs where the tables are co-located and the query's predicates only touch data on a single worker, so the whole query can be pushed down to that one node).
What co-location means: rows with the same distribution column value are always on the same machine, even across different tables.
http://docs.citusdata.com/en/v8.1/sharding/data_modeling.html#colocation So when different tables are joined on columns of the same type and the tables are placed in the same co-location group, the join never has to cross worker nodes. (To ensure co-location, shards with the same hash range are always placed on the same node even after rebalance operations, such that equal distribution column values are always on the same node across tables.)
Co-location is independent of the tables' shard counts, because shards are really hash-range boundaries. For example:
table1, hashed by column id, 25 shards.
table2, hashed by column id, 26 shards.
After co-location, the shards of the two tables that cover the same hash-value range are placed on the same worker node. For example:
table1_shard1 : 1-10000
table2_shard1 : 1-8000 , table2_shard2 : 8001-10000
All that is required is that they end up on the same worker node.
That said, it is recommended to create co-located tables with the same number of shards, which gives a more even distribution.
Co-location unlocks additional features, so use it whenever you can:
- Full SQL support for queries on a single set of co-located shards
- Multi-statement transaction support for modifications on a single set of co-located shards
- Aggregation through INSERT..SELECT
- Foreign keys
- Distributed outer joins
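The hash-range placement behind co-location can be sketched as follows. This is an illustrative model, not Citus internals: the worker names, hash space, and crc32 are assumptions, and it uses the recommended setup of equal shard counts so the two tables' ranges align exactly.

```python
# Toy model of co-location: shards covering the same hash range are pinned
# to the same worker, so rows with equal distribution-column values can be
# joined locally, without crossing nodes.
import zlib

HASH_SPACE = 2 ** 16
WORKERS = ["worker1", "worker2", "worker3"]


def placement(shard_count):
    """Assign each hash-range shard of a table to a worker, range-aligned."""
    step = HASH_SPACE // shard_count
    return {(i * step, (i + 1) * step - 1): WORKERS[i % len(WORKERS)]
            for i in range(shard_count)}


def worker_for(placements, h):
    """Find which worker owns the shard whose hash range contains h."""
    for (lo, hi), worker in placements.items():
        if lo <= h <= hi:
            return worker


# Two co-located tables: same shard count, same ranges, same placements.
t1 = placement(8)  # table1
t2 = placement(8)  # table2

h = zlib.crc32(b"id-123") % HASH_SPACE
# A join on the distribution column stays on one node:
assert worker_for(t1, h) == worker_for(t2, h)
```

Because the placements are range-aligned, any distribution value hashes into matching shards of both tables on the same worker, which is exactly what makes co-located joins and foreign keys possible.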
The choice of executor for each query can be displayed by running PostgreSQL’s EXPLAIN command. This can be useful for debugging performance issues.
With EXPLAIN you can see which executor types a SQL statement uses.
https://github.com/digoal/blog/blob/master/201903/20190316_02.md#real-time-executor Real-time Executor
The real-time executor is the default executor used by Citus. It is well suited for getting fast responses to queries involving filters, aggregations and co-located joins. The real time executor opens one connection per shard to the workers and sends all fragment queries to them. It then fetches the results from each fragment query, merges them, and gives the final results back to the user.
The real-time executor is the default executor type. It opens one connection per shard, so if a worker hosts 100 shards, the real-time executor opens 100 connections to that worker.
Query fragments are likewise dispatched per shard, so a worker hosting 100 shards executes 100 SQL statements concurrently.
After sending the query fragments, the real-time executor fetches the results from all connections, merges them, and returns the final result to the application.
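The fan-out/merge pattern described above can be sketched like this. Everything here is simulated: `run_fragment` stands in for a per-shard connection running a fragment query, and the shard names are made up.

```python
# Sketch of the real-time executor's pattern: one task per shard in flight
# at once, then the coordinator merges the partial results.
from concurrent.futures import ThreadPoolExecutor

SHARDS = [f"orders_10200{i}" for i in range(8)]  # one connection per shard


def run_fragment(shard: str) -> int:
    # In the real executor this would be e.g. "SELECT count(*) FROM <shard>"
    # over a dedicated connection; here we fake a per-shard partial count.
    return len(shard)


# Fan out: send every fragment query concurrently.
with ThreadPoolExecutor(max_workers=len(SHARDS)) as pool:
    partials = list(pool.map(run_fragment, SHARDS))

# Merge on the coordinator: combine partial aggregates into the final result.
total = sum(partials)
```

The cost of this pattern is visible in the sketch: the connection count scales with the shard count, which is why the file-descriptor limits discussed next matter.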
Since the real time executor maintains an open connection for each shard to which it sends queries, it may reach file descriptor / connection limits while dealing with high shard counts. In such cases, the real-time executor throttles on assigning more tasks to workers to avoid overwhelming them with too many tasks. One can typically increase the file descriptor limit on modern operating systems to avoid throttling, and change Citus configuration to use the real-time executor. But, that may not be ideal for efficient resource management while running complex queries. For queries that touch thousands of shards or require large table joins, you can use the task tracker executor.
Clearly, the real-time executor can consume a very large number of connections, so it easily hits the OS file-descriptor (fd) limit; too many connections can also cause performance problems (PostgreSQL uses a process-per-connection model).
1. At the OS level, raise the relevant limits and kernel parameters, e.g.:

In /etc/security/limits.conf:

```
* soft nofile 131072
* hard nofile 131072
* soft nproc 131072
* hard nproc 131072
```

In /etc/sysctl.conf:

```
fs.nr_open = 20480000
```
2. Put a pgbouncer connection pool (statement mode or transaction mode) in front of the workers, so that when the real-time executor connects to a worker it is actually going through the pgbouncer pool.
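A minimal pgbouncer configuration sketch for this setup; all host names, ports, and pool sizes below are placeholder assumptions, and only the pool_mode choice (transaction or statement) corresponds to the modes suggested above.

```ini
; hypothetical pgbouncer.ini sitting in front of one worker node
[databases]
; route the worker database through the pooler
mydb = host=127.0.0.1 port=5432 dbname=mydb

[pgbouncer]
listen_addr = *
listen_port = 6432
; transaction (or statement) pooling, as suggested above
pool_mode = transaction
; accept many coordinator-side connections...
max_client_conn = 10000
; ...while keeping only a small pool of real server connections to PostgreSQL
default_pool_size = 64
```

The real-time executor then points at port 6432 instead of 5432, so its many per-shard connections multiplex onto a small number of backend processes.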
Furthermore, when the real time executor detects simple INSERT, UPDATE or DELETE queries it assigns the incoming query to the worker which has the target shard. The query is then handled by the worker PostgreSQL server and the results are returned back to the user. In case a modification fails on a shard replica, the executor marks the corresponding shard replica as invalid in order to maintain data consistency.
When the real-time executor detects a simple INSERT, UPDATE, or DELETE (for which the distributed planner has already replaced the table name with the shard table name), it hands the query directly to the worker holding the target shard. If execution fails on a shard replica, the real-time executor marks that shard replica as invalid.
https://github.com/digoal/blog/blob/master/201903/20190316_02.md#router-executor Router Executor
When all data required for a query is stored on a single node, Citus can route the entire query to the node and run it there. The result set is then relayed through the coordinator node back to the client. The router executor takes care of this type of execution.
When all the data a query touches (tables, etc.) lives on a single worker node, Citus can push the entire query down to that node; this is the case handled by the router executor.
Although Citus supports a large percentage of SQL functionality even for cross-node queries, the advantage of router execution is 100% SQL coverage. Queries executing inside a node are run in a full-featured PostgreSQL worker instance. The disadvantage of router execution is the reduced parallelism of executing a query using only one computer.
Advantage of the router executor: it supports all SQL (no different from a standalone PostgreSQL instance). Disadvantage: no parallelism, since only one worker node is used at a time.
https://github.com/digoal/blog/blob/master/201903/20190316_02.md#task-tracker-executor Task Tracker Executor
The task tracker executor is well suited for long running, complex data warehousing queries. This executor opens only one connection per worker, and assigns all fragment queries to a task tracker daemon on the worker. The task tracker daemon then regularly schedules new tasks and sees through their completion. The executor on the coordinator regularly checks with these task trackers to see if their tasks completed.
The task tracker executor, as the name suggests, runs query fragments as scheduled tasks. It suits long-running queries and data-warehouse workloads. It opens only one connection per worker node, and assigns the query fragments to a task tracker daemon running on each worker.
The task tracker daemon on a worker schedules and executes the query fragments sent over by the coordinator's executor and updates each fragment's execution status. The task tracker executor on the coordinator regularly checks the task trackers on all workers to see whether their fragments have completed.
Each task tracker daemon on the workers also makes sure to execute at most citus.max_running_tasks_per_node concurrently. This concurrency limit helps in avoiding disk I/O contention when queries are not served from memory. The task tracker executor is designed to efficiently handle complex queries which require repartitioning and shuffling intermediate data among workers.
The citus.max_running_tasks_per_node parameter caps how many tasks the task tracker daemon on each worker runs concurrently, ensuring resources are not exhausted (fragments large enough to go through the task tracker executor are typically expensive in I/O, CPU, and memory, and may even involve data repartitioning).
https://github.com/digoal/blog/blob/master/201903/20190316_02.md#%E5%B0%8F%E7%BB%93 Summary
A client interacting with a Citus cluster connects to the coordinator node, which holds the metadata (table definitions, distribution keys, shard counts, co-location group information, and so on). When the coordinator receives a SQL request, the distributed planner splits it into two parts: one part runs on the coordinator (the distributed executor), the other consists of the rewritten query fragments, which run on the workers (via Citus's distributed join functions when data repartitioning is needed, otherwise handed straight to PostgreSQL's native query planner).
The coordinator's distributed executor monitors execution on the workers, then receives, merges, and returns the results to the client. If the distributed table uses shard replicas (PostgreSQL logical replication) and a shard replica fails during execution, the distributed executor also handles failover when a healthy replica exists (switching to it) and marks the failed replica as invalid, to keep the data consistent.
So a Citus cluster with multiple replicas has a degree of fault tolerance.
Citus has three distributed executors:
1. real-time: suited to simple SQL. The executor opens one connection per shard, so a table with many shards can easily exhaust system resources and database connections; raise the OS resource limits and put a connection pool in front of the workers.
2. router: used when a query can be pushed down to a single worker node. Its advantage is full SQL support; its disadvantage is that it cannot use Citus's distributed parallelism (only a single node is involved).
3. task tracker: the executor opens one connection per worker node, and each worker runs a task tracker daemon that schedules the query fragments sent over by the coordinator; the per-node task concurrency is governed by citus.max_running_tasks_per_node. The daemon on the worker handles scheduling and updates the fragments' results and status, while the coordinator's task tracker executor monitors fragment execution on the workers and, once they finish, receives, merges, and returns the results to the client. The task tracker executor suits large queries, e.g. analytical queries.
https://github.com/digoal/blog/blob/master/201903/20190316_02.md#%E5%8F%82%E8%80%83 References
http://docs.citusdata.com/en/v8.1/develop/reference_processing.html#citus-query-processing
《PostgreSQL sharding : citus series 7 - topn speedup (count(*) group by order by count(*) desc limit x) (using the topn approximation extension)》
《PostgreSQL sharding : citus series 6 - count(distinct xx) speedup (using the hll|hyperloglog approximation extension)》
《PostgreSQL sharding : citus series 5 - worker node network optimization》
《PostgreSQL sharding : citus series 4 - DDL operation conventions (adding DBs, TABLEs, SCHEMAs, UDFs, OPs, users, etc.)》
《PostgreSQL 11 similar-image search extension imgsmlr performance testing and tuning 3 - citus, 8 machines, 128 shards (400 million images)》
《Deepgreen(Greenplum) multi-node deployment test, TPC-H VS citus》
《PostgreSQL sharding : citus series 3 - window function restrictions and workarounds (mimicking gpdb's execution tree, stepwise execution)》
《PostgreSQL sharding : citus series 2 - TPC-H》
《PostgreSQL citus, Greenplum distributed execution plan DEBUG》
《PostgreSQL sharding : citus series 1 - multi-node deployment (including OLTP (TPC-B) tests)》