Implementation and optimization of distinct in PostgreSQL and Greenplum

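Background

Counting distinct values is a common business requirement: for example, how many users there are per day, how many users per province, or how many users and how many categories there are per day.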

select date, count(distinct user_id) from tbl group by date;  
  
select date, province, count(distinct user_id) from tbl group by 1,2;  
  
select date, count(distinct user_id), count(distinct class) from tbl group by 1;  

distinct counts the number of unique values. If you do not need an exact figure, you can also choose one of the estimation methods below:

《Greenplum best practices - using the hll estimation plugin (and optimizing the hll multi-stage aggregate functions)》

《PostgreSQL hll (HyperLogLog) extension for “State of The Art Cardinality Estimation Algorithm” - 3》

《PostgreSQL hll (HyperLogLog) extension for “State of The Art Cardinality Estimation Algorithm” - 2》

《PostgreSQL hll (HyperLogLog) extension for “State of The Art Cardinality Estimation Algorithm” - 1》

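《Analyzing 1 TB-scale tables on any dimension within seconds - using sampling-based estimation for efficient TOP N and similar statistics》

《Using explain Plan Rows to quickly estimate row counts》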

《PostgreSQL pg_stats used to estimate top N freps values and explain rows》

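This article analyzes the distinct algorithms used by PostgreSQL and Greenplum:

hashagg and groupagg

Observation model for hashagg and groupagg

To make the behavior easy to observe, we create a test table and load 100 million test rows.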

create table tbl(c1 int, c2 int, c3 int, c4 int);  
  
insert into tbl select random()*1000, random()*1000, random()*100, random()*100 from generate_series(1,100000000);  

distinct statements

1.

select c1,c2,count(distinct c3) from tbl group by c1,c2;  

2.

select c1,c2,count(distinct c3),count(distinct c4) from tbl group by c1,c2;  

3.

select c1,c2,count(distinct (c3,c4)) from tbl group by c1,c2;  

Replacement statements for distinct

1.

select c1,c2,count(*) cn from (select c1,c2,c3 from tbl group by c1,c2,c3) t group by c1,c2;  

2.

select t1.c1, t1.c2, t1.cn as c3, t2.cn as c4 from  
  (select c1,c2,count(*) cn from (select c1,c2,c3 from tbl group by c1,c2,c3) t group by c1,c2) t1  
join  
  (select c1,c2,count(*) cn from (select c1,c2,c4 from tbl group by c1,c2,c4) t group by c1,c2) t2  
on (  
  NOT t1.c1 IS DISTINCT FROM t2.c1   
  AND   
  NOT t1.c2 IS DISTINCT FROM t2.c2  
);  

3.

select c1,c2,count(*) cn from (select c1,c2,row(c3,c4) from tbl group by c1,c2,row(c3,c4)) t group by c1,c2;  
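The equivalence between the first distinct statement and its replacement can be checked on a small data set. The sketch below is only an illustration: it uses Python's built-in sqlite3 module as a stand-in for PostgreSQL, and the helper names (`build_db`, `run`) and the tiny value ranges are assumptions made for this example. It also shows one caveat: `count(distinct c3)` ignores NULLs, whereas `count(*)` over the deduplicated rows counts a NULL c3 as one extra value, so `count(c3)` is the safer outer aggregate when c3 can be NULL. The article's test table is filled from `random()`, so it contains no NULLs and both forms agree there.

```python
import random
import sqlite3

DISTINCT_SQL = """
select c1, c2, count(distinct c3) from tbl
group by c1, c2 order by c1, c2
"""

# Replacement statement 1 from the article (plus an ORDER BY for comparison).
REWRITE_SQL = """
select c1, c2, count(*) cn
from (select c1, c2, c3 from tbl group by c1, c2, c3) t
group by c1, c2 order by c1, c2
"""

# Variant that stays equivalent when c3 may be NULL: count(c3) skips NULLs.
NULL_SAFE_REWRITE_SQL = """
select c1, c2, count(c3) cn
from (select c1, c2, c3 from tbl group by c1, c2, c3) t
group by c1, c2 order by c1, c2
"""


def build_db(n_rows=2000, seed=1, with_nulls=False):
    """Create an in-memory table shaped like the article's tbl (much smaller)."""
    rng = random.Random(seed)
    conn = sqlite3.connect(":memory:")
    conn.execute("create table tbl(c1 int, c2 int, c3 int, c4 int)")
    rows = []
    for _ in range(n_rows):
        c3 = rng.randrange(10)
        if with_nulls and c3 == 0:
            c3 = None  # hypothetical NULLs, not present in the article's data
        rows.append((rng.randrange(5), rng.randrange(5), c3, rng.randrange(10)))
    conn.executemany("insert into tbl values (?, ?, ?, ?)", rows)
    return conn


def run(conn, sql):
    return conn.execute(sql).fetchall()


if __name__ == "__main__":
    conn = build_db()
    print(run(conn, DISTINCT_SQL) == run(conn, REWRITE_SQL))
    conn_nulls = build_db(with_nulls=True)
    print(run(conn_nulls, DISTINCT_SQL) == run(conn_nulls, NULL_SAFE_REWRITE_SQL))
```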

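Algorithm of the PostgreSQL distinct statement

At the time of writing, PostgreSQL only supports GroupAgg for computing distinct. The source code shows that it is implemented by sorting and then removing duplicates: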

src/backend/executor/nodeAgg.c

 *        If a normal aggregate call specifies DISTINCT or ORDER BY, we sort the  
 *        input tuples and eliminate duplicates (if required) before performing  
 *        the above-depicted process.  (However, we don't do that for ordered-set  
 *        aggregates; their "ORDER BY" inputs are ordinary aggregate arguments  
 *        so far as this module is concerned.)  Note that partial aggregation  
 *        is not supported in these cases, since we couldn't ensure global  
 *        ordering or distinctness of the inputs.  
        Tuplesortstate **sortstates;    /* sort objects, if DISTINCT or ORDER BY */  
  
/*  
 * Run the transition function for a DISTINCT or ORDER BY aggregate  
 * with only one input.  This is called after we have completed  
 * entering all the input values into the sort object.  We complete the  
 * sort, read out the values in sorted order, and run the transition  
 * function on each value (applying DISTINCT if appropriate).  
 *  
 * Note that the strictness of the transition function was checked when  
 * entering the values into the sort, so we don't check it again here;  
 * we just apply standard SQL DISTINCT logic.  
 *  
 * The one-input case is handled separately from the multi-input case  
 * for performance reasons: for single by-value inputs, such as the  
 * common case of count(distinct id), the tuplesort_getdatum code path  
 * is around 300% faster.  (The speedup for by-reference types is less  
 * but still noticeable.)  
 *  
 * This function handles only one grouping set (already set in  
 * aggstate->current_set).  
 *  
 * When called, CurrentMemoryContext should be the per-query context.  
 */  
static void  
process_ordered_aggregate_single(AggState *aggstate,  
                                 AggStatePerTrans pertrans,  
                                 AggStatePerGroup pergroupstate)  
{  
  
  
  
/*  
 * Run the transition function for a DISTINCT or ORDER BY aggregate  
 * with more than one input.  This is called after we have completed  
 * entering all the input values into the sort object.  We complete the  
 * sort, read out the values in sorted order, and run the transition  
 * function on each value (applying DISTINCT if appropriate).  
 *  
 * This function handles only one grouping set (already set in  
 * aggstate->current_set).  
 *  
 * When called, CurrentMemoryContext should be the per-query context.  
 */  
static void  
process_ordered_aggregate_multi(AggState *aggstate,  
                                AggStatePerTrans pertrans,  
                                AggStatePerGroup pergroupstate)  
{  

The execution plan is as follows: the input is sorted first, and then a GroupAggregate plan is used.

postgres=#  explain (verbose,summary) select c1,c2,count(distinct c3),count(distinct c4),count(distinct (c3,c4)) from tbl group by c1,c2;  
                                      QUERY PLAN                                         
---------------------------------------------------------------------------------------  
 GroupAggregate  (cost=1407453.56..1496253.56 rows=555000 width=32)  
   Output: c1, c2, count(DISTINCT c3), count(DISTINCT c4), count(DISTINCT ROW(c3, c4))  
   Group Key: tbl.c1, tbl.c2  
   ->  Sort  (cost=1407453.56..1421328.56 rows=5550000 width=16)  
         Output: c1, c2, c3, c4  
         Sort Key: tbl.c1, tbl.c2  
         ->  Seq Scan on public.tbl  (cost=0.00..596041.00 rows=5550000 width=16)  
               Output: c1, c2, c3, c4  
 Planning time: 0.110 ms  
(9 rows)  
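The sort-then-deduplicate idea described in the nodeAgg.c comments can be imitated in a few lines. This is a rough Python sketch of the technique, not PostgreSQL's actual code path; the function name `count_distinct_sorted` and the tuple layout `(c1, c2, c3)` are assumptions made for the example.

```python
from itertools import groupby


def count_distinct_sorted(rows):
    """rows: iterable of (c1, c2, c3); returns {(c1, c2): count(distinct c3)}."""
    result = {}
    # Counterpart of the Sort node: order the input by the GROUP BY key.
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    for key, group in groupby(ordered, key=lambda r: (r[0], r[1])):
        # Counterpart of the per-aggregate sort: order the DISTINCT inputs
        # of this group (NULLs are skipped, as count() ignores them).
        values = sorted(r[2] for r in group if r[2] is not None)
        count = 0
        previous = object()  # sentinel that never equals a real value
        for value in values:
            # In sorted order duplicates are adjacent, so comparing with the
            # previous value is enough to eliminate them.
            if value != previous:
                count += 1
                previous = value
        result[key] = count
    return result


if __name__ == "__main__":
    sample = [(1, 1, 5), (1, 1, 5), (1, 1, 7), (2, 1, 3)]
    print(count_distinct_sorted(sample))
```

The cost is dominated by the sorts, which is why this approach falls behind hashing as the input grows.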

To make PostgreSQL compute distinct with HashAgg, the SQL has to be rewritten, as described later.

Greenplum plans for the distinct statement

Greenplum supports both HashAgg and GroupAgg for computing distinct.

1. hashagg

postgres=# explain analyze select c1,c2,count(distinct c3) from tbl group by c1,c2;  
                                                                             QUERY PLAN                                                                               
--------------------------------------------------------------------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice2; segments: 48)  (cost=2748912.00..2761424.50 rows=1001000 width=16)  
   Rows out:  1002001 rows at destination with 5071 ms to end, start offset by 1.784 ms.  
   ->  HashAggregate  (cost=2748912.00..2761424.50 rows=20855 width=16)  
         Group By: partial_aggregation.c1, partial_aggregation.c2  
         Rows out:  Avg 20875.0 rows x 48 workers.  Max 20914 rows (seg11) with 0.004 ms to first row, 223 ms to end, start offset by 4.338 ms.  
         ->  HashAggregate  (cost=2448912.00..2573912.00 rows=208334 width=12)  
               Group By: tbl.c1, tbl.c2, tbl.c3  
               Rows out:  Avg 1320761.3 rows x 48 workers.  Max 1323529 rows (seg9) with 0.002 ms to first row, 3120 ms to end, start offset by 4.491 ms.  
               ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=2048912.00..2248912.00 rows=208334 width=12)  
                     Hash Key: tbl.c1, tbl.c2  
                     Rows out:  Avg 2061921.2 rows x 48 workers at destination.  Max 2066345 rows (seg31) with 1229 ms to end, start offset by 59 ms.  
                     ->  HashAggregate  (cost=2048912.00..2048912.00 rows=208334 width=12)  
                           Group By: tbl.c1, tbl.c2, tbl.c3  
                           Rows out:  Avg 2061921.2 rows x 48 workers.  Max 2062196 rows (seg24) with 0.006 ms to first row, 1706 ms to end, start offset by 59 ms.  
                           ->  Append-only Columnar Scan on tbl  (cost=0.00..1048912.00 rows=2083334 width=12)  
                                 Rows out:  0 rows (seg0) with 39 ms to end, start offset by 56 ms.  
 Slice statistics:  
   (slice0)    Executor memory: 359K bytes.  
   (slice1)    Executor memory: 1053K bytes avg x 48 workers, 1053K bytes max (seg0).  
   (slice2)    Executor memory: 396K bytes avg x 48 workers, 396K bytes max (seg0).  
 Statement statistics:  
   Memory used: 128000K bytes  
 Settings:  enable_bitmapscan=off; enable_seqscan=off; optimizer=off  
 Optimizer status: legacy query optimizer  
 Total runtime: 5106.665 ms  
(25 rows)  
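The plan above has three HashAggregate stages: each segment first deduplicates (c1, c2, c3) locally, the survivors are redistributed by (c1, c2), deduplicated again on the receiving segment, and finally counted per (c1, c2). The following Python sketch simulates that data flow under simplifying assumptions: segments are plain lists, Python's `hash` stands in for Greenplum's distribution hash, and the function name is made up for this example.

```python
def three_stage_count_distinct(segments):
    """segments: list of row lists (one per segment); rows are (c1, c2, c3)."""
    n = len(segments)

    # Stage 1 (HashAggregate below the Motion): local dedup on (c1, c2, c3),
    # which shrinks the volume that has to cross the network.
    local = [set(rows) for rows in segments]

    # Redistribute Motion: route each surviving row by the group key (c1, c2),
    # so every group ends up entirely on one segment.
    received = [set() for _ in range(n)]
    for seg in local:
        for c1, c2, c3 in seg:
            # Stage 2 (HashAggregate above the Motion): adding to a set
            # removes duplicates that came from different source segments.
            received[hash((c1, c2)) % n].add((c1, c2, c3))

    # Stage 3 (top HashAggregate): count the deduplicated rows per (c1, c2).
    result = {}
    for seg in received:
        for c1, c2, c3 in seg:
            result.setdefault((c1, c2), 0)
            if c3 is not None:
                result[(c1, c2)] += 1
    return result


if __name__ == "__main__":
    segs = [[(1, 1, 5), (1, 1, 5)], [(1, 1, 5), (1, 1, 6)], [(3, 3, 9)]]
    print(three_stage_count_distinct(segs))
```

Because the Motion sits between two dedup steps, no sort is needed anywhere in this flow.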

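2. groupagg

First, the data is redistributed by the distinct column and a groupagg result is computed.

Then the data is redistributed by the grouping columns and groupagg is computed again.

This distributed plan is somewhat questionable: in theory the data could be redistributed directly by the grouping columns and then aggregated with groupagg.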

postgres=# set enable_hashagg =off;  
SET  
  
postgres=# explain analyze select c1,c2,count(distinct c3) from tbl group by c1,c2;  
                                                                               QUERY PLAN                                                                                  
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice3; segments: 48)  (cost=23755578.10..23788110.60 rows=1001000 width=16)  
   Rows out:  1002001 rows at destination with 13064 ms to end, start offset by 245 ms.  
   ->  GroupAggregate  (cost=23755578.10..23788110.60 rows=20855 width=16)  
         Group By: tbl.c1, tbl.c2  
         Rows out:  Avg 20875.0 rows x 48 workers.  Max 20914 rows (seg11) with 0.002 ms to first row, 208 ms to end, start offset by 250 ms.  
         ->  Sort  (cost=23755578.10..23758080.60 rows=20855 width=16)  
               Sort Key: tbl.c1, tbl.c2  
               Rows out:  Avg 727938.4 rows x 48 workers.  Max 729557 rows (seg1) with 0 ms to end, start offset by 247 ms.  
               Executor memory:  46266K bytes avg, 46266K bytes max (seg0).  
               Work_mem used:  46266K bytes avg, 46266K bytes max (seg0). Workfile: (48 spilling, 0 reused)  
               Work_mem wanted: 62546K bytes avg, 62686K bytes max (seg1) to lessen workfile I/O affecting 48 workers.  
               ->  Redistribute Motion 48:48  (slice2; segments: 48)  (cost=22623280.88..23655813.38 rows=20855 width=16)  
                     Hash Key: tbl.c1, tbl.c2  
                     Rows out:  Avg 727938.4 rows x 48 workers at destination.  Max 729557 rows (seg1) with 12518 ms to end, start offset by 247 ms.  
                     ->  GroupAggregate  (cost=22623280.88..23635793.38 rows=20855 width=16)  
                           Group By: tbl.c1, tbl.c2  
                           Rows out:  Avg 852220.6 rows x 41 workers.  Max 983342 rows (seg9) with 0.003 ms to first row, 2574 ms to end, start offset by 250 ms.  
                           ->  Sort  (cost=22623280.88..22873280.88 rows=2083334 width=12)  
                                 Sort Key: tbl.c1, tbl.c2  
                                 Rows out:  Avg 2439024.4 rows x 41 workers.  Max 4003392 rows (seg44) with 0.001 ms to end, start offset by 257 ms.  
                                 Executor memory:  37148K bytes avg, 43851K bytes max (seg0).  
                                 Work_mem used:  37148K bytes avg, 43851K bytes max (seg0). Workfile: (40 spilling, 0 reused)  
                                 Work_mem wanted: 137587K bytes avg, 221435K bytes max (seg44) to lessen workfile I/O affecting 40 workers.  
                                 ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..3048912.00 rows=2083334 width=12)  
                                       Hash Key: tbl.c3  
                                       Rows out:  Avg 2439024.4 rows x 41 workers at destination.  Max 4003392 rows (seg44) with 8081 ms to end, start offset by 257 ms.  
                                       ->  Append-only Columnar Scan on tbl  (cost=0.00..1048912.00 rows=2083334 width=12)  
                                             Rows out:  0 rows (seg0) with 23 ms to end, start offset by 256 ms.  
 Slice statistics:  
   (slice0)    Executor memory: 362K bytes.  
   (slice1)    Executor memory: 1489K bytes avg x 48 workers, 1489K bytes max (seg0).  
   (slice2)  * Executor memory: 38329K bytes avg x 48 workers, 45109K bytes max (seg0).  Work_mem: 43851K bytes max, 221435K bytes wanted.  
   (slice3)  * Executor memory: 46597K bytes avg x 48 workers, 46597K bytes max (seg0).  Work_mem: 46266K bytes max, 62686K bytes wanted.  
 Statement statistics:  
   Memory used: 128000K bytes  
   Memory wanted: 664802K bytes  
 Settings:  enable_bitmapscan=off; enable_hashagg=off; enable_seqscan=off; optimizer=off  
 Optimizer status: legacy query optimizer  
 Total runtime: 13318.578 ms  
(39 rows)  

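For a table that needs no redistribution (when the GROUP BY columns match the distribution key), this plan problem does not occur:

Here the planner preferred groupagg.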

postgres=# explain analyze select c1,c2,count(distinct c3) from tbl1 group by c1,c2;  
                                                                 QUERY PLAN                                                                    
---------------------------------------------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice1; segments: 48)  (cost=20623288.88..21635826.40 rows=1003002 width=16)  
   Rows out:  1002001 rows at destination with 6896 ms to end, start offset by 1.285 ms.  
   ->  GroupAggregate  (cost=20623288.88..21635826.40 rows=20896 width=16)  
         Group By: c1, c2  
         Rows out:  Avg 20875.0 rows x 48 workers.  Max 20914 rows (seg11) with 0.003 ms to first row, 995 ms to end, start offset by 39 ms.  
         ->  Sort  (cost=20623288.88..20873288.88 rows=2083334 width=12)  
               Sort Key: c1, c2  
               Rows out:  Avg 2083333.3 rows x 48 workers.  Max 2087802 rows (seg31) with 0.002 ms to end, start offset by 38 ms.  
               Executor memory:  67386K bytes avg, 67386K bytes max (seg0).  
               Work_mem used:  67386K bytes avg, 67386K bytes max (seg0). Workfile: (48 spilling, 0 reused)  
               Work_mem wanted: 130193K bytes avg, 130472K bytes max (seg31) to lessen workfile I/O affecting 48 workers.  
               ->  Append-only Columnar Scan on tbl1  (cost=0.00..1048920.00 rows=2083334 width=12)  
                     Rows out:  0 rows (seg0) with 5555 ms to end, start offset by 38 ms.  
 Slice statistics:  
   (slice0)    Executor memory: 347K bytes.  
   (slice1)  * Executor memory: 67984K bytes avg x 48 workers, 67984K bytes max (seg0).  Work_mem: 67386K bytes max, 130472K bytes wanted.  
 Statement statistics:  
   Memory used: 128000K bytes  
   Memory wanted: 261142K bytes  
 Settings:  enable_bitmapscan=off; enable_hashagg=on; enable_seqscan=off; optimizer=off  
 Optimizer status: legacy query optimizer  
 Total runtime: 6897.348 ms  
(22 rows)  

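Greenplum provides switches to control whether hashagg or groupagg is used. In practice hashagg is still faster: about 3060 ms versus about 6897 ms for the groupagg plan on the same table.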

postgres=# set enable_groupagg =off;  
SET  
postgres=# set enable_hashagg =on;  
SET  
  
postgres=# explain analyze select c1,c2,count(distinct c3) from tbl1 group by c1,c2;  
                                                                         QUERY PLAN                                                                            
-------------------------------------------------------------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice1; segments: 48)  (cost=2548920.00..2561457.52 rows=1003002 width=16)  
   Rows out:  1002001 rows at destination with 3002 ms to end, start offset by 1.252 ms.  
   ->  HashAggregate  (cost=2548920.00..2561457.52 rows=20896 width=16)  
         Group By: partial_aggregation.c1, partial_aggregation.c2  
         Rows out:  Avg 20875.0 rows x 48 workers.  Max 20914 rows (seg11) with 0.005 ms to first row, 140 ms to end, start offset by 52 ms.  
         ->  HashAggregate  (cost=2248920.00..2373920.00 rows=208334 width=12)  
               Group By: tbl1.c1, tbl1.c2, tbl1.c3  
               Rows out:  Avg 1320761.3 rows x 48 workers.  Max 1323529 rows (seg9) with 0.004 ms to first row, 875 ms to end, start offset by 15 ms.  
               ->  HashAggregate  (cost=2048920.00..2048920.00 rows=208334 width=12)  
                     Group By: tbl1.c1, tbl1.c2, tbl1.c3  
                     Rows out:  Avg 1320761.3 rows x 48 workers.  Max 1323529 rows (seg9) with 0.004 ms to first row, 1479 ms to end, start offset by 15 ms.  
                     ->  Append-only Columnar Scan on tbl1  (cost=0.00..1048920.00 rows=2083334 width=12)  
                           Rows out:  0 rows (seg0) with 48 ms to end, start offset by 49 ms.  
 Slice statistics:  
   (slice0)    Executor memory: 347K bytes.  
   (slice1)    Executor memory: 598K bytes avg x 48 workers, 598K bytes max (seg0).  
 Statement statistics:  
   Memory used: 128000K bytes  
 Settings:  enable_bitmapscan=off; enable_groupagg=off; enable_hashagg=on; enable_seqscan=off; enable_sort=off; optimizer=off  
 Optimizer status: legacy query optimizer  
 Total runtime: 3060.036 ms  
(21 rows)  

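Optimizing distinct in PostgreSQL

To make PostgreSQL use hashagg for distinct, the SQL currently has to be rewritten. (In a future PostgreSQL version, hashagg for distinct queries could in theory be achieved easily through an SQL rewrite.)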

postgres=# set work_mem='32GB';  
SET  
  
postgres=# explain select c1,c2,count(*) cn from (select c1,c2,c3 from tbl group by c1,c2,c3) t group by c1,c2;  
                                QUERY PLAN                                   
---------------------------------------------------------------------------  
 HashAggregate  (cost=652928.50..653328.50 rows=40000 width=16)  
   Group Key: tbl.c1, tbl.c2  
   ->  HashAggregate  (cost=637666.00..643216.00 rows=555000 width=12)  
         Group Key: tbl.c1, tbl.c2, tbl.c3  
         ->  Seq Scan on tbl  (cost=0.00..596041.00 rows=5550000 width=12)  
(5 rows)  

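Parallel computation

Greenplum needs little explanation here: it is already an MPP architecture and performs very well on this kind of analytical (AP) query.

PostgreSQL also supports parallel computation, for both hashagg and groupagg, but the planner and executor still have room for improvement in these two areas and do not yet exploit parallelism well.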

postgres=# explain select c1,c2,count(*) cn from (select c1,c2,c3 from tbl group by c1,c2,c3) t group by c1,c2;  
                                              QUERY PLAN                                                
------------------------------------------------------------------------------------------------------  
 GroupAggregate  (cost=888153.09..1057837.13 rows=40000 width=16)  
   Group Key: tbl.c1, tbl.c2  
   ->  Group  (cost=888153.09..1047724.63 rows=555000 width=12)  
         Group Key: tbl.c1, tbl.c2, tbl.c3  
         ->  Gather Merge  (cost=888153.09..1039399.63 rows=1110000 width=12)  
               Workers Planned: 2  
               ->  Group  (cost=887153.07..910278.07 rows=555000 width=12)  
                     Group Key: tbl.c1, tbl.c2, tbl.c3  
                     ->  Sort  (cost=887153.07..892934.32 rows=2312500 width=12)  
                           Sort Key: tbl.c1, tbl.c2, tbl.c3  
                           ->  Parallel Seq Scan on tbl  (cost=0.00..563666.00 rows=2312500 width=12)  
(11 rows)  
postgres=# explain select c1,c2,count(*) cn from (select c1,c2,c3 from tbl group by c1,c2,c3) t group by c1,c2;  
                                       QUERY PLAN                                          
-----------------------------------------------------------------------------------------  
 HashAggregate  (cost=600203.50..600603.50 rows=40000 width=16)  
   Group Key: tbl.c1, tbl.c2  
   ->  HashAggregate  (cost=584941.00..590491.00 rows=555000 width=12)  
         Group Key: tbl.c1, tbl.c2, tbl.c3  
         ->  Gather  (cost=0.00..543316.00 rows=5550000 width=12)  
               Workers Planned: 20  
               ->  Parallel Seq Scan on tbl  (cost=0.00..543316.00 rows=277500 width=12)  
(7 rows)  
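The nodeAgg.c comment quoted earlier notes that partial aggregation is not supported for DISTINCT aggregates. The sketch below illustrates why: per-worker distinct counts cannot simply be added up, while per-worker deduplicated rows can be merged and deduplicated once more, which is roughly what the Group / Gather Merge / Group plan above does for the rewritten query. The function names and the list-of-chunks input are assumptions made for this example.

```python
def worker_dedup(rows):
    """Partial step on one worker: {(c1, c2): set of distinct c3 values}."""
    state = {}
    for c1, c2, c3 in rows:
        if c3 is not None:
            state.setdefault((c1, c2), set()).add(c3)
    return state


def parallel_count_distinct(chunks):
    """Correct: merge the deduplicated values, then count once at the end."""
    merged = {}
    for partial in (worker_dedup(chunk) for chunk in chunks):
        for key, values in partial.items():
            merged.setdefault(key, set()).update(values)
    return {key: len(values) for key, values in merged.items()}


def naive_sum_of_partial_counts(chunks):
    """Incorrect: a value seen by two workers is counted twice."""
    totals = {}
    for partial in (worker_dedup(chunk) for chunk in chunks):
        for key, values in partial.items():
            totals[key] = totals.get(key, 0) + len(values)
    return totals


if __name__ == "__main__":
    chunks = [[(1, 1, 5), (1, 1, 6)], [(1, 1, 5), (1, 1, 7)]]
    print(parallel_count_distinct(chunks), naive_sum_of_partial_counts(chunks))
```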

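Multiple distinct aggregates in one SQL statement

When one SQL statement contains several distinct aggregates, how does the optimizer execute it?

In practice it runs the grouped aggregation twice, once per distinct column, and joins the two results, as shown below: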

postgres=# explain analyze select c1,c2,count(distinct c3),count(distinct c4) from tbl group by c1,c2;  
                                                                                   QUERY PLAN                                                                                     
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice3; segments: 48)  (cost=5647824.00..5707884.00 rows=1001000 width=32)  
   Rows out:  1002001 rows at destination with 9598 ms to end, start offset by 2.272 ms.  
   ->  Hash Join  (cost=5647824.00..5707884.00 rows=20855 width=32)  
         Hash Cond: NOT dqa_coplan_1.c1 IS DISTINCT FROM dqa_coplan_2.c1 AND NOT dqa_coplan_1.c2 IS DISTINCT FROM dqa_coplan_2.c2  
         Rows out:  Avg 20875.0 rows x 48 workers.  Max 20914 rows (seg11) with 0.015 ms to first row, 1584 ms to end, start offset by 26 ms.  
         Executor memory:  816K bytes avg, 817K bytes max (seg11).  
         Work_mem used:  816K bytes avg, 817K bytes max (seg11). Workfile: (0 spilling, 0 reused)  
         ->  HashAggregate  (cost=2823912.00..2838927.00 rows=20855 width=16)  
               Group By: partial_aggregation.c1, partial_aggregation.c2  
               Rows out:  Avg 20875.0 rows x 48 workers.  Max 20914 rows (seg11) with 0.004 ms to first row, 262 ms to end, start offset by 27 ms.  
               ->  HashAggregate  (cost=2473912.00..2623912.00 rows=208334 width=12)  
                     Group By: postgres.tbl.c1, postgres.tbl.c2, postgres.tbl.c3  
                     Rows out:  Avg 1320761.3 rows x 48 workers.  Max 1323529 rows (seg9) with 0.001 ms to first row, 2778 ms to end, start offset by 27 ms.  
                     ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=2048912.00..2248912.00 rows=208334 width=12)  
                           Hash Key: postgres.tbl.c1, postgres.tbl.c2  
                           Rows out:  Avg 2061921.2 rows x 48 workers at destination.  Max 2066345 rows (seg31) with 0.003 ms to end, start offset by 49 ms.  
                           ->  HashAggregate  (cost=2048912.00..2048912.00 rows=208334 width=12)  
                                 Group By: postgres.tbl.c1, postgres.tbl.c2, postgres.tbl.c3  
                                 Rows out:  Avg 2061921.2 rows x 48 workers.  Max 2062196 rows (seg24) with 0.003 ms to first row, 2958 ms to end, start offset by 86 ms.  
                                 ->  Append-only Columnar Scan on tbl  (cost=0.00..1048912.00 rows=2083334 width=16)  
                                       Rows out:  0 rows (seg0) with 76 ms to end, start offset by 128 ms.  
         ->  Hash  (cost=2848937.00..2848937.00 rows=20855 width=16)  
               Rows in:  (No row requested) 0 rows (seg0) with 0 ms to end.  
               ->  HashAggregate  (cost=2823912.00..2838927.00 rows=20855 width=16)  
                     Group By: partial_aggregation.c1, partial_aggregation.c2  
                     Rows out:  Avg 20875.0 rows x 48 workers.  Max 20914 rows (seg11) with 0.004 ms to first row, 227 ms to end, start offset by 27 ms.  
                     ->  HashAggregate  (cost=2473912.00..2623912.00 rows=208334 width=12)  
                           Group By: postgres.tbl.c1, postgres.tbl.c2, postgres.tbl.c4  
                           Rows out:  Avg 1320773.6 rows x 48 workers.  Max 1323487 rows (seg9) with 0.001 ms to first row, 3916 ms to end, start offset by 27 ms.  
                           ->  Redistribute Motion 48:48  (slice2; segments: 48)  (cost=2048912.00..2248912.00 rows=208334 width=12)  
                                 Hash Key: postgres.tbl.c1, postgres.tbl.c2  
                                 Rows out:  Avg 2061913.9 rows x 48 workers at destination.  Max 2066340 rows (seg31) with 284 ms to end, start offset by 49 ms.  
                                 ->  HashAggregate  (cost=2048912.00..2048912.00 rows=208334 width=12)  
                                       Group By: postgres.tbl.c1, postgres.tbl.c2, postgres.tbl.c4  
                                       Rows out:  Avg 2061913.9 rows x 48 workers.  Max 2062167 rows (seg20) with 0.005 ms to first row, 3343 ms to end, start offset by 50 ms.  
                                       ->  Append-only Columnar Scan on tbl  (cost=0.00..1048912.00 rows=2083334 width=16)  
                                             Rows out:  0 rows (seg0) with 75 ms to end, start offset by 131 ms.  
 Slice statistics:  
   (slice0)    Executor memory: 490K bytes.  
   (slice1)    Executor memory: 1213K bytes avg x 48 workers, 1213K bytes max (seg0).  
   (slice2)    Executor memory: 1213K bytes avg x 48 workers, 1213K bytes max (seg0).  
   (slice3)    Executor memory: 497K bytes avg x 48 workers, 497K bytes max (seg0).  Work_mem: 817K bytes max.  
 Statement statistics:  
   Memory used: 128000K bytes  
 Settings:  enable_bitmapscan=off; enable_seqscan=off; optimizer=off  
 Optimizer status: legacy query optimizer  
 Total runtime: 9612.938 ms  
(47 rows)  

This is equivalent to the following SQL:

select t1.c1, t1.c2, t1.cn as c3, t2.cn as c4 from  
(select c1,c2,count(*) cn from (select c1,c2,c3 from tbl group by c1,c2,c3) t group by c1,c2) t1  
join  
(select c1,c2,count(*) cn from (select c1,c2,c4 from tbl group by c1,c2,c4) t group by c1,c2) t2  
on (NOT t1.c1 IS DISTINCT FROM t2.c1 AND NOT t1.c2 IS DISTINCT FROM t2.c2);  

Execution plan:

postgres=# explain analyze select t1.c1, t1.c2, t1.cn as c3, t2.cn as c4 from  
(select c1,c2,count(*) cn from (select c1,c2,c3 from tbl group by c1,c2,c3) t group by c1,c2) t1  
join  
(select c1,c2,count(*) cn from (select c1,c2,c4 from tbl group by c1,c2,c4) t group by c1,c2) t2  
on (NOT t1.c1 IS DISTINCT FROM t2.c1 AND NOT t1.c2 IS DISTINCT FROM t2.c2);  
                                                                                             QUERY PLAN                                                                                               
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
 Gather Motion 48:1  (slice5; segments: 48)  (cost=0.00..21231.04 rows=316722656 width=24)  
   Rows out:  1002001 rows at destination with 12721 ms to end, start offset by 1253 ms.  
   ->  Hash Join  (cost=0.00..3782.79 rows=6598389 width=24)  
         Hash Cond: NOT postgres.tbl.c1 IS DISTINCT FROM postgres.tbl.c1 AND NOT postgres.tbl.c2 IS DISTINCT FROM postgres.tbl.c2  
         Rows out:  Avg 20875.0 rows x 48 workers.  Max 20914 rows (seg11) with 0.042 ms to first row, 9546 ms to end, start offset by 1272 ms.  
         Executor memory:  816K bytes avg, 817K bytes max (seg11).  
         Work_mem used:  816K bytes avg, 817K bytes max (seg11). Workfile: (0 spilling, 0 reused)  
         ->  HashAggregate  (cost=0.00..1608.86 rows=11731 width=16)  
               Group By: postgres.tbl.c1, postgres.tbl.c2  
               Rows out:  Avg 20875.0 rows x 48 workers.  Max 20914 rows (seg11) with 0.001 ms to first row, 929 ms to end, start offset by 1272 ms.  
               ->  Redistribute Motion 48:48  (slice2; segments: 48)  (cost=0.00..1605.90 rows=11731 width=16)  
                     Hash Key: postgres.tbl.c1, postgres.tbl.c2  
                     Rows out:  Avg 761778.0 rows x 48 workers at destination.  Max 763236 rows (seg1) with 0.004 ms to end, start offset by 1260 ms.  
                     ->  Result  (cost=0.00..1605.31 rows=11731 width=16)  
                           Rows out:  Avg 761778.0 rows x 48 workers.  Max 762979 rows (seg3) with 0.007 ms to first row, 39 ms to end, start offset by 1346 ms.  
                           ->  HashAggregate  (cost=0.00..1605.31 rows=11731 width=16)  
                                 Group By: postgres.tbl.c1, postgres.tbl.c2  
                                 Rows out:  Avg 761778.0 rows x 48 workers.  Max 762979 rows (seg3) with 0.004 ms to first row, 893 ms to end, start offset by 1346 ms.  
                                 ->  HashAggregate  (cost=0.00..1481.27 rows=499828 width=8)  
                                       Group By: postgres.tbl.c1, postgres.tbl.c2, postgres.tbl.c3  
                                       Rows out:  Avg 1320761.3 rows x 48 workers.  Max 1322202 rows (seg3) with 0.003 ms to first row, 6861 ms to end, start offset by 1346 ms.  
                                       ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..1297.74 rows=499828 width=12)  
                                             Hash Key: postgres.tbl.c1, postgres.tbl.c2, postgres.tbl.c3  
                                             Rows out:  Avg 2061921.2 rows x 48 workers at destination.  Max 2064440 rows (seg9) with 1672 ms to end, start offset by 1319 ms.  
                                             ->  HashAggregate  (cost=0.00..1278.97 rows=499828 width=12)  
                                                   Group By: postgres.tbl.c1, postgres.tbl.c2, postgres.tbl.c3  
                                                   Rows out:  Avg 2061921.2 rows x 48 workers.  Max 2062196 rows (seg24) with 3.151 ms to first row, 3654 ms to end, start offset by 1354 ms.  
                                                   ->  Table Scan on tbl  (cost=0.00..465.38 rows=2083334 width=12)  
                                                         Rows out:  0 rows (seg0) with 86 ms to end, start offset by 1464 ms.  
         ->  Hash  (cost=1608.86..1608.86 rows=11731 width=16)  
               Rows in:  (No row requested) 0 rows (seg0) with 0 ms to end.  
               ->  HashAggregate  (cost=0.00..1608.86 rows=11731 width=16)  
                     Group By: postgres.tbl.c1, postgres.tbl.c2  
                     Rows out:  Avg 20875.0 rows x 48 workers.  Max 20914 rows (seg11) with 0.003 ms to first row, 1698 ms to end, start offset by 1272 ms.  
                     ->  Redistribute Motion 48:48  (slice4; segments: 48)  (cost=0.00..1605.90 rows=11731 width=16)  
                           Hash Key: postgres.tbl.c1, postgres.tbl.c2  
                           Rows out:  Avg 761897.3 rows x 48 workers at destination.  Max 763444 rows (seg26) with 94 ms to end, start offset by 1316 ms.  
                           ->  Result  (cost=0.00..1605.31 rows=11731 width=16)  
                                 Rows out:  Avg 761897.3 rows x 48 workers.  Max 762689 rows (seg21) with 0.008 ms to first row, 27 ms to end, start offset by 1316 ms.  
                                 ->  HashAggregate  (cost=0.00..1605.31 rows=11731 width=16)  
                                       Group By: postgres.tbl.c1, postgres.tbl.c2  
                                       Rows out:  Avg 761897.3 rows x 48 workers.  Max 762689 rows (seg21) with 0.004 ms to first row, 1592 ms to end, start offset by 1316 ms.  
                                       ->  HashAggregate  (cost=0.00..1481.27 rows=499828 width=8)  
                                             Group By: postgres.tbl.c1, postgres.tbl.c2, postgres.tbl.c4  
                                             Rows out:  Avg 1320773.6 rows x 48 workers.  Max 1322071 rows (seg20) with 0.004 ms to first row, 6853 ms to end, start offset by 1330 ms.  
                                             ->  Redistribute Motion 48:48  (slice3; segments: 48)  (cost=0.00..1297.74 rows=499828 width=12)  
                                                   Hash Key: postgres.tbl.c1, postgres.tbl.c2, postgres.tbl.c4  
                                                   Rows out:  Avg 2061913.9 rows x 48 workers at destination.  Max 2064793 rows (seg20) with 1657 ms to end, start offset by 1330 ms.  
                                                   ->  HashAggregate  (cost=0.00..1278.97 rows=499828 width=12)  
                                                         Group By: postgres.tbl.c1, postgres.tbl.c2, postgres.tbl.c4  
                                                         Rows out:  Avg 2061913.9 rows x 48 workers.  Max 2062167 rows (seg20) with 0.005 ms to first row, 3001 ms to end, start offset by 1460 ms.  
                                                         ->  Table Scan on tbl  (cost=0.00..465.38 rows=2083334 width=12)  
                                                               Rows out:  0 rows (seg0) with 85 ms to end, start offset by 1479 ms.  
 Slice statistics:  
   (slice0)    Executor memory: 507K bytes.  
   (slice1)    Executor memory: 1253K bytes avg x 48 workers, 1253K bytes max (seg0).  
   (slice2)    Executor memory: 1387K bytes avg x 48 workers, 1387K bytes max (seg0).  
   (slice3)    Executor memory: 1253K bytes avg x 48 workers, 1253K bytes max (seg0).  
   (slice4)    Executor memory: 1387K bytes avg x 48 workers, 1387K bytes max (seg0).  
   (slice5)    Executor memory: 561K bytes avg x 48 workers, 561K bytes max (seg0).  Work_mem: 817K bytes max.  
 Statement statistics:  
   Memory used: 128000K bytes  
 Settings:  enable_bitmapscan=off; enable_seqscan=off; optimizer=on  
 Optimizer status: PQO version 1.602  
 Total runtime: 13975.507 ms  
(65 rows)  
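The join-based form can be compared against the two-distinct query on a small table. As before, this is only a sketch with sqlite3 standing in for the real databases; the helper names are invented for the example, and SQLite's `IS` operator is used as the null-safe comparison in place of `NOT ... IS DISTINCT FROM ...`. The data deliberately puts NULLs into c1 (an assumption, the article's table has none) to show why the join condition has to be null-safe: with a plain `=`, the groups whose key is NULL would be lost.

```python
import random
import sqlite3

MULTI_DISTINCT_SQL = """
select c1, c2, count(distinct c3), count(distinct c4)
from tbl group by c1, c2 order by c1, c2
"""


def join_rewrite_sql(op):
    """Join of two deduplicated aggregates; op is the key comparison operator."""
    return f"""
    select t1.c1, t1.c2, t1.cn, t2.cn from
      (select c1, c2, count(*) cn
         from (select c1, c2, c3 from tbl group by c1, c2, c3) t
        group by c1, c2) t1
    join
      (select c1, c2, count(*) cn
         from (select c1, c2, c4 from tbl group by c1, c2, c4) t
        group by c1, c2) t2
    on (t1.c1 {op} t2.c1 and t1.c2 {op} t2.c2)
    order by t1.c1, t1.c2
    """


def build_multi_db(n_rows=1000, seed=7):
    rng = random.Random(seed)
    conn = sqlite3.connect(":memory:")
    conn.execute("create table tbl(c1 int, c2 int, c3 int, c4 int)")
    rows = [
        (rng.choice([None, 0, 1, 2]), rng.randrange(3),
         rng.randrange(10), rng.randrange(10))
        for _ in range(n_rows)
    ]
    conn.executemany("insert into tbl values (?, ?, ?, ?)", rows)
    return conn


def fetch(conn, sql):
    return conn.execute(sql).fetchall()


if __name__ == "__main__":
    conn = build_multi_db()
    print(fetch(conn, MULTI_DISTINCT_SQL) == fetch(conn, join_rewrite_sql("is")))
    # A plain "=" join silently drops the groups where c1 is NULL.
    print(len(fetch(conn, join_rewrite_sql("="))))
```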

In the PostgreSQL execution plan, these two aggregation steps are hidden inside a single GroupAggregate node.

postgres=# explain verbose select c1,c2,count(distinct c3),count(distinct c4) from tbl group by c1,c2;  
                                    QUERY PLAN                                      
----------------------------------------------------------------------------------  
 GroupAggregate  (cost=1217753.56..1292678.56 rows=555000 width=24)  
   Output: c1, c2, count(DISTINCT c3), count(DISTINCT c4)  
   Group Key: tbl.c1, tbl.c2  
   ->  Sort  (cost=1217753.56..1231628.56 rows=5550000 width=16)  
         Output: c1, c2, c3, c4  
         Sort Key: tbl.c1, tbl.c2  
         ->  Seq Scan on public.tbl  (cost=0.00..596041.00 rows=5550000 width=16)  
               Output: c1, c2, c3, c4  
(8 rows)  

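Summary

A database can compute distinct in two ways:

1. HashAgg: no sort is required, but a large data set needs a large work_mem.

2. GroupAgg: requires a sort.

The larger the data set, the more pronounced the advantage of HashAgg.

Currently PostgreSQL needs the SQL to be rewritten in order to use HashAgg for distinct. Greenplum supports both methods directly.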
