Postgres-XC customized aggregate introduction
背景
postgres-xc是基于PostgreSQL的分布式数据库,分布式数据库的聚合与单节点数据库的聚合不一样,需要先在数据节点聚合,然后在COORDINATOR节点完成第二阶段的聚合。
目前还不支持多阶段聚合(类似mapreduce)。
本文将介绍一下postgres-xc的聚合函数的概念和开发。
正文
Postgres-XC聚合与PostgreSQL的聚合有一定的区别, 因为Postgres-XC的数据存储在datanode, 聚合时数据可能分布在多个datanode上.
Postgres-XC支持传统的聚合方法, 聚合操作可以将数据从所有的数据节点传到coordinator节点后, 在coordinator节点进行聚合. 但是这种方法对于数据量较大的情况效率明显偏低.
Postgres-XC还支持另一种聚合方式, 就是数据在各自的datanode执行, 形成结果后, 将datanode聚合的结果传输到coordinator节点再次聚合.
如下 :
sfunc( internal-state, next-data-values ) ---> next-internal-state # 这个过程是在datanode节点完成的. input 是该datanode节点上的所有行(一次1行的进行调用).
cfunc( internal-state, internal-state ) ---> next-internal-state # 这个过程是在coordinator节点完成的. input是datanode节点的最终结果.
ffunc( internal-state ) ---> aggregate-value # 这个过程是在coordinator节点完成的. input是cfunc的结果.
语法如下 :
CREATE AGGREGATE name ( input_data_type [ , ... ] ) (
SFUNC = sfunc,
STYPE = state_data_type
[ , CFUNC = cfunc ]
[ , FINALFUNC = ffunc ]
[ , INITCOND = initial_condition ]
[ , INITCOLLECT = initial_collection_condition ]
[ , SORTOP = sort_operator ]
)
or the old syntax
CREATE AGGREGATE name (
BASETYPE = base_type,
SFUNC = sfunc,
STYPE = state_data_type
[ , FINALFUNC = ffunc ]
[ , INITCOND = initial_condition ]
[ , SORTOP = sort_operator ]
)
sfunc和stype是必须的. 如果没有定义cfunc那么这个聚合就只支持传统的聚合方法.
如果定义了cfunc, 那么这个聚合支持传统的聚合方法, 同时还支持分布式聚合方法.
例如 :
创建sfunc
digoal=# create or replace function d_sum(int,int) returns int as $$
select $1+$2;
$$ language sql strict;
创建聚合
digoal=# create AGGREGATE d_sum(int)
(
sfunc=d_sum,
stype=int,
cfunc=d_sum,
initcond='0',
initcollect='0'
);
创建测试表
digoal=# create table t1(id int, info text) distribute by (id) to group gp0;
digoal=# insert into t1 select generate_series(1,10),'test';
digoal=# insert into t1 values (null,'test');
比较d_sum和sum的聚合结果
digoal=# select d_sum(id) from t1;
d_sum
-------
55
(1 row)
digoal=# select sum(id) from t1;
sum
-----
55
(1 row)
比较sum和d_sum的执行计划
digoal=# explain (analyze,verbose,buffers) select d_sum(id) from t1;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------
Aggregate (cost=250.00..250.01 rows=1 width=4) (actual time=1.657..1.657 rows=1 loops=1)
Output: d_sum((d_sum(t1.id)))
-> Materialize (cost=0.00..0.00 rows=0 width=0) (actual time=0.703..0.743 rows=5 loops=1)
Output: (d_sum(t1.id))
-> Data Node Scan on "__REMOTE_GROUP_QUERY__" (cost=0.00..0.00 rows=1000 width=4) (actual time=0.702..0.740 rows=5 loops=1)
Output: d_sum(t1.id)
Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5
Remote query: SELECT d_sum(group_1.id) FROM (SELECT id, info FROM ONLY t1 WHERE true) group_1
Total runtime: 1.689 ms
(9 rows)
d_sum, sum两者都是有了分布式聚合.
digoal=# explain (analyze,verbose,buffers) select sum(id) from t1;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------
Aggregate (cost=2.50..2.51 rows=1 width=4) (actual time=0.674..0.674 rows=1 loops=1)
Output: pg_catalog.sum((sum(t1.id)))
-> Materialize (cost=0.00..0.00 rows=0 width=0) (actual time=0.507..0.659 rows=5 loops=1)
Output: (sum(t1.id))
-> Data Node Scan on "__REMOTE_GROUP_QUERY__" (cost=0.00..0.00 rows=1000 width=4) (actual time=0.507..0.655 rows=5 loops=1)
Output: sum(t1.id)
Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5
Remote query: SELECT sum(group_1.id) FROM (SELECT id, info FROM ONLY t1 WHERE true) group_1
Total runtime: 0.705 ms
(9 rows)
假如没有cfunc, 再次比较d_sum和sum的执行计划 :
在没有cfunc的情况下, 数据必须从所有的datanode汇总到coordinator节点后执行sfunc聚合.
digoal=# drop AGGREGATE d_sum(int);
DROP AGGREGATE
digoal=# create AGGREGATE d_sum(int)
(
sfunc=d_sum,
stype=int,
initcond='0'
);
CREATE AGGREGATE
digoal=# select d_sum(id) from t1;
d_sum
-------
55
(1 row)
digoal=# explain (analyze,verbose,buffers) select d_sum(id) from t1;
QUERY PLAN
------------------------------------------------------------------------------------------------------------
Aggregate (cost=250.00..250.01 rows=1 width=4) (actual time=1.981..1.981 rows=1 loops=1)
Output: d_sum(id)
-> Data Node Scan on t1 (cost=0.00..0.00 rows=1000 width=4) (actual time=0.567..0.610 rows=11 loops=1)
Output: id, info
Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5
Remote query: SELECT id, info FROM ONLY t1 WHERE true
Total runtime: 2.015 ms
(7 rows)
聚合数据处理流程简介
1. 数据汇总聚合方法数据处理流程
Two phased aggregation - is used when the entire aggregation takes place on the Coordinator node.
In first phase called transition phase, Postgres-XC creates a temporary variable of data type stype to hold the current internal state of the aggregate.
At each input row, the aggregate argument value(s) are calculated and the state transition function is invoked with the current state value and the new argument value(s) to calculate a new internal state value.
After all the rows have been processed, in the second phase or finalization phase the final function is invoked once to calculate the aggregate's return value.
If there is no final function then the ending state value is returned as-is.
2. 分布式聚合方法数据处理流程
Three phased aggregation - is used when the process of aggregation is divided between Coordinator and Datanodes.
In this mode, each Postgres-XC Datanode involved in the query carries out the first phase named transition phase.
This phase is similar to the first phase in the two phased aggregation mode discussed above, except that, every Datanode applies this phase on the rows available at the Datanode.
The result of transition phase is then transferred to the Coordinator node. Second phase called collection phase takes place on the Coordinator.
Postgres-XC Coordinator node creates a temporary variable of data type stype to hold the current internal state of the collection phase.
For every input from the Datanode (result of transition phase on that node), the collection function is invoked with the current collection state value and the new transition value (obtained from the Datanode) to calculate a new internal collection state value.
After all the transition values from data nodes have been processed, in the third or finalization phase the final function is invoked once to calculate the aggregate's return value.
If there is no final function then the ending collection state value is returned as-is.
聚合中的数据类型简介 :
sfunc( internal-state, next-data-values ) ---> next-internal-state
cfunc( internal-state, internal-state ) ---> next-internal-state
ffunc( internal-state ) ---> aggregate-value
聚合语法中的 input_data_type 对应 next-data-values
聚合语法中的 state_data_type 对应 internal-state
聚合最终输出类型有两种aggregate-value 或者 internal-state.
1. 当定义了finalfunc时是finalfunc的返回类型.
2. 当没有定义finalfunc时, 输出类型是cfunc或者sfunc的返回类型, 也就是internal-state类型, 既stype.
从流程来分析, sfunc的第一个参数的数据类型以及返回数据类型都必须和stype定义的类型相同.
sfunc的第二个参数必须和聚合的input类型相同.
cfunc的两个参数以及返回类型都必须和stype定义的类型相同.
ffunc的输入参数必须和stype定义的类型相同.
所以整个聚合涉及了3个数据类型:
1. 聚合的输入类型,
2. stype , (同时initcond, initcollect类型=stype);
3. 以及 finalfunc的返回类型.
聚合对空值的处理简介 :
与sfunc,cfunc,finalfunc的定义有关, 是否strict. 是则忽略null值, 不进行调用.
不是strict则需要函数自己处理null值. 所以要特别小心.
An aggregate function can provide an initial condition, that is, an initial value for the internal transition or collection state value.
This is specified and stored in the database as a value of type text, but it must be a valid external representation of a constant of the state value data type.
If it is not supplied then the state value starts out null.
If the collection function is declared "strict", then it cannot be called with null inputs. With such a collection function, aggregate execution behaves as follows.
Null state transition results are ignored (the function is not called and the previous collection state value is retained).
If the initial state value is null, then at the first non-null state transition result replaces the collection state value, and the collection function is invoked at subsequent rows with all-nonnull transition values.
This is handy for implementing aggregates like max.
If the state transition function is declared "strict", then it cannot be called with null inputs.
With such a transition function, aggregate execution behaves as follows. Rows with any null input values are ignored (the function is not called and the previous state value is retained).
If the initial state value is null, then at the first row with all-nonnull input values, the first argument value replaces the state value, and the transition function is invoked at subsequent rows with all-nonnull input values. This is handy for implementing aggregates like max.
Note that this behavior is only available when state_data_type is the same as the first input_data_type.
When these types are different, you must supply a nonnull initial condition or use a nonstrict transition function.
If the state transition and/or collection function is not strict, then it will be called unconditionally at each input row, and must deal with null inputs and null transition/collection values for itself.
This allows the aggregate author to have full control over the aggregate's handling of null values.
If the final function is declared "strict", then it will not be called when the ending state value is null; instead a null result will be returned automatically.
(Of course this is just the normal behavior of strict functions.) In any case the final function has the option of returning a null value.
For example, the final function for avg returns null when it sees there were zero input rows.
例如count, id有一条为null :
digoal=# select count(id) from t1;
count
-------
10
(1 row)
digoal=# select count(*) from t1;
count
-------
11
(1 row)
假如把d_sum(int,int)函数改成called on null input.
这种情况下, 即使row取到的是空值也会调用func. 因此null+val=null, 最后得到的就是null了.
digoal=# alter function d_sum(int,int) called on null input;
ALTER FUNCTION
digoal=# select d_sum(id) from t1;
d_sum
-------
(1 row)
聚合的优化, 使用btree.
sort_operator
The associated sort operator for a MIN- or MAX-like aggregate. This is just an operator name (possibly schema-qualified).
The operator is assumed to have the same input data types as the aggregate (which must be a single-argument aggregate).
Aggregates that behave like MIN or MAX can sometimes be optimized by looking into an index instead of scanning every input row.
If this aggregate can be so optimized, indicate it by specifying a sort operator.
The basic requirement is that the aggregate must yield the first element in the sort ordering induced by the operator;
in other words:
SELECT agg(col) FROM tab;
must be equivalent to:
SELECT col FROM tab ORDER BY col USING sortop LIMIT 1;
Further assumptions are that the aggregate ignores null inputs, and that it delivers a null result if and only if there were no non-null inputs.
Ordinarily, a data type's < operator is the proper sort operator for MIN, and > is the proper sort operator for MAX.
Note that the optimization will never actually take effect unless the specified operator is the "less than" or "greater than" strategy member of a B-tree index operator class.
定义类似count(*)这样的无参数聚合:
digoal=# select * from pg_aggregate where aggfnoid::text ~ 'count';
aggfnoid | aggtransfn | aggcollectfn | aggfinalfn | aggsortop | aggtranstype | agginitval | agginitcollect
------------------+-----------------------+------------------+------------+-----------+--------------+------------+----------------
pg_catalog.count | int8inc_any | int8_sum_to_int8 | - | 0 | 20 | 0 | 0
pg_catalog.count | int8inc | int8_sum_to_int8 | - | 0 | 20 | 0 | 0
digoal=# \df+ int8inc_any
List of functions
Schema | Name | Result data type | Argument data types | Type | Volatility | Owner | Language | Source code |
Description
------------+-------------+------------------+---------------------+--------+------------+----------+----------+-------------+------
------------------------------
pg_catalog | int8inc_any | bigint | bigint, "any" | normal | immutable | postgres | internal | int8inc_any | incre
ment, ignores second argument
(1 row)
digoal=# \df+ int8inc
List of functions
Schema | Name | Result data type | Argument data types | Type | Volatility | Owner | Language | Source code | Descripti
on
------------+---------+------------------+---------------------+--------+------------+----------+----------+-------------+----------
---
pg_catalog | int8inc | bigint | bigint | normal | immutable | postgres | internal | int8inc | increment
(1 row)
digoal=# select proisstrict from pg_proc where proname='int8inc';
proisstrict
-------------
t
(1 row)
digoal=# select proisstrict from pg_proc where proname='int8inc_any';
proisstrict
-------------
t
(1 row)
以上count实际上分了两种, 一种是count(列名字), 一种是count(). int8inc_any用于count(列名), int8inc用于count().\
举例 :
创建sfunc
digoal=# create or replace function d_count(int8) returns int8 as $$
select $1+1;
$$ language sql strict;
创建cfunc
digoal=# create or replace function d_count(int8,int8) returns int8 as $$
select $1+$2;
$$ language sql strict;
创建聚合
digoal=# create aggregate d_count(*)
(
sfunc=d_count,
stype=int8,
cfunc=d_count,
initcond='0',
initcollect='0'
);
digoal=# select d_count(*) from t1;
d_count
---------
11
(1 row)
digoal=# explain (analyze,verbose,buffers) select d_count(*) from t1;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------
---
Aggregate (cost=250.00..250.01 rows=1 width=0) (actual time=1.308..1.308 rows=1 loops=1)
Output: sj.d_count(*)
-> Materialize (cost=0.00..0.00 rows=0 width=0) (actual time=0.712..0.725 rows=5 loops=1)
Output: (d_count(*))
-> Data Node Scan on "__REMOTE_GROUP_QUERY__" (cost=0.00..0.00 rows=1000 width=0) (actual time=0.712..0.721 rows=5 loops=
1)
Output: d_count(*)
Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5
Remote query: SELECT d_count(*) FROM (SELECT id, info FROM ONLY t1 WHERE true) group_1
Total runtime: 1.339 ms
(9 rows)
注意, 为什么d_count(id)得到的结果不是10呢? 因为我没有定义d_count(“any”), 只定义了d_count(*) :
digoal=# select d_count(id) from t1;
d_count
---------
2
3
4
9
10
5
6
7
8
11
(11 rows)
因为这里的d_count是普通函数, 而不是聚合.
实际上调用的是d_count(int8)函数.
将sfunc,cfunc改名后就更能看出来了.
digoal=# alter function d_count(int8) rename to s_d_count;
ALTER FUNCTION
Time: 18.591 ms
digoal=# alter function d_count(int8,int8) rename to c_d_count;
ALTER FUNCTION
Time: 4.049 ms
digoal=# create aggregate d_count(*)
(
sfunc=s_d_count,
stype=int8,
cfunc=c_d_count,
initcond='0',
initcollect='0'
);
CREATE AGGREGATE
Time: 21.752 ms
digoal=# explain (analyze,verbose,buffers) select d_count(*) from t1;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------
---
Aggregate (cost=250.00..250.01 rows=1 width=0) (actual time=1.732..1.732 rows=1 loops=1)
Output: sj.d_count(*)
-> Materialize (cost=0.00..0.00 rows=0 width=0) (actual time=0.986..1.008 rows=5 loops=1)
Output: (d_count(*))
-> Data Node Scan on "__REMOTE_GROUP_QUERY__" (cost=0.00..0.00 rows=1000 width=0) (actual time=0.985..1.002 rows=5 loops=
1)
Output: d_count(*)
Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5
Remote query: SELECT d_count(*) FROM (SELECT id, info FROM ONLY t1 WHERE true) group_1
Total runtime: 1.766 ms
(9 rows)
Time: 3.088 ms
digoal=# explain (analyze,verbose,buffers) select d_count(id) from t1;
ERROR: function d_count(integer) does not exist
LINE 1: explain (analyze,verbose,buffers) select d_count(id) from t1...
^
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
定义类似count(*)和count(列名)的函数还有一个count(列名)是怎么定义的呢, 如下 :
digoal=# create or replace function s_d_count(int8,"anyelement") returns int8 as $$
select $1+1;
$$ language sql strict;
CREATE FUNCTION
Time: 4.715 ms
digoal=# create aggregate d_count("anyelement")
(
sfunc=s_d_count,
stype=int8,
cfunc=c_d_count,
initcond='0',
initcollect='0'
);
CREATE AGGREGATE
Time: 69.536 ms
digoal=# select d_count(id) from t1;
d_count
---------
10
(1 row)
Time: 3.694 ms
digoal=# explain (analyze,verbose,buffers) select d_count(id) from t1;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------
---
Aggregate (cost=250.00..250.01 rows=1 width=4) (actual time=1.913..1.913 rows=1 loops=1)
Output: d_count((d_count(t1.id)))
-> Materialize (cost=0.00..0.00 rows=0 width=0) (actual time=0.893..0.923 rows=5 loops=1)
Output: (d_count(t1.id))
-> Data Node Scan on "__REMOTE_GROUP_QUERY__" (cost=0.00..0.00 rows=1000 width=4) (actual time=0.892..0.917 rows=5 loops=
1)
Output: d_count(t1.id)
Node/s: datanode_1, datanode_2, datanode_3, datanode_4, datanode_5
Remote query: SELECT d_count(group_1.id) FROM (SELECT id, info FROM ONLY t1 WHERE true) group_1
Total runtime: 1.947 ms
(9 rows)
第二个d_count聚合复用了前面定义的c_d_count(int8,int8).
digoal=# select * from pg_aggregate where aggfnoid::text ~ 'd_count';
aggfnoid | aggtransfn | aggcollectfn | aggfinalfn | aggsortop | aggtranstype | agginitval | agginitcollect
------------+--------------+--------------+------------+-----------+--------------+------------+----------------
sj.d_count | sj.s_d_count | c_d_count | - | 0 | 20 | 0 | 0
sj.d_count | sj.s_d_count | c_d_count | - | 0 | 20 | 0 | 0
(2 rows)
Time: 2.811 ms
注意
1. 如果你定义的聚合既支持分布式聚合同时又支持传统的数据汇总聚合, Postgres-XC会根据成本选择合适的聚合方法, 所以在这种情况下, 必须确保两种聚合方法得到的聚合结果是一致的. 否则就出大问题了.
2. 聚合最好不要与函数名重复. 否则会难以排错.
参考
1. http://postgres-xc.sourceforge.net/docs/1_0_3/xaggr.html
2. http://postgres-xc.sourceforge.net/docs/1_0_3/sql-createaggregate.html
3. http://blog.163.com/digoal@126/blog/static/16387704020121118112533410/