Ways to manually perform multi-stage aggregation in Greenplum: connecting directly to segments (PGOPTIONS='-c gp_session_role=utility'), gp_dist_random('gp_id'), or a multi-stage aggregate with prefunc


Background

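Aggregation is one of the most common operations in analytical workloads. In Greenplum the data is already distributed across segments, so an aggregate has to be executed in multiple stages.

PostgreSQL has supported parallel aggregation since 9.6, and its approach is similar to the multi-stage aggregation used by distributed databases.

When creating an aggregate function, you must implement the multi-stage API; otherwise the aggregate cannot truly run in parallel.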
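To make the multi-stage API concrete, here is a minimal Python sketch (not Greenplum code) of an average computed in stages. A transition function runs on each segment, a combine function (the role prefunc plays) merges the partial states on the master, and a final function produces the result. The function names and the sample data are hypothetical.

```python
from functools import reduce

# The state for avg is a (sum, count) pair; all names here are illustrative.
def sfunc(state, value):
    """Transition step: fold one input row into the local state."""
    total, count = state
    return (total + value, count + 1)

def prefunc(left, right):
    """Combine step: merge two partial states from different segments."""
    return (left[0] + right[0], left[1] + right[1])

def finalfunc(state):
    """Final step: turn the merged state into the aggregate result."""
    total, count = state
    return total / count if count else None

def two_stage_avg(segments):
    # Stage 1: each segment aggregates only its own rows.
    partials = [reduce(sfunc, rows, (0, 0)) for rows in segments]
    # Stage 2: the master merges one small state per segment.
    merged = reduce(prefunc, partials, (0, 0))
    return finalfunc(merged)

segments = [[1, 2, 3], [4, 5], [6]]   # hypothetical rows on three segments
print(two_stage_avg(segments))        # 3.5
```

Without a combine step, the only safe plan is to ship every row to one place and run the transition function there.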

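Some extensions do not implement multi-stage aggregation, for one reason or another. See:

"Greenplum roaring bitmap and business scenarios (similar to Alibaba Cloud RDS PG varbitx; real-time profiling, selection, and pivoting over massive user sets)"

What other ways are there to make such an aggregate run in parallel?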

1. gp_dist_random('gp_id')

2. Connecting directly to the segments

3. The gpdb mapreduce interface

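1 gp_dist_random('gp_id')

Greenplum has an internal function interface, gp_dist_random(). The optimizer sends a query that calls it to the segments and executes it there directly.

The database size calculation uses it too: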

select sum(pg_database_size('%s'))::int8 from gp_dist_random('gp_id');    

The source code is as follows:

Datum      
pg_database_size_name(PG_FUNCTION_ARGS)      
{      
        int64           size = 0;      
        Name            dbName = PG_GETARG_NAME(0);      
        Oid                     dbOid = get_database_oid(NameStr(*dbName));      
    
        if (!OidIsValid(dbOid))      
                ereport(ERROR,      
                                (errcode(ERRCODE_UNDEFINED_DATABASE),      
                                 errmsg("database \"%s\" does not exist",      
                                                NameStr(*dbName))));      
    
        size = calculate_database_size(dbOid);      
    
        if (Gp_role == GP_ROLE_DISPATCH)      
        {      
                StringInfoData buffer;      
    
                initStringInfo(&buffer);      
    
                appendStringInfo(&buffer, "select sum(pg_database_size('%s'))::int8 from gp_dist_random('gp_id');", NameStr(*dbName));      
    
                size += get_size_from_segDBs(buffer.data);      
        }      
    
        PG_RETURN_INT64(size);      
}      
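The control flow of this function can be mimicked in a few lines of Python. This is only a sketch of the logic, with hypothetical byte counts: the dispatcher (master) computes its own size and adds the sum reported by the segments, which is what the dispatched gp_dist_random('gp_id') query returns.

```python
def database_size(local_size, segment_sizes, is_dispatcher):
    """Mirror pg_database_size_name: the local size plus, on the master only,
    the sum of the sizes reported by every segment."""
    size = local_size
    if is_dispatcher:
        # Corresponds to:
        #   select sum(pg_database_size(...))::int8 from gp_dist_random('gp_id')
        size += sum(segment_sizes)
    return size

# Hypothetical sizes in bytes: the master holds 10, two segments hold 100 and 120.
print(database_size(10, [100, 120], is_dispatcher=True))   # 230
# On a segment the dispatch branch is skipped, so only the local size is returned.
print(database_size(100, [], is_dispatcher=False))         # 100
```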

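1. Take the rb (roaring bitmap) extension as an example. Its aggregate is currently single-stage: aggregation only starts after the rows have been gathered on the master, so it is slow.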

test=# explain select rb_and_cardinality_agg(b) from testpay1;  
                                           QUERY PLAN                                              
-------------------------------------------------------------------------------------------------  
 Aggregate  (cost=908857.80..908857.81 rows=1 width=4)  
   ->  Gather Motion 256:1  (slice1; segments: 256)  (cost=0.00..907979.68 rows=351246 width=37)  
         ->  Seq Scan on testpay1  (cost=0.00..5277.46 rows=1373 width=37)  
 Settings:  effective_cache_size=8GB; gp_statistics_use_fkeys=on  
 Optimizer status: legacy query optimizer  
(5 rows)  
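To see what a two-stage version of this plan would save, the following Python sketch models bitmaps as integers (bit i set means member i is present). It is an illustration with made-up data, not the roaringbitmap extension. Each segment ANDs its local bitmaps first, so only one partial bitmap per segment has to travel to the master instead of every row.

```python
from functools import reduce

def and_agg(bitmaps):
    """AND all bitmaps together; None stands for SQL NULL (no input rows)."""
    bitmaps = [b for b in bitmaps if b is not None]
    if not bitmaps:
        return None
    return reduce(lambda a, b: a & b, bitmaps)

def cardinality(bitmap):
    """Number of set bits, or None for NULL."""
    return None if bitmap is None else bin(bitmap).count("1")

# Hypothetical bitmaps spread over three segments (the last segment is empty).
segments = [[0b1111, 0b1110], [0b0111], []]

# Single stage: every row is gathered on the master, then aggregated there.
single = cardinality(and_agg([b for rows in segments for b in rows]))

# Two stages: AND per segment, then AND the per-segment partial results.
partials = [and_agg(rows) for rows in segments]
two_stage = cardinality(and_agg(partials))

print(single, two_stage)  # 2 2
```

The two results agree because AND is associative and commutative. A segment with no matching rows contributes NULL and has to be skipped rather than treated as an empty bitmap.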

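2. To get parallel multi-stage aggregation through gp_dist_random('gp_id'), we need to define a function that performs the prefunc step, for example aggregating once on each segment first.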

test=> create or replace function get_rb(v_sql text) returns roaringbitmap as $$  
declare  
  res roaringbitmap;  
begin  
  execute v_sql into res;  
  return res;  
end;  
$$ language plpgsql strict;  
CREATE FUNCTION  

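3. Unfortunately, GPDB has a built-in protection: if the table accessed inside the UDF is not a system catalog table (a table that is fully replicated on every segment rather than distributed), the database refuses to let the function access it directly on a segment.

(Goal: run rb_and_agg directly on every segment, return the resulting rb value to the master, and compute the cardinality there, which would make the aggregation parallel.)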

test=> explain analyze select get_rb($$select RB_AND_AGG(b) from public.testpay1 where a in ('3y','10y')$$) from gp_dist_random('gp_id');  
NOTICE:  function cannot execute on segment because it accesses relation "public.testpay1" (functions.c:155)  (seg3 slice1 11.180.113.94:3068 pid=54354) (cdbdisp.c:1326)  
DETAIL:    
SQL statement "select RB_AND_AGG(b) from public.testpay1 where a in ('3y','10y')"  
PL/pgSQL function "get_rb" line 4 at execute statement  
  
  
test=> explain analyze select rb_and_cardinality_agg(get_rb($$select RB_AND_AGG(b) from public.testpay1 where a in ('3y','10y')$$)) from gp_dist_random('gp_id');  
NOTICE:  query plan with multiple segworker groups is not supported (cdbdisp.c:302)  
HINT:  likely caused by a function that reads or modifies data in a distributed table  
CONTEXT:  SQL statement "select RB_AND_AGG(b) from public.testpay1 where a in ('3y','10y')"  
PL/pgSQL function "get_rb" line 4 at execute statement  

4. If the UDF only accesses system catalog tables (fully replicated, not distributed), the database does allow it to run directly on the segments.

create or replace function get_catalog(v_sql text) returns int8 as $$  
declare  
  res int8;  
begin  
  execute v_sql into res;  
  return res;  
end;  
$$ language plpgsql strict;  
test=> explain analyze select get_catalog($$select max(oid::int8) from pg_class$$) from gp_dist_random('gp_id');  
                                                                QUERY PLAN                                                                  
------------------------------------------------------------------------------------------------------------------------------------------  
 Gather Motion 256:1  (slice1; segments: 256)  (cost=0.00..1.01 rows=1 width=0)  
   Rows out:  256 rows at destination with 2.887 ms to first row, 6.589 ms to end, start offset by 1.203 ms.  
   ->  Seq Scan on gp_id  (cost=0.00..1.01 rows=1 width=0)  
         Rows out:  Avg 1.0 rows x 256 workers.  Max 1 rows (seg0) with 1.243 ms to first row, 1.244 ms to end, start offset by 3.534 ms.  
 Slice statistics:  
   (slice0)    Executor memory: 495K bytes.  
   (slice1)    Executor memory: 139K bytes avg x 256 workers, 139K bytes max (seg0).  
 Statement statistics:  
   Memory used: 2047000K bytes  
 Settings:  effective_cache_size=8GB; gp_statistics_use_fkeys=on  
 Optimizer status: legacy query optimizer  
 Total runtime: 8.015 ms  
(12 rows)  

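5. The protection code is shown below. The walker in functions.c rejects any relation outside the system, TOAST, and AO segment namespaces; the dispatcher in cdbdisp.c raises the "multiple segworker groups" error.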

src/backend/executor/functions.c

/**
 * Walker for querytree_safe_for_segment.
 */
bool querytree_safe_for_segment_walker(Node *expr, void *context)
{
        Assert(context == NULL);

        if (!expr)
        {
                /**
                 * Do not end recursion just because we have reached one leaf node.
                 */
                return false;
        }

        switch(nodeTag(expr))
        {
                case T_Query:
                        {
                                Query *q = (Query *) expr;

                                if (!allow_segment_DML &&
                                        (q->commandType != CMD_SELECT
                                         || q->intoClause != NULL
                                         || q->resultRelation > 0))
                                {
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                                         errmsg("function cannot execute on segment because it issues a non-SELECT statement")));
                                }

                                ListCell * f = NULL;
                                foreach(f,q->rtable)
                                {
                                        RangeTblEntry *rte = (RangeTblEntry *) lfirst(f);

                                        if (rte->rtekind == RTE_RELATION)
                                        {
                                                Assert(rte->relid != InvalidOid);

                                                Oid namespaceId = get_rel_namespace(rte->relid);

                                                Assert(namespaceId != InvalidOid);

                                                if (!(IsSystemNamespace(namespaceId) ||
                                                          IsToastNamespace(namespaceId) ||
                                                          IsAoSegmentNamespace(namespaceId)))
                                                {
                                                        ereport(ERROR,
                                                                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                                                         errmsg("function cannot execute on segment because it accesses relation \"%s.%s\"",
                                                                                        quote_identifier(get_namespace_name(namespaceId)),
                                                                                        quote_identifier(get_rel_name(rte->relid)))));
                                                }
                                        }
                                }
                                query_tree_walker(q, querytree_safe_for_segment_walker, context, 0);
                                break;
                        }
                default:
                        break;
        }

        return expression_tree_walker(expr, querytree_safe_for_segment_walker, context);
}
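Reduced to its essentials, the relation check in the walker looks only at the schema (namespace) each referenced table lives in. The Python sketch below is a simplified model, not the GPDB implementation. The namespace names pg_catalog, pg_toast and pg_aoseg are assumptions about what IsSystemNamespace, IsToastNamespace and IsAoSegmentNamespace accept, and the non-SELECT test is reduced to the command type alone.

```python
# Assumed namespaces accepted by the three Is*Namespace checks.
ALLOWED_NAMESPACES = {"pg_catalog", "pg_toast", "pg_aoseg"}

class FeatureNotSupported(Exception):
    """Stands in for ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ...))."""

def check_safe_for_segment(command_type, relations, allow_segment_dml=False):
    """relations is a list of (namespace, relname) pairs referenced by the query."""
    if not allow_segment_dml and command_type != "SELECT":
        raise FeatureNotSupported(
            "function cannot execute on segment because it issues a non-SELECT statement")
    for namespace, relname in relations:
        if namespace not in ALLOWED_NAMESPACES:
            raise FeatureNotSupported(
                "function cannot execute on segment because it accesses "
                f'relation "{namespace}.{relname}"')

check_safe_for_segment("SELECT", [("pg_catalog", "pg_class")])  # passes silently
try:
    check_safe_for_segment("SELECT", [("public", "testpay1")])
except FeatureNotSupported as err:
    print(err)
```

Because the decision depends only on the table's namespace, nothing in this check consults the table's distribution policy.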

src/backend/cdb/dispatcher/cdbdisp.c

/*
 * cdbdisp_dispatchToGang:
 * Send the strCommand SQL statement to the subset of all segdbs in the cluster
 * specified by the gang parameter. cancelOnError indicates whether an error
 * occurring on one of the qExec segdbs should cause all still-executing commands to cancel
 * on other qExecs. Normally this would be true. The commands are sent over the libpq
 * connections that were established during cdblink_setup.  They are run inside of threads.
 * The number of segdbs handled by any one thread is determined by the
 * guc variable gp_connections_per_thread.
 *
 * The caller must provide a CdbDispatchResults object having available
 * resultArray slots sufficient for the number of QEs to be dispatched:
 * i.e., resultCapacity - resultCount >= gp->size.  This function will
 * assign one resultArray slot per QE of the Gang, paralleling the Gang's
 * db_descriptors array. Success or failure of each QE will be noted in
 * the QE's CdbDispatchResult entry; but before examining the results, the
 * caller must wait for execution to end by calling CdbCheckDispatchResult().
 *
 * The CdbDispatchResults object owns some malloc'ed storage, so the caller
 * must make certain to free it by calling cdbdisp_destroyDispatcherState().
 *
 * When dispatchResults->cancelOnError is false, strCommand is to be
 * dispatched to every connected gang member if possible, despite any
 * cancellation requests, QE errors, connection failures, etc.
 *
 * NB: This function should return normally even if there is an error.
 * It should not longjmp out via elog(ERROR, ...), ereport(ERROR, ...),
 * PG_THROW, CHECK_FOR_INTERRUPTS, etc.
 */
void
cdbdisp_dispatchToGang(struct CdbDispatcherState *ds,
                                           struct Gang *gp,
                                           int sliceIndex,
                                           CdbDispatchDirectDesc *disp_direct)
{
        struct CdbDispatchResults *dispatchResults = ds->primaryResults;

        Assert(Gp_role == GP_ROLE_DISPATCH);
        Assert(gp && gp->size > 0);
        Assert(dispatchResults && dispatchResults->resultArray);

        if (dispatchResults->writer_gang)
        {
                /*
                 * Are we dispatching to the writer-gang when it is already busy ?
                 */
                if (gp == dispatchResults->writer_gang)
                {
                        if (dispatchResults->writer_gang->dispatcherActive)
                        {
                                ereport(ERROR,
                                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                                 errmsg("query plan with multiple segworker groups is not supported"),
                                                 errhint("likely caused by a function that reads or modifies data in a distributed table")));
                        }

                        dispatchResults->writer_gang->dispatcherActive = true;
                }
        }

        /*
         * WIP: will use a function pointer for implementation later, currently just use an internal function to move dispatch
         * thread related code into a separate file.
         */
        (pDispatchFuncs->dispatchToGang)(ds, gp, sliceIndex, disp_direct);
}

6. If your GPDB does not have the RB extension, you can reproduce the problem with ordinary types.

postgres=# create table test(id int, info text, crt_time timestamp);  
NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Greenplum Database data distribution key for this table.  
HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.  
CREATE TABLE  
  
postgres=# insert into test select id, md5(random()::text), clock_timestamp() from generate_series(1,1000000) t(id);  
INSERT 0 1000000  
  
  
create or replace function get_max(v_sql text) returns int as $$  
declare  
  res int;  
begin  
  execute v_sql into res;  
  return res;  
end;  
$$ language plpgsql strict;  
  
  
  
postgres=# \set VERBOSITY verbose  
postgres=# select get_max($$select max(id) from test$$) from gp_dist_random('gp_id');  
ERROR:  0A000: function cannot execute on segment because it accesses relation "public.test"  (seg0 slice1 127.0.0.1:25432 pid=1443)  
DETAIL:    
SQL statement "select max(id) from test"  
PL/pgSQL function "get_max" line 4 at EXECUTE statement  
LOCATION:  cdbdisp_finishCommand, cdbdisp.c:254  

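7. Tampering with the metadata does not fool GPDB either, because the check is made at execution time, not against the distribution metadata.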

postgres=# create table tmp_gp_distribution_policy as select * from gp_distribution_policy where localoid='test'::regclass;  
NOTICE:  Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'localoid' as the Greenplum Database data distribution key for this table.  
HINT:  The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.  
SELECT 1  
  
postgres=# set allow_system_table_mods=DML;  
SET  
  
postgres=# delete from gp_distribution_policy where localoid='test'::regclass;  
DELETE 1  
  
The error is still the same:
  
postgres=# \set VERBOSITY verbose  
postgres=# select get_max($$select max(id) from test$$) from gp_dist_random('gp_id');  
ERROR:  0A000: function cannot execute on segment because it accesses relation "public.test"  (seg0 slice1 127.0.0.1:25432 pid=1443)  
DETAIL:    
SQL statement "select max(id) from test"  
PL/pgSQL function "get_max" line 4 at EXECUTE statement  
LOCATION:  cdbdisp_finishCommand, cdbdisp.c:254  

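2 Connecting directly to the segments

"Configuring and benchmarking direct reads and writes on Greenplum segment nodes"

"Greenplum & PostgreSQL UPSERT UDF implementation - 2: batch mode"

"Greenplum & PostgreSQL UPSERT UDF implementation - 1: single-row mode"

This approach works, but it is cumbersome because the client has to connect to every segment directly.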

postgres=# select * from gp_segment_configuration where content<>'-1' and role='p';  
 dbid | content | role | preferred_role | mode | status | port  |        hostname         |  address  | replication_port   
------+---------+------+----------------+------+--------+-------+-------------------------+-----------+------------------  
    2 |       0 | p    | p              | s    | u      | 25432 | iZbp13nu0s9j3x3op4zpd4Z | localhost |                   
    3 |       1 | p    | p              | s    | u      | 25433 | iZbp13nu0s9j3x3op4zpd4Z | localhost |                   
(2 rows)  
PGOPTIONS='-c gp_session_role=utility' psql -h iZbp13nu0s9j3x3op4zpd4Z -p 25432   
digoal@iZbp13nu0s9j3x3op4zpd4Z-> PGOPTIONS='-c gp_session_role=utility' psql -h iZbp13nu0s9j3x3op4zpd4Z -p 25432 -c "select max(id) from test"  
   max     
---------  
 1000000  
(1 row)  
  
digoal@iZbp13nu0s9j3x3op4zpd4Z-> PGOPTIONS='-c gp_session_role=utility' psql -h iZbp13nu0s9j3x3op4zpd4Z -p 25433 -c "select max(id) from test"  
  max     
--------  
 999999  
(1 row)  
  
digoal@iZbp13nu0s9j3x3op4zpd4Z-> psql -c "select greatest(1000000,999999)"  
 greatest   
----------  
  1000000  
(1 row)  
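The manual steps in this session can be scripted. The sketch below is one possible way to do it in Python, assuming psql is on the PATH and that the host and port list is copied from gp_segment_configuration; the helper names are hypothetical. It runs the partial aggregate on each primary segment in utility mode and combines the partial results on the client, here with max().

```python
import os
import subprocess

def segment_command(host, port, sql):
    """Build the psql argv and environment for a utility-mode connection."""
    # -A -t: unaligned, tuples-only output, so stdout holds just the value.
    argv = ["psql", "-h", host, "-p", str(port), "-A", "-t", "-c", sql]
    env = dict(os.environ, PGOPTIONS="-c gp_session_role=utility")
    return argv, env

def run_on_segment(host, port, sql):
    """First stage: run sql on one segment and return its single value as text."""
    argv, env = segment_command(host, port, sql)
    done = subprocess.run(argv, env=env, capture_output=True, text=True, check=True)
    return done.stdout.strip()

def combine_max(partials):
    """Second stage: merge per-segment max() results, skipping empty (NULL) ones."""
    values = [int(p) for p in partials if p != ""]
    return max(values) if values else None

def aggregate_max(segments, sql):
    """Run the partial aggregate on every (host, port) pair and combine."""
    return combine_max([run_on_segment(h, p, sql) for h, p in segments])

# Combining the two partial results obtained in the session above:
print(combine_max(["1000000", "999999"]))  # 1000000
```

With a reachable cluster, aggregate_max([("iZbp13nu0s9j3x3op4zpd4Z", 25432), ("iZbp13nu0s9j3x3op4zpd4Z", 25433)], "select max(id) from test") would perform both stages. The segments are queried one after another in this sketch; running them concurrently is left out for brevity.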

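Summary

1. The gp_dist_random('gp_id') approach is restricted by the built-in protection: it currently works only for replicated (system catalog) tables, not for distributed tables. (Visible to the user.)

2. Connecting directly to the segments works, but the procedure is tedious and requires the user to connect to each segment. (Visible to the user.)

3. The best approach is still for the aggregate itself to support the prefunc API, so that multi-stage parallel aggregation happens internally. (Transparent to the user.)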

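References

Connecting directly to the segments

"Configuring and benchmarking direct reads and writes on Greenplum segment nodes"

"Greenplum & PostgreSQL UPSERT UDF implementation - 2: batch mode"

"Greenplum & PostgreSQL UPSERT UDF implementation - 1: single-row mode"

Multi-stage aggregation

"PostgreSQL 11 preview - multi-stage parallel aggregation for array_agg and string_agg"

"PostgreSQL 10: principles and practice of custom parallel aggregate functions (including an example of array_agg merging multiple arrays into a single one-dimensional array)"

"HybridDB PostgreSQL: the techniques and principles that keep Sort, Group, distinct aggregation, and JOIN robust against data skew - multi-stage aggregation"

"Greenplum best practices - using the hll estimation extension (and optimizing the hll staged aggregate functions)"

"Postgres-XC customized aggregate introduction"

"PostgreSQL aggregate function customize"

"Greenplum roaring bitmap and business scenarios (similar to Alibaba Cloud RDS PG varbitx; real-time profiling, selection, and pivoting over massive user sets)"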
