PostgreSQL sharding : citus 系列3 - 窗口函数调用限制 与 破解之法(套用gpdb执行树,分步执行)

3 minute read

背景

窗口函数是分析场景常用的,目前(citus 7.5)仅支持两种场景使用window函数,

1、partition by 必须是分布键。

2、where条件里面带分布键的等值过滤条件。

本质上:目前(citus 7.5)window函数不支持跨shard操作,或者说过程中不进行重分布。

而Greenplum这方面做得很好,是一个完整的MPP数据库。

citus window函数的支持

postgres=# \set VERBOSITY verbose  
  
  
postgres=# select row_number() over(partition by bid order by aid) rn,* from pgbench_accounts;  
ERROR:  0A000: could not run distributed query because the window function that is used cannot be pushed down  
HINT:  Window functions are supported in two ways.   
Either add an equality filter on the distributed tables' partition column   
or   
use the window functions with a PARTITION BY clause containing the distribution column  
LOCATION:  DeferErrorIfQueryNotSupported, multi_logical_planner.c:938  

满足以下条件即可支持

1、partition by 必须是分布键。

2、where条件里面带分布键的等值过滤条件。

postgres=# select row_number() over(partition by bid order by aid) rn,* from pgbench_accounts where aid=1;  
 rn | aid | bid | abalance |                                        filler                                          
----+-----+-----+----------+--------------------------------------------------------------------------------------  
  1 |   1 |   1 |        0 |                                                                                       
(1 row)  
  
postgres=# select row_number() over(partition by aid order by bid) rn,* from pgbench_accounts  limit 1;  
 rn | aid | bid | abalance |                                        filler                                          
----+-----+-----+----------+--------------------------------------------------------------------------------------  
  1 | 298 |   1 |        0 |                                                                                       
(1 row)  

执行计划

postgres=# explain verbose select row_number() over(partition by aid order by bid) rn,* from pgbench_accounts  limit 1;  
                                                                       QUERY PLAN                                                                          
---------------------------------------------------------------------------------------------------------------------------------------------------------  
 Limit  (cost=0.00..0.00 rows=0 width=0)  
   Output: remote_scan.rn, remote_scan.aid, remote_scan.bid, remote_scan.abalance, remote_scan.filler  
   ->  Custom Scan (Citus Real-Time)  (cost=0.00..0.00 rows=0 width=0)  
         Output: remote_scan.rn, remote_scan.aid, remote_scan.bid, remote_scan.abalance, remote_scan.filler  
         Task Count: 128  
         Tasks Shown: One of 128  
         ->  Task  
               Node: host=172.24.211.224 port=1921 dbname=postgres  
               ->  Limit  (cost=705.99..706.01 rows=1 width=105)  
                     Output: (row_number() OVER (?)), pgbench_accounts.aid, pgbench_accounts.bid, pgbench_accounts.abalance, pgbench_accounts.filler  
                     ->  WindowAgg  (cost=705.99..860.95 rows=7748 width=105)  
                           Output: row_number() OVER (?), pgbench_accounts.aid, pgbench_accounts.bid, pgbench_accounts.abalance, pgbench_accounts.filler  
                           ->  Sort  (cost=705.99..725.36 rows=7748 width=97)  
                                 Output: pgbench_accounts.aid, pgbench_accounts.bid, pgbench_accounts.abalance, pgbench_accounts.filler  
                                 Sort Key: pgbench_accounts.aid, pgbench_accounts.bid  
                                 ->  Seq Scan on public.pgbench_accounts_106812 pgbench_accounts  (cost=0.00..205.48 rows=7748 width=97)  
                                       Output: pgbench_accounts.aid, pgbench_accounts.bid, pgbench_accounts.abalance, pgbench_accounts.filler  
(17 rows)  
  
postgres=# explain verbose select row_number() over(partition by bid order by aid) rn,* from pgbench_accounts where aid=1;  
                                                                         QUERY PLAN                                                                            
-------------------------------------------------------------------------------------------------------------------------------------------------------------  
 Custom Scan (Citus Router)  (cost=0.00..0.00 rows=0 width=0)  
   Output: remote_scan.rn, remote_scan.aid, remote_scan.bid, remote_scan.abalance, remote_scan.filler  
   Task Count: 1  
   Tasks Shown: All  
   ->  Task  
         Node: host=172.24.211.232 port=1921 dbname=postgres  
         ->  WindowAgg  (cost=2.51..2.53 rows=1 width=105)  
               Output: row_number() OVER (?), aid, bid, abalance, filler  
               ->  Sort  (cost=2.51..2.51 rows=1 width=97)  
                     Output: aid, bid, abalance, filler  
                     Sort Key: pgbench_accounts.bid  
                     ->  Index Scan using pgbench_accounts_pkey_106819 on public.pgbench_accounts_106819 pgbench_accounts  (cost=0.28..2.50 rows=1 width=97)  
                           Output: aid, bid, abalance, filler  
                           Index Cond: (pgbench_accounts.aid = 1)  
(14 rows)  

Citus未在window调用中支持重分布的过程。

greenplum window函数的支持

支持任意姿势的window调用

postgres=# create table t(id int, c1 int, c2 int);  
NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Greenplum Database data distribution key for this table.  
HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.  
CREATE TABLE  
  
postgres=# insert into t select random()*100000, random()*10, random()*100 from generate_series(1,10000000);  
INSERT 0 10000000  
  
postgres=# explain select row_number() over (partition by c1 order by id) rn,* from t ;  
                                                    QUERY PLAN                                                      
------------------------------------------------------------------------------------------------------------------  
 Gather Motion 33:1  (slice2; segments: 33)  (cost=1477974.88..1553064.94 rows=10012008 width=12)  
   ->  Window  (cost=1477974.88..1553064.94 rows=303395 width=12)  
         Partition By: c1  
         Order By: id  
         ->  Sort  (cost=1477974.88..1503004.90 rows=303395 width=12)  
               Sort Key: c1, id  
               // 以下在citus中用临时表代替  
	       ->  Redistribute Motion 33:33  (slice1; segments: 33)  (cost=0.00..313817.24 rows=303395 width=12)  
                     Hash Key: c1  
                     ->  Seq Scan on t  (cost=0.00..113577.08 rows=303395 width=12)  
 Optimizer status: legacy query optimizer  
(10 rows)  

甚至一个SQL中支持多个不同维度的partition

postgres=# explain select row_number() over (partition by c1 order by id) rn1, row_number() over (partition by c2 order by c1) rn2, * from t ;  
                                                                   QUERY PLAN                                                                     
------------------------------------------------------------------------------------------------------------------------------------------------  
 Gather Motion 33:1  (slice3; segments: 33)  (cost=3017582.83..3192792.97 rows=10012008 width=12)  
   ->  Subquery Scan coplan  (cost=3017582.83..3192792.97 rows=303395 width=12)  
         ->  Window  (cost=3017582.83..3092672.89 rows=303395 width=12)  
               Partition By: coplan.c1  
               Order By: coplan.id  
               ->  Sort  (cost=3017582.83..3042612.85 rows=303395 width=12)  
                     Sort Key: coplan.c1, coplan.id  
                     // 以下在citus中用临时表代替  
		     ->  Redistribute Motion 33:33  (slice2; segments: 33)  (cost=1477974.88..1853425.18 rows=303395 width=12)  
                           Hash Key: coplan.c1  
                           ->  Subquery Scan coplan  (cost=1477974.88..1653185.02 rows=303395 width=12)  
                                 ->  Window  (cost=1477974.88..1553064.94 rows=303395 width=12)  
                                       Partition By: t.c2  
                                       Order By: t.c1  
                                       ->  Sort  (cost=1477974.88..1503004.90 rows=303395 width=12)  
                                             Sort Key: t.c2, t.c1  
                                             // 以下在citus中用临时表代替  
					     ->  Redistribute Motion 33:33  (slice1; segments: 33)  (cost=0.00..313817.24 rows=303395 width=12)  
                                                   Hash Key: t.c2  
                                                   ->  Seq Scan on t  (cost=0.00..113577.08 rows=303395 width=12)  
 Optimizer status: legacy query optimizer  
(19 rows)  

小结

citus 7.5的版本,对窗口函数的支持仅如下条件(二选一,满足即可调用):

本质上:目前(citus 7.5)window函数不支持跨shard操作。

1、partition by 必须是分布键。

2、where条件里面带分布键的等值过滤条件。

还是回到那句话,write in SQL, thinking in mapreduce。懂了这句话的精髓,你才可以使用citus用作分析场景,否则先乖乖的用来做TP为主的业务。

(比如上面不支持的场景,一条SQL拆成多条,最笨的方法,先创建一个临时表(按PARTITION BY分布),然后再跑window函数就支持了,多走几步即可。)

让CITUS支持本身不支持的SQL语法的最愉快的方法:

把结构导入Greenplum,看Greenplum的执行计划,将Redistribute Motion 的部分,在citus里面用临时表实现。 你照这个做,绝对可以让citus跑OLAP很欢快。

Flag Counter

digoal’s 大量PostgreSQL文章入口