Greenplum & PostgreSQL UPSERT udf 实现 - 2 batch批量模式
背景
《Greenplum & PostgreSQL UPSERT udf 实现 - 1 单行模式》
批量模式,接口函数的输入参数如下
1、schemaname
2、表名
3、分布键名(数组)
4、请求内容:JSON 或 JSONB 数组,数组中的每个元素对应一次请求(一行)。(每次请求的字段可能各不相同,所以每个 JSON 或 JSONB 元素都必须带上字段名)
batch模式的好处:与数据库的交互次数变少、提交次数降低;由于每次COMMIT都要同步刷写WAL(同步IO),提交次数减少后,同步IO也随之变少。
理论上处理吞吐可以提升。
PostgreSQL
1、batch 函数如下
-- gp_upsert_batch (PostgreSQL flavour): apply a batch of upsert requests to one table.
--   nsp  : schema name of the target table
--   tbl  : target table name
--   keys : key column names, handed unchanged to gp_upsert
--   js   : one jsonb document per request; each document carries its own field names
-- Returns nothing. The function is strict, so any NULL argument skips the body.
create or replace function gp_upsert_batch(nsp name, tbl name, keys text[], js jsonb[])
returns void
language plpgsql
strict
as $$
declare
  req jsonb;  -- the request currently being applied
begin
  -- Hand the requests to the single-request function one by one, in array order.
  -- NOTE(review): per the surrounding article, per-row exceptions are handled
  -- inside gp_upsert (defined elsewhere), so one bad row should not abort the
  -- rest of the batch -- confirm against that function.
  foreach req in array js loop
    perform gp_upsert(nsp, tbl, keys, req);
  end loop;
end;
$$;
这里解释一下为什么要使用嵌套函数:每一行的异常都需要单独捕获,并且某一行出错后还要继续处理整个BATCH中的其他行,所以把单行处理放在被调用的 gp_upsert 函数中,由它在函数内部处理 exception(在循环体内使用 begin ... exception ... end 子块也能达到同样的效果)。
2、测试
postgres=# select gp_upsert_batch('public', 'test', array['id1','id2'], array['{"id1":1, "id2":2, "info":"digoal ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"}'::jsonb, '{"id1":1, "id2":2, "info":"digoal ab", "c1":123, "c3":2.9999, "crt_time":null}'::jsonb, '{"id1":1, "id2":1, "info":"digoal ab", "c1":123, "c3":2.9999, "crt_time":null}'::jsonb]);
postgres=# select ctid,* from test;
ctid | id1 | id2 | info | c1 | c2 | c3 | c4 | crt_time
-------+-----+-----+------------+-----+-----+--------+----+----------
(0,2) | 1 | 2 | digoal ab | 123 | 1.1 | 2.9999 | |
(0,3) | 1 | 1 | digoal ab | 123 | | 2.9999 | |
(2 rows)
3、压测
vi test.sql
-- pgbench custom script: each transaction is one gp_upsert_batch call carrying 10 requests.
-- id1 is shared by all 10 requests; id2..id11 are drawn from non-overlapping ranges,
-- so the (id1, id2) key pairs inside a single call are always distinct.
-- :a (the remaining JSON fields) is not set in this script; it is supplied on the
-- pgbench command line with -D a=...
-- NOTE(review): the :name references sit inside quoted JSON literals, so this relies on
-- pgbench substituting variables textually in simple query mode -- confirm for your pgbench version.
\set id1 random(1,10000)
\set id2 random(10001,20000)
\set id3 random(20001,30000)
\set id4 random(30001,40000)
\set id5 random(40001,50000)
\set id6 random(50001,60000)
\set id7 random(60001,70000)
\set id8 random(70001,80000)
\set id9 random(80001,90000)
\set id10 random(90001,100000)
\set id11 random(100001,110000)
select gp_upsert_batch('public', 'test', array['id1','id2'], array['{"id1": :id1, "id2": :id2, :a}'::jsonb, '{"id1": :id1, "id2": :id3, :a}'::jsonb, '{"id1": :id1, "id2": :id4, :a}'::jsonb, '{"id1": :id1, "id2": :id5, :a}'::jsonb, '{"id1": :id1, "id2": :id6, :a}'::jsonb, '{"id1": :id1, "id2": :id7, :a}'::jsonb, '{"id1": :id1, "id2": :id8, :a}'::jsonb, '{"id1": :id1, "id2": :id9, :a}'::jsonb, '{"id1": :id1, "id2": :id10, :a}'::jsonb, '{"id1": :id1, "id2": :id11, :a}'::jsonb]);
4、压测结果:吞吐确实比单行模式好一点(每个事务批量处理10行,约5200 tps,即约5.2万行/s)
pgbench -M simple -n -r -P 1 -f ./test.sql -c 96 -j 96 -T 120 -D a='"info":"digoal ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"'
transaction type: ./test.sql
scaling factor: 1
query mode: simple
number of clients: 96
number of threads: 96
duration: 120 s
number of transactions actually processed: 625842
latency average = 18.400 ms
latency stddev = 6.856 ms
tps = 5214.759785 (including connections establishing)
tps = 5215.929160 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.012 \set id1 random(1,10000)
0.006 \set id2 random(10001,20000)
0.006 \set id3 random(20001,30000)
0.005 \set id4 random(30001,40000)
0.006 \set id5 random(40001,50000)
0.006 \set id6 random(50001,60000)
0.006 \set id7 random(60001,70000)
0.005 \set id8 random(70001,80000)
0.006 \set id9 random(80001,90000)
0.005 \set id10 random(90001,100000)
0.006 \set id11 random(100001,110000)
18.345 select gp_upsert_batch('public', 'test', array['id1','id2'], array['{"id1": :id1, "id2": :id2, :a}'::jsonb, '{"id1": :id1, "id2": :id3, :a}'::jsonb, '{"id1": :id1, "id2": :id4, :a}'::jsonb, '{"id1": :id1, "id2": :id5, :a}'::jsonb, '{"id1": :id1, "id2": :id6, :a}'::jsonb, '{"id1": :id1, "id2": :id7, :a}'::jsonb, '{"id1": :id1, "id2": :id8, :a}'::jsonb, '{"id1": :id1, "id2": :id9, :a}'::jsonb, '{"id1": :id1, "id2": :id10, :a}'::jsonb, '{"id1": :id1, "id2": :id11, :a}'::jsonb]);
Greenplum
1、batch 函数如下
-- gp_upsert_batch (Greenplum flavour): apply a batch of upsert requests to one table
-- and report how many were counted as inserts and how many as updates.
--   nsp  : schema name of the target table
--   tbl  : target table name
--   keys : distribution key column names, handed unchanged to gp_upsert
--   js   : one json document per request; each document carries its own field names
--   ins  : (out) number of requests for which gp_upsert returned 0
--   upd  : (out) number of requests for which gp_upsert returned anything else
-- The function is strict, so any NULL argument skips the body.
create or replace function gp_upsert_batch(
  nsp name, tbl name, keys text[], js json[],
  out ins int, out upd int
) returns record as $$
declare
  req json;  -- the request currently being applied
  rc  int;   -- result of the single-request call for req
begin
  ins := 0;
  upd := 0;
  for req in select u.item from unnest(js) as u(item)
  loop
    -- Delegate to the single-request function.
    -- NOTE(review): 0 is presumably gp_upsert's "inserted" code; confirm against
    -- its definition, which is not part of this file.
    rc := gp_upsert(nsp, tbl, keys, req);
    if rc = 0 then
      ins := ins + 1;
    else
      -- Any non-zero result, including NULL, is tallied here.
      upd := upd + 1;
    end if;
  end loop;
  return;
end;
$$ language plpgsql strict;
这里解释一下为什么要使用嵌套函数:每一行的异常都需要单独捕获,并且某一行出错后还要继续处理整个BATCH中的其他行,所以把单行处理放在被调用的 gp_upsert 函数中,由它在函数内部处理 exception(在循环体内使用 begin ... exception ... end 子块也能达到同样的效果)。
2、测试
postgres=> select ctid,* from test;
ctid | id1 | id2 | info | c1 | c2 | c3 | c4 | crt_time
-------+-----+-----+------------+-----+-----+--------+----+---------------------
(0,1) | 1 | 2 | digoal ab | 123 | 1.1 | 1.9999 | | 2018-01-01 10:10:10
(1 row)
postgres=> select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":2, "info":"digoal ab", "c1":123, "c3":2.9999, "crt_time":null}'::json);
gp_upsert
-----------
(1 row)
postgres=> select ctid,* from test;
ctid | id1 | id2 | info | c1 | c2 | c3 | c4 | crt_time
-------+-----+-----+------------+-----+-----+--------------------+----+----------
(0,3) | 1 | 2 | digoal ab | 123 | 1.1 | 2.9998999999999998 | |
(1 row)
postgres=> select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":1, "info":"digoal ab", "c1":123, "c3":2.9999, "crt_time":null}'::json);
gp_upsert
-----------
(1 row)
postgres=> select ctid,* from test;
ctid | id1 | id2 | info | c1 | c2 | c3 | c4 | crt_time
-------+-----+-----+------------+-----+-----+--------------------+----+----------
(0,3) | 1 | 2 | digoal ab | 123 | 1.1 | 2.9998999999999998 | |
(0,2) | 1 | 1 | digoal ab | 123 | | 2.9998999999999998 | |
(2 rows)
postgres=> delete from test;
DELETE 2
postgres=> select gp_upsert_batch('public', 'test', array['id1','id2'], array['{"id1":1, "id2":2, "info":"digoal ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"}'::json, '{"id1":1, "id2":2, "info":"digoal ab", "c1":123, "c3":2.9999, "crt_time":null}'::json, '{"id1":1, "id2":1, "info":"digoal ab", "c1":123, "c3":2.9999, "crt_time":null}'::json]);
gp_upsert_batch
-----------------
(1 row)
postgres=> select ctid,* from test;
ctid | id1 | id2 | info | c1 | c2 | c3 | c4 | crt_time
-------+-----+-----+------------+-----+-----+--------------------+----+----------
(0,7) | 1 | 2 | digoal ab | 123 | 1.1 | 2.9998999999999998 | |
(0,4) | 1 | 1 | digoal ab | 123 | | 2.9998999999999998 | |
(2 rows)
3、压测
vi test.sql
-- pgbench custom script (Greenplum run): each transaction is one gp_upsert_batch call
-- carrying 10 json requests.
-- id1 is shared by all 10 requests; id2..id11 are drawn from non-overlapping ranges,
-- so the (id1, id2) key pairs inside a single call are always distinct.
-- :a (the remaining JSON fields) is not set in this script; it is supplied on the
-- pgbench command line with -D a=...
-- NOTE(review): the :name references sit inside quoted JSON literals, so this relies on
-- pgbench substituting variables textually in simple query mode -- confirm for your pgbench version.
\set id1 random(1,10000)
\set id2 random(10001,20000)
\set id3 random(20001,30000)
\set id4 random(30001,40000)
\set id5 random(40001,50000)
\set id6 random(50001,60000)
\set id7 random(60001,70000)
\set id8 random(70001,80000)
\set id9 random(80001,90000)
\set id10 random(90001,100000)
\set id11 random(100001,110000)
select gp_upsert_batch('public', 'test', array['id1','id2'], array['{"id1": :id1, "id2": :id2, :a}'::json, '{"id1": :id1, "id2": :id3, :a}'::json, '{"id1": :id1, "id2": :id4, :a}'::json, '{"id1": :id1, "id2": :id5, :a}'::json, '{"id1": :id1, "id2": :id6, :a}'::json, '{"id1": :id1, "id2": :id7, :a}'::json, '{"id1": :id1, "id2": :id8, :a}'::json, '{"id1": :id1, "id2": :id9, :a}'::json, '{"id1": :id1, "id2": :id10, :a}'::json, '{"id1": :id1, "id2": :id11, :a}'::json]);
4、压测结果:吞吐确实比单行模式好一点(每个事务批量处理10行,约905 tps,即约9000行/s)
PGOPTIONS='-c gp_session_role=utility' pgbench -M simple -n -r -P 1 -f ./test.sql -c 96 -j 96 -T 120 -D a='"info":"digoal ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"'
transaction type: ./test.sql
scaling factor: 1
query mode: simple
number of clients: 96
number of threads: 96
duration: 120 s
number of transactions actually processed: 108668
latency average = 106.000 ms
latency stddev = 46.055 ms
tps = 905.181364 (including connections establishing)
tps = 905.489034 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.002 \set id1 random(1,10000)
0.000 \set id2 random(10001,20000)
0.000 \set id3 random(20001,30000)
0.000 \set id4 random(30001,40000)
0.000 \set id5 random(40001,50000)
0.000 \set id6 random(50001,60000)
0.000 \set id7 random(60001,70000)
0.000 \set id8 random(70001,80000)
0.000 \set id9 random(80001,90000)
0.000 \set id10 random(90001,100000)
0.000 \set id11 random(100001,110000)
105.988 select gp_upsert_batch('public', 'test', array['id1','id2'], array['{"id1": :id1, "id2": :id2, :a}'::json, '{"id1": :id1, "id2": :id3, :a}'::json, '{"id1": :id1, "id2": :id4, :a}'::json, '{"id1": :id1, "id2": :id5, :a}'::json, '{"id1": :id1, "id2": :id6, :a}'::json, '{"id1": :id1, "id2": :id7, :a}'::json, '{"id1": :id1, "id2": :id8, :a}'::json, '{"id1": :id1, "id2": :id9, :a}'::json, '{"id1": :id1, "id2": :id10, :a}'::json, '{"id1": :id1, "id2": :id11, :a}'::json]);
参考
《Greenplum & PostgreSQL UPSERT udf 实现 - 1 单行模式》
Greenplum, PostgreSQL性能诊断,部署参考
《Greenplum PostgreSQL --enable-profiling 产生gprof性能诊断代码》
《PostgreSQL on Linux 最佳部署手册 - 珍藏级》
《PostgreSQL 源码性能诊断(perf profiling)指南 - 珍藏级》
《PostgreSQL 代码性能诊断之 - OProfile & Systemtap》