Greenplum & PostgreSQL UPSERT udf 实现 - 1 单行模式

6 minute read

背景

PostgreSQL 9.5 开始支持了upsert的语法,

PostgreSQL 9.5 以前的版本,可以使用CTE语法来代替insert into on conflict(upsert),实现UPSERT。或者使用UDF来实现。

[《HTAP数据库 PostgreSQL 场景与性能测试之 22 - (OLTP) merge insert upsert insert on conflict 合并写入》](../201711/20171107_23.md)

《PostgreSQL upsert功能(insert on conflict do)的用法》

《PostgreSQL 如何实现upsert与新旧数据自动分离》

《[转载]postgresql 9.5版本之前实现upsert功能》

《PostgreSQL 11 preview - MERGE 语法支持与CTE内支持,兼容SQL:2016 , 兼容 Oracle》

《Greenplum merge insert 用法与性能 (insert on conflict) - 2》

《Greenplum merge insert 用法与性能 (insert on conflict) - 1》

《PostgreSQL merge join 评估成本时可能会查询索引 - 硬解析务必引起注意 - 批量删除数据后, 未释放empty索引页导致mergejoin执行计划变慢 case》

但是Greenplum目前的版本较老,(4.3的版本对应的PG是8.2的版本),所以没有办法使用CTE来实现UPSERT。

为了让greenplum实现UPSERT,可以使用UDF的方法。

本文提供PostgreSQL, Greenplum两种环境下的用法。(此处Greenplum指阿里云HybridDB for PostgreSQL,加了JSON类型的支持)

思路

用户提供几个输入:

1、schemaname, 表名

2、键值名(数组)

3、键值(数组),(业务无关:如果要直接定位到SEGMENT,计算哈希时,请使用对应类型的HASH函数来计算)。

4、K-V组成的JSON或JSONB

在函数中自动拼装SQL,实现UPDATE或INSERT。

PostgreSQL

用户把要输入的数据拼装成JSON或JSONB,在函数中解开,生成动态SQL

 pg_catalog | json_each_text               | SETOF record     | from_json json, OUT key text, OUT value text                                           | normal  
 pg_catalog | jsonb_each_text              | SETOF record     | from_json jsonb, OUT key text, OUT value text                                          | normal  

1、测试表

create table test(id1 int, id2 int, info text, c1 int, c2 numeric, c3 float8, c4 int8, crt_time timestamp, primary key(id1,id2));  

2、upsert动态函数

create or replace function gp_upsert(nsp name, tbl name, keys text[], icontent jsonb) returns void as $$  -- icontent中也必须包含PK的内容。  
declare  
  k text;  
  v text;  
  sql1 text := 'update '||quote_ident(nsp)||'.'||quote_ident(tbl)||' set ';  
  sql2 text := 'insert into '||quote_ident(nsp)||'.'||quote_ident(tbl)||' (';  
  sql3 text := 'values (';  
  sql4 text := ' where ';  
  rcnt int := 0;  
begin  
  for k,v in select * from jsonb_each_text(icontent)  
  loop  
    if (not array[k] && keys) then  
      sql1 := sql1||quote_ident(k)||'='||quote_nullable(v)||',';  
    else
      sql4 := sql4||quote_ident(k)||'='||quote_nullable(v)||' and ';
    end if;  
  end loop;  
  
  execute rtrim(sql1, ',') || rtrim(sql4, 'and ');  
  GET DIAGNOSTICS rcnt = ROW_COUNT;  
  if rcnt=0 then  
    
    for k,v in select * from jsonb_each_text(icontent)
    loop
      sql2 := sql2||quote_ident(k)||',';  
      sql3 := sql3||quote_nullable(v)||',';  
    end loop;

    execute rtrim(sql2, ',') || ') ' || rtrim(sql3, ',') || ') ';  
  end if;  
  
  return;  
  
  exception when others then  
    execute rtrim(sql1, ',') || rtrim(sql4, 'and ');  
    return;  
end;  
$$ language plpgsql strict; 

3、测试使用UPSERT动态函数,将数据upsert到test表

postgres=# truncate test;
TRUNCATE TABLE
postgres=# 
postgres=# select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":2, "info":"digoal  ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"}'::jsonb);
 gp_upsert 
-----------
 
(1 row)

postgres=# select ctid,* from test;
 ctid  | id1 | id2 |    info    | c1  | c2  |   c3   | c4 |      crt_time       
-------+-----+-----+------------+-----+-----+--------+----+---------------------
 (0,1) |   1 |   2 | digoal  ab | 123 | 1.1 | 1.9999 |    | 2018-01-01 10:10:10
(1 row)

postgres=# select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":2, "info":"digoal  ab", "c1":123, "c3":2.9999, "crt_time":null}'::jsonb);
 gp_upsert 
-----------
 
(1 row)

postgres=# select ctid,* from test;
 ctid  | id1 | id2 |    info    | c1  | c2  |   c3   | c4 | crt_time 
-------+-----+-----+------------+-----+-----+--------+----+----------
 (0,2) |   1 |   2 | digoal  ab | 123 | 1.1 | 2.9999 |    | 
(1 row)

postgres=# select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":1, "info":"digoal  ab", "c1":123, "c3":2.9999, "crt_time":null}'::jsonb);
 gp_upsert 
-----------
 
(1 row)

postgres=# select ctid,* from test;
 ctid  | id1 | id2 |    info    | c1  | c2  |   c3   | c4 | crt_time 
-------+-----+-----+------------+-----+-----+--------+----+----------
 (0,2) |   1 |   2 | digoal  ab | 123 | 1.1 | 2.9999 |    | 
 (0,3) |   1 |   1 | digoal  ab | 123 |     | 2.9999 |    | 
(2 rows)

压测

1、脚本

vi test.sql

\set id1 random(1,1000)
\set id2 random(1,10000)
select gp_upsert('public', 'test', array['id1','id2'], '{"id1": :id1, "id2": :id2, :a}'::jsonb);

2、压测

pgbench -M simple -n -r -P 1 -f ./test.sql -c 96 -j 96 -T 120 -D a='"info":"digoal  ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"'

transaction type: ./test.sql
scaling factor: 1
query mode: simple
number of clients: 96
number of threads: 96
duration: 120 s
number of transactions actually processed: 5274457
latency average = 2.184 ms
latency stddev = 3.000 ms
tps = 43946.985362 (including connections establishing)
tps = 43953.237428 (excluding connections establishing)
script statistics:
 - statement latencies in milliseconds:
         0.006  \set id1 random(1,1000)
         0.004  \set id2 random(1,10000)
         2.176  select gp_upsert('public', 'test', array['id1','id2'], '{"id1": :id1, "id2": :id2, :a}'::jsonb);

3、统计信息

postgres=# select * from pg_stat_all_tables where relname ='test';
-[ RECORD 1 ]-------+------------------------------
relid               | 219890
schemaname          | public
relname             | test
seq_scan            | 25
seq_tup_read        | 6201505
idx_scan            | 11490223
idx_tup_fetch       | 937792
n_tup_ins           | 10552430
n_tup_upd           | 937792
n_tup_del           | 5
n_tup_hot_upd       | 805144
n_live_tup          | 10512335
n_dead_tup          | 221195
n_mod_since_analyze | 697128
last_vacuum         | 
last_autovacuum     | 2018-06-05 09:56:02.173636+08
last_analyze        | 
last_autoanalyze    | 2018-06-05 10:03:03.646098+08
vacuum_count        | 0
autovacuum_count    | 1
analyze_count       | 0
autoanalyze_count   | 6

4、perf

Samples: 12M of event 'cpu-clock', Event count (approx.): 773892929467
Overhead  Shared Object        Symbol                                 
  31.18%  libc-2.17.so         [.] __mcount_internal                  
   9.63%  libc-2.17.so         [.] _mcount                            
   8.25%  [kernel]             [k] __do_softirq                       
   2.91%  postgres             [.] GetSnapshotData                    
   1.78%  postgres             [.] AllocSetAlloc                      
   1.56%  [kernel]             [k] _raw_spin_unlock_irqrestore        
   1.52%  postgres             [.] LWLockAttemptLock                  
   1.34%  postgres             [.] base_yyparse                       
   1.34%  postgres             [.] SearchCatCache                     
   1.23%  libc-2.17.so         [.] __memcpy_ssse3_back                
   1.06%  postgres             [.] LWLockRelease                      
   0.96%  postgres             [.] hash_search_with_hash_value        

Greenplum

用户把要输入的数据拼装成JSON或JSONB,在函数中解开,生成动态SQL

 pg_catalog | json_each_text               | SETOF record     | from_json json, OUT key text, OUT value text                                           | normal  

1、测试表

create table test(id1 int, id2 int, info text, c1 int, c2 numeric, c3 float8, c4 int8, crt_time timestamp, primary key(id1,id2));  

2、upsert动态函数

create or replace function gp_upsert(nsp name, tbl name, keys text[], icontent json, out rcnt int) returns int as $$  
declare  
  k text;  
  v text;  
  sql1 text := 'update '||quote_ident(nsp)||'.'||quote_ident(tbl)||' set ';  
  sql2 text := 'insert into '||quote_ident(nsp)||'.'||quote_ident(tbl)||' (';  
  sql3 text := 'values (';  
  sql4 text := ' where ';   
begin  
  rcnt := 0;
  for k,v in select * from json_each_text(icontent)  
  loop  
    if (not array[k] && keys) then  
      sql1 := sql1||quote_ident(k)||'='||coalesce(quote_literal(v),'NULL')||',';  
    else
      sql4 := sql4||quote_ident(k)||'='||coalesce(quote_literal(v),'NULL')||' and ';  
    end if;  
  end loop;  
  
  execute rtrim(sql1, ',') || rtrim(sql4, 'and ');  
  GET DIAGNOSTICS rcnt = ROW_COUNT;  
  if rcnt=0 then  

    for k,v in select * from json_each_text(icontent)  
    loop  
      sql2 := sql2||quote_ident(k)||',';  
      sql3 := sql3||coalesce(quote_literal(v),'NULL')||',';  
    end loop;  

    execute rtrim(sql2, ',') || ') ' || rtrim(sql3, ',') || ') ';  
  end if;  
  
  return;  
  
  exception when others then  
    execute rtrim(sql1, ',') || rtrim(sql4, 'and ');  
    return;  
end;  
$$ language plpgsql strict; 
  
-- 返回0 表示insert, 返回1表示update

3、测试使用UPSERT动态函数,将数据upsert到test表

postgres=> truncate test;
TRUNCATE TABLE
postgres=> select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":2, "info":"digoal  ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"}'::json);
 gp_upsert 
-----------
 
(1 row)

postgres=> select ctid,* from test;
 ctid  | id1 | id2 |    info    | c1  | c2  |   c3   | c4 |      crt_time       
-------+-----+-----+------------+-----+-----+--------+----+---------------------
 (0,1) |   1 |   2 | digoal  ab | 123 | 1.1 | 1.9999 |    | 2018-01-01 10:10:10
(1 row)

postgres=> select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":2, "info":"digoal  ab", "c1":123, "c3":2.9999, "crt_time":null}'::json);
 gp_upsert 
-----------
 
(1 row)

postgres=> select ctid,* from test;
 ctid  | id1 | id2 |    info    | c1  | c2  |   c3   | c4 | crt_time 
-------+-----+-----+------------+-----+-----+--------+----+----------
 (0,2) |   1 |   2 | digoal  ab | 123 | 1.1 | 2.9999 |    | 
(1 row)

postgres=> select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":1, "info":"digoal  ab", "c1":123, "c3":2.9999, "crt_time":null}'::json);
 gp_upsert 
-----------
 
(1 row)

postgres=> select ctid,* from test;
 ctid  | id1 | id2 |    info    | c1  | c2  |   c3   | c4 | crt_time 
-------+-----+-----+------------+-----+-----+--------+----+----------
 (0,2) |   1 |   2 | digoal  ab | 123 | 1.1 | 2.9999 |    | 
 (0,3) |   1 |   1 | digoal  ab | 123 |     | 2.9999 |    | 
(2 rows)

压测

直连segment压测

vi test.sql
\set id1 random(1,1000)
\set id2 random(1,10000)
select gp_upsert('public', 'test', array['id1','id2'], '{"id1": :id1, "id2": :id2, :a}'::json);


PGOPTIONS='-c gp_session_role=utility' pgbench -M simple -n -r -P 1 -f ./test.sql -c 96 -j 96 -T 120 -D a='"info":"digoal  ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"'


transaction type: ./test.sql
scaling factor: 1
query mode: simple
number of clients: 96
number of threads: 96
duration: 120 s
number of transactions actually processed: 786825
latency average = 14.637 ms
latency stddev = 8.285 ms
tps = 6555.649259 (including connections establishing)
tps = 6557.772914 (excluding connections establishing)
script statistics:
 - statement latencies in milliseconds:
         0.001  \set id1 random(1,1000)
         0.000  \set id2 random(1,10000)
        14.633  select gp_upsert('public', 'test', array['id1','id2'], '{"id1": :id1, "id2": :id2, :a}'::json);

小结

结合直接读写SEGMENT,如果使用HEAP TABLE,可以实现高效实时的UPSERT。

用户提供四组信息即可:

1、schemaname

2、表名

3、分布键(由于GPDB支持多个字段作为分布键,所以这里使用数组来提供),(如果要直接将请求发送到SEGMENT执行,计算哈希时,请使用对应类型的HASH函数来计算。)

4、K-V组成的JSON或JSONB

如果输入的字符串中包含了特殊字符,可以使用unicode格式输入。

postgres=# select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":2, "info":"digoal d\u0061t\u0061 \u0061 ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"}'::jsonb);
 gp_upsert 
-----------
 
(1 row)

postgres=# select * from test;
 id1 | id2 |       info       | c1  | c2  |   c3   | c4 |      crt_time       
-----+-----+------------------+-----+-----+--------+----+---------------------
   1 |   2 | digoal data a ab | 123 | 1.1 | 1.9999 |    | 2018-01-01 10:10:10
(1 row)

json_lex_string@src/backend/utils/adt/json.c

《PostgreSQL 转义、UNICODE、与SQL注入》

如果要提升性能,可以使用《PostgreSQL Oracle 兼容性之 - DBMS_SQL(存储过程动态SQL中使用绑定变量)》 , 但是请注意,如果每次操作的表的字段不尽相同,需要多次绑定不同的PS,所以可能会导致过多的PS,所以这样的情况下使用PS也不一定是个好方法。

本例只用了单行upsert的方式,下一篇为batch方式,把对单表操作的多次请求封装到一次数据库请求,使用json或jsonb数组提交请求的内容(不同的JSON元素可以包含不同的被操作字段)。

Greenplum, PostgreSQL性能诊断,部署参考

《Greenplum PostgreSQL –enable-profiling 产生gprof性能诊断代码》

《PostgreSQL on Linux 最佳部署手册 - 珍藏级》

《PostgreSQL 源码性能诊断(perf profiling)指南 - 珍藏级》

《PostgreSQL 代码性能诊断之 - OProfile & Systemtap》

参考

《让greenplum的oltp性能飞起来 - 直接读写数据节点》

Flag Counter

digoal’s 大量PostgreSQL文章入口