Greenplum & PostgreSQL UPSERT UDF Implementation - 2: Batch Mode


Background

《Greenplum & PostgreSQL UPSERT UDF Implementation - 1: Single-Row Mode》

In batch mode, the interface function takes the following input parameters:

1. Schema name

2. Table name

3. Distribution key column names (array)

4. The payload of each request, as a JSON or JSONB array. (The fields may differ from request to request, so every JSON/JSONB element must carry its own field names.)
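To illustrate parameter 4, a batch payload might look like the following (the field names here are hypothetical, just for demonstration); note that each element spells out its own field names, so different rows in one batch may touch different column sets:

```sql
-- Each jsonb element carries its own field names, so rows in the same
-- batch may update different sets of columns.
select array[
  '{"id1":1, "id2":2, "info":"a", "c1":100}'::jsonb,
  '{"id1":1, "id2":3, "c3":9.99}'::jsonb   -- fewer / different fields are fine
];
```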

The benefit of batch mode is fewer round trips to the database and fewer commits. Since each COMMIT involves a synchronous WAL flush, the amount of synchronous IO also drops.

In theory, throughput should improve.

PostgreSQL

1. The batch function is as follows

create or replace function gp_upsert_batch(nsp name, tbl name, keys text[], js jsonb[]) returns void as $$    
declare    
  icontent jsonb;  
begin    
  foreach icontent in array js  
  loop  
    perform gp_upsert(nsp,tbl,keys,icontent);    -- call the single-row upsert function  
  end loop;  
  return;  
end;    
$$ language plpgsql strict;   

A note on why a nested function is used here: each row's exception must be caught individually, and the remaining rows of the batch must keep processing, so the only way to achieve this is with a nested function (the exception is handled inside the inner function).
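For reference, the single-row gp_upsert from part 1 follows roughly this shape (a simplified sketch, not the full implementation from the previous article): the dynamic UPDATE-then-INSERT logic sits inside an exception block, so when one row fails the error is swallowed there and the batch loop above simply moves on to the next element.

```sql
-- Simplified sketch of the single-row function (see part 1 for the real one).
-- Because the exception handler lives here, gp_upsert_batch's FOREACH loop
-- is never aborted by a single bad row.
create or replace function gp_upsert(nsp name, tbl name, keys text[], icontent jsonb) returns void as $$
begin
  -- build and execute a dynamic UPDATE keyed on the distribution columns;
  -- if no row matched, execute a dynamic INSERT (details in the previous article)
exception when others then
  raise warning 'upsert failed for %', icontent;   -- caught per row, batch continues
end;
$$ language plpgsql strict;
```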

2. Test

postgres=# select gp_upsert_batch('public', 'test', array['id1','id2'], array['{"id1":1, "id2":2, "info":"digoal  ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"}'::jsonb, '{"id1":1, "id2":2, "info":"digoal  ab", "c1":123, "c3":2.9999, "crt_time":null}'::jsonb, '{"id1":1, "id2":1, "info":"digoal  ab", "c1":123, "c3":2.9999, "crt_time":null}'::jsonb]);  
  
  
postgres=# select ctid,* from test;  
 ctid  | id1 | id2 |    info    | c1  | c2  |   c3   | c4 | crt_time   
-------+-----+-----+------------+-----+-----+--------+----+----------  
 (0,2) |   1 |   2 | digoal  ab | 123 | 1.1 | 2.9999 |    |   
 (0,3) |   1 |   1 | digoal  ab | 123 |     | 2.9999 |    |   
(2 rows)  

3. Stress test

vi test.sql  
  
\set id1 random(1,10000)  
\set id2 random(10001,20000)  
\set id3 random(20001,30000)  
\set id4 random(30001,40000)  
\set id5 random(40001,50000)  
\set id6 random(50001,60000)  
\set id7 random(60001,70000)  
\set id8 random(70001,80000)  
\set id9 random(80001,90000)  
\set id10 random(90001,100000)  
\set id11 random(100001,110000)  
select gp_upsert_batch('public', 'test', array['id1','id2'], array['{"id1": :id1, "id2": :id2, :a}'::jsonb, '{"id1": :id1, "id2": :id3, :a}'::jsonb, '{"id1": :id1, "id2": :id4, :a}'::jsonb, '{"id1": :id1, "id2": :id5, :a}'::jsonb, '{"id1": :id1, "id2": :id6, :a}'::jsonb, '{"id1": :id1, "id2": :id7, :a}'::jsonb, '{"id1": :id1, "id2": :id8, :a}'::jsonb, '{"id1": :id1, "id2": :id9, :a}'::jsonb, '{"id1": :id1, "id2": :id10, :a}'::jsonb, '{"id1": :id1, "id2": :id11, :a}'::jsonb]);  

4. Benchmark results: throughput is indeed somewhat better

pgbench -M simple -n -r -P 1 -f ./test.sql -c 96 -j 96 -T 120 -D a='"info":"digoal  ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"'  
  

transaction type: ./test.sql
scaling factor: 1
query mode: simple
number of clients: 96
number of threads: 96
duration: 120 s
number of transactions actually processed: 625842
latency average = 18.400 ms
latency stddev = 6.856 ms
tps = 5214.759785 (including connections establishing)
tps = 5215.929160 (excluding connections establishing)
script statistics:
 - statement latencies in milliseconds:
         0.012  \set id1 random(1,10000)  
         0.006  \set id2 random(10001,20000)  
         0.006  \set id3 random(20001,30000)  
         0.005  \set id4 random(30001,40000)  
         0.006  \set id5 random(40001,50000)  
         0.006  \set id6 random(50001,60000)  
         0.006  \set id7 random(60001,70000)  
         0.005  \set id8 random(70001,80000)  
         0.006  \set id9 random(80001,90000)  
         0.005  \set id10 random(90001,100000)  
         0.006  \set id11 random(100001,110000)  
        18.345  select gp_upsert_batch('public', 'test', array['id1','id2'], array['{"id1": :id1, "id2": :id2, :a}'::jsonb, '{"id1": :id1, "id2": :id3, :a}'::jsonb, '{"id1": :id1, "id2": :id4, :a}'::jsonb, '{"id1": :id1, "id2": :id5, :a}'::jsonb, '{"id1": :id1, "id2": :id6, :a}'::jsonb, '{"id1": :id1, "id2": :id7, :a}'::jsonb, '{"id1": :id1, "id2": :id8, :a}'::jsonb, '{"id1": :id1, "id2": :id9, :a}'::jsonb, '{"id1": :id1, "id2": :id10, :a}'::jsonb, '{"id1": :id1, "id2": :id11, :a}'::jsonb]);  

Greenplum

1. The batch function is as follows

create or replace function gp_upsert_batch(nsp name, tbl name, keys text[], js json[], out ins int, out upd int) returns record as $$    
declare    
  icontent json;  
  res int;
begin    
  ins := 0;
  upd := 0;
  for icontent in select * from unnest(js)  
  loop  
    select gp_upsert(nsp,tbl,keys,icontent) into res;   -- call the single-row upsert function  
    ins := ins + case res when 0 then 1 else 0 end;
    upd := upd + case res when 0 then 0 else 1 end;
  end loop;  
  return;  
end;    
$$ language plpgsql strict;   

As in the PostgreSQL version, a nested function is used so that each row's exception can be caught inside it while the rest of the batch keeps processing.
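Unlike the PostgreSQL version, this function reports counts through its ins/upd OUT parameters; calling it with select * exposes them as columns. A sketch (assuming, as the CASE expressions above imply, that the single-row gp_upsert returns 0 for an insert and non-zero for an update):

```sql
-- Read the insert/update counters as separate columns:
select * from gp_upsert_batch(
  'public', 'test', array['id1','id2'],
  array['{"id1":1, "id2":2, "c1":1}'::json,
        '{"id1":1, "id2":3, "c1":2}'::json]
);
-- returns one row with columns (ins, upd): how many elements of the batch
-- were inserted vs. updated
```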

2. Test

postgres=> select ctid,* from test;  
 ctid  | id1 | id2 |    info    | c1  | c2  |   c3   | c4 |      crt_time         
-------+-----+-----+------------+-----+-----+--------+----+---------------------  
 (0,1) |   1 |   2 | digoal  ab | 123 | 1.1 | 1.9999 |    | 2018-01-01 10:10:10  
(1 row)  
  
postgres=> select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":2, "info":"digoal  ab", "c1":123, "c3":2.9999, "crt_time":null}'::json);  
 gp_upsert   
-----------  
   
(1 row)  
  
postgres=> select ctid,* from test;  
 ctid  | id1 | id2 |    info    | c1  | c2  |         c3         | c4 | crt_time   
-------+-----+-----+------------+-----+-----+--------------------+----+----------  
 (0,3) |   1 |   2 | digoal  ab | 123 | 1.1 | 2.9998999999999998 |    |   
(1 row)  
  
postgres=> select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":1, "info":"digoal  ab", "c1":123, "c3":2.9999, "crt_time":null}'::json);  
 gp_upsert   
-----------  
   
(1 row)  
  
postgres=> select ctid,* from test;  
 ctid  | id1 | id2 |    info    | c1  | c2  |         c3         | c4 | crt_time   
-------+-----+-----+------------+-----+-----+--------------------+----+----------  
 (0,3) |   1 |   2 | digoal  ab | 123 | 1.1 | 2.9998999999999998 |    |   
 (0,2) |   1 |   1 | digoal  ab | 123 |     | 2.9998999999999998 |    |   
(2 rows)  
  
  
postgres=> delete from test;  
DELETE 2  
postgres=> select gp_upsert_batch('public', 'test', array['id1','id2'], array['{"id1":1, "id2":2, "info":"digoal  ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"}'::json, '{"id1":1, "id2":2, "info":"digoal  ab", "c1":123, "c3":2.9999, "crt_time":null}'::json, '{"id1":1, "id2":1, "info":"digoal  ab", "c1":123, "c3":2.9999, "crt_time":null}'::json]);  
 gp_upsert_batch   
-----------------  
   
(1 row)  
  
postgres=> select ctid,* from test;  
 ctid  | id1 | id2 |    info    | c1  | c2  |         c3         | c4 | crt_time   
-------+-----+-----+------------+-----+-----+--------------------+----+----------  
 (0,7) |   1 |   2 | digoal  ab | 123 | 1.1 | 2.9998999999999998 |    |   
 (0,4) |   1 |   1 | digoal  ab | 123 |     | 2.9998999999999998 |    |   
(2 rows)  

3. Stress test

vi test.sql  
  
\set id1 random(1,10000)  
\set id2 random(10001,20000)  
\set id3 random(20001,30000)  
\set id4 random(30001,40000)  
\set id5 random(40001,50000)  
\set id6 random(50001,60000)  
\set id7 random(60001,70000)  
\set id8 random(70001,80000)  
\set id9 random(80001,90000)  
\set id10 random(90001,100000)  
\set id11 random(100001,110000)  
select gp_upsert_batch('public', 'test', array['id1','id2'], array['{"id1": :id1, "id2": :id2, :a}'::json, '{"id1": :id1, "id2": :id3, :a}'::json, '{"id1": :id1, "id2": :id4, :a}'::json, '{"id1": :id1, "id2": :id5, :a}'::json, '{"id1": :id1, "id2": :id6, :a}'::json, '{"id1": :id1, "id2": :id7, :a}'::json, '{"id1": :id1, "id2": :id8, :a}'::json, '{"id1": :id1, "id2": :id9, :a}'::json, '{"id1": :id1, "id2": :id10, :a}'::json, '{"id1": :id1, "id2": :id11, :a}'::json]);  

4. Benchmark results: throughput is indeed somewhat better

PGOPTIONS='-c gp_session_role=utility' pgbench -M simple -n -r -P 1 -f ./test.sql -c 96 -j 96 -T 120 -D a='"info":"digoal  ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"'  
  
  
transaction type: ./test.sql  
scaling factor: 1  
query mode: simple  
number of clients: 96  
number of threads: 96  
duration: 120 s  
number of transactions actually processed: 108668  
latency average = 106.000 ms  
latency stddev = 46.055 ms  
tps = 905.181364 (including connections establishing)  
tps = 905.489034 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.002  \set id1 random(1,10000)  
         0.000  \set id2 random(10001,20000)  
         0.000  \set id3 random(20001,30000)  
         0.000  \set id4 random(30001,40000)  
         0.000  \set id5 random(40001,50000)  
         0.000  \set id6 random(50001,60000)  
         0.000  \set id7 random(60001,70000)  
         0.000  \set id8 random(70001,80000)  
         0.000  \set id9 random(80001,90000)  
         0.000  \set id10 random(90001,100000)  
         0.000  \set id11 random(100001,110000)  
       105.988  select gp_upsert_batch('public', 'test', array['id1','id2'], array['{"id1": :id1, "id2": :id2, :a}'::json, '{"id1": :id1, "id2": :id3, :a}'::json, '{"id1": :id1, "id2": :id4, :a}'::json, '{"id1": :id1, "id2": :id5, :a}'::json, '{"id1": :id1, "id2": :id6, :a}'::json, '{"id1": :id1, "id2": :id7, :a}'::json, '{"id1": :id1, "id2": :id8, :a}'::json, '{"id1": :id1, "id2": :id9, :a}'::json, '{"id1": :id1, "id2": :id10, :a}'::json, '{"id1": :id1, "id2": :id11, :a}'::json]);  

References

《Greenplum & PostgreSQL UPSERT UDF Implementation - 1: Single-Row Mode》

Greenplum / PostgreSQL performance diagnostics and deployment references:

《Greenplum PostgreSQL --enable-profiling: Building with gprof Performance Diagnostics》

《PostgreSQL on Linux Best-Practice Deployment Guide》

《PostgreSQL Source-Code Performance Diagnosis (perf profiling) Guide》

《PostgreSQL Code Performance Diagnosis with OProfile & Systemtap》


Entry point to digoal's PostgreSQL articles