PostgreSQL sharding : citus 系列3 - 窗口函数调用限制 与 破解之法(套用gpdb执行树,分步执行)
背景
窗口函数是分析场景常用的,目前(citus 7.5)仅支持两种场景使用window函数,
1、partition by 必须是分布键。
2、where条件里面带分布键的等值过滤条件。
本质上:目前(citus 7.5)window函数不支持跨shard操作,或者说过程中不进行重分布。
而Greenplum这方面做得很好,是一个完整的MPP数据库。
citus window函数的支持
postgres=# \set VERBOSITY verbose
postgres=# select row_number() over(partition by bid order by aid) rn,* from pgbench_accounts;
ERROR: 0A000: could not run distributed query because the window function that is used cannot be pushed down
HINT: Window functions are supported in two ways.
Either add an equality filter on the distributed tables' partition column
or
use the window functions with a PARTITION BY clause containing the distribution column
LOCATION: DeferErrorIfQueryNotSupported, multi_logical_planner.c:938
满足以下条件即可支持
1、partition by 必须是分布键。
2、where条件里面带分布键的等值过滤条件。
postgres=# select row_number() over(partition by bid order by aid) rn,* from pgbench_accounts where aid=1;
rn | aid | bid | abalance | filler
----+-----+-----+----------+--------------------------------------------------------------------------------------
1 | 1 | 1 | 0 |
(1 row)
postgres=# select row_number() over(partition by aid order by bid) rn,* from pgbench_accounts limit 1;
rn | aid | bid | abalance | filler
----+-----+-----+----------+--------------------------------------------------------------------------------------
1 | 298 | 1 | 0 |
(1 row)
执行计划
postgres=# explain verbose select row_number() over(partition by aid order by bid) rn,* from pgbench_accounts limit 1;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------
Limit (cost=0.00..0.00 rows=0 width=0)
Output: remote_scan.rn, remote_scan.aid, remote_scan.bid, remote_scan.abalance, remote_scan.filler
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
Output: remote_scan.rn, remote_scan.aid, remote_scan.bid, remote_scan.abalance, remote_scan.filler
Task Count: 128
Tasks Shown: One of 128
-> Task
Node: host=172.24.211.224 port=1921 dbname=postgres
-> Limit (cost=705.99..706.01 rows=1 width=105)
Output: (row_number() OVER (?)), pgbench_accounts.aid, pgbench_accounts.bid, pgbench_accounts.abalance, pgbench_accounts.filler
-> WindowAgg (cost=705.99..860.95 rows=7748 width=105)
Output: row_number() OVER (?), pgbench_accounts.aid, pgbench_accounts.bid, pgbench_accounts.abalance, pgbench_accounts.filler
-> Sort (cost=705.99..725.36 rows=7748 width=97)
Output: pgbench_accounts.aid, pgbench_accounts.bid, pgbench_accounts.abalance, pgbench_accounts.filler
Sort Key: pgbench_accounts.aid, pgbench_accounts.bid
-> Seq Scan on public.pgbench_accounts_106812 pgbench_accounts (cost=0.00..205.48 rows=7748 width=97)
Output: pgbench_accounts.aid, pgbench_accounts.bid, pgbench_accounts.abalance, pgbench_accounts.filler
(17 rows)
postgres=# explain verbose select row_number() over(partition by bid order by aid) rn,* from pgbench_accounts where aid=1;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------
Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0)
Output: remote_scan.rn, remote_scan.aid, remote_scan.bid, remote_scan.abalance, remote_scan.filler
Task Count: 1
Tasks Shown: All
-> Task
Node: host=172.24.211.232 port=1921 dbname=postgres
-> WindowAgg (cost=2.51..2.53 rows=1 width=105)
Output: row_number() OVER (?), aid, bid, abalance, filler
-> Sort (cost=2.51..2.51 rows=1 width=97)
Output: aid, bid, abalance, filler
Sort Key: pgbench_accounts.bid
-> Index Scan using pgbench_accounts_pkey_106819 on public.pgbench_accounts_106819 pgbench_accounts (cost=0.28..2.50 rows=1 width=97)
Output: aid, bid, abalance, filler
Index Cond: (pgbench_accounts.aid = 1)
(14 rows)
Citus未在window调用中支持重分布的过程。
greenplum window函数的支持
支持任意姿势的window调用
postgres=# create table t(id int, c1 int, c2 int);
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
CREATE TABLE
postgres=# insert into t select random()*100000, random()*10, random()*100 from generate_series(1,10000000);
INSERT 0 10000000
postgres=# explain select row_number() over (partition by c1 order by id) rn,* from t ;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------
Gather Motion 33:1 (slice2; segments: 33) (cost=1477974.88..1553064.94 rows=10012008 width=12)
-> Window (cost=1477974.88..1553064.94 rows=303395 width=12)
Partition By: c1
Order By: id
-> Sort (cost=1477974.88..1503004.90 rows=303395 width=12)
Sort Key: c1, id
// 以下在citus中用临时表代替
-> Redistribute Motion 33:33 (slice1; segments: 33) (cost=0.00..313817.24 rows=303395 width=12)
Hash Key: c1
-> Seq Scan on t (cost=0.00..113577.08 rows=303395 width=12)
Optimizer status: legacy query optimizer
(10 rows)
甚至一个SQL中支持多个不同维度的partition
postgres=# explain select row_number() over (partition by c1 order by id) rn1, row_number() over (partition by c2 order by c1) rn2, * from t ;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------
Gather Motion 33:1 (slice3; segments: 33) (cost=3017582.83..3192792.97 rows=10012008 width=12)
-> Subquery Scan coplan (cost=3017582.83..3192792.97 rows=303395 width=12)
-> Window (cost=3017582.83..3092672.89 rows=303395 width=12)
Partition By: coplan.c1
Order By: coplan.id
-> Sort (cost=3017582.83..3042612.85 rows=303395 width=12)
Sort Key: coplan.c1, coplan.id
// 以下在citus中用临时表代替
-> Redistribute Motion 33:33 (slice2; segments: 33) (cost=1477974.88..1853425.18 rows=303395 width=12)
Hash Key: coplan.c1
-> Subquery Scan coplan (cost=1477974.88..1653185.02 rows=303395 width=12)
-> Window (cost=1477974.88..1553064.94 rows=303395 width=12)
Partition By: t.c2
Order By: t.c1
-> Sort (cost=1477974.88..1503004.90 rows=303395 width=12)
Sort Key: t.c2, t.c1
// 以下在citus中用临时表代替
-> Redistribute Motion 33:33 (slice1; segments: 33) (cost=0.00..313817.24 rows=303395 width=12)
Hash Key: t.c2
-> Seq Scan on t (cost=0.00..113577.08 rows=303395 width=12)
Optimizer status: legacy query optimizer
(19 rows)
小结
citus 7.5的版本,对窗口函数的支持仅如下条件(二选一,满足即可调用):
本质上:目前(citus 7.5)window函数不支持跨shard操作。
1、partition by 必须是分布键。
2、where条件里面带分布键的等值过滤条件。
还是回到那句话,write in SQL, thinking in mapreduce。懂了这句话的精髓,你才可以使用citus用作分析场景,否则先乖乖的用来做TP为主的业务。
(比如上面不支持的场景,一条SQL拆成多条,最笨的方法,先创建一个临时表(按PARTITION BY分布),然后再跑window函数就支持了,多走几步即可。)
让CITUS支持本身不支持的SQL语法的最愉快的方法:
把结构导入Greenplum,看Greenplum的执行计划,将Redistribute Motion 的部分,在citus里面用临时表实现。 你照这个做,绝对可以让citus跑OLAP很欢快。