Greenplum & PostgreSQL UPSERT udf 实现 - 1 单行模式
背景
PostgreSQL 9.5 开始支持了upsert的语法,
PostgreSQL 9.5 以前的版本,可以使用CTE语法来代替insert into on conflict(upsert),实现UPSERT。或者使用UDF来实现。
[《HTAP数据库 PostgreSQL 场景与性能测试之 22 - (OLTP) merge insert | upsert | insert on conflict | 合并写入》](../201711/20171107_23.md) |
《PostgreSQL upsert功能(insert on conflict do)的用法》
《PostgreSQL 如何实现upsert与新旧数据自动分离》
《[转载]postgresql 9.5版本之前实现upsert功能》
《PostgreSQL 11 preview - MERGE 语法支持与CTE内支持,兼容SQL:2016 , 兼容 Oracle》
《Greenplum merge insert 用法与性能 (insert on conflict) - 2》
《Greenplum merge insert 用法与性能 (insert on conflict) - 1》
《PostgreSQL merge join 评估成本时可能会查询索引 - 硬解析务必引起注意 - 批量删除数据后, 未释放empty索引页导致mergejoin执行计划变慢 case》
但是Greenplum目前的版本较老,(4.3的版本对应的PG是8.2的版本),所以没有办法使用CTE来实现UPSERT。
为了让greenplum实现UPSERT,可以使用UDF的方法。
本文提供PostgreSQL, Greenplum两种环境下的用法。(此处Greenplum指阿里云HybridDB for PostgreSQL,加了JSON类型的支持)
思路
用户提供几个输入:
1、schemaname, 表名
2、键值名(数组)
3、键值(数组),(业务无关:如果要直接定位到SEGMENT,计算哈希时,请使用对应类型的HASH函数来计算)。
4、K-V组成的JSON或JSONB
在函数中自动拼装SQL,实现UPDATE或INSERT。
PostgreSQL
用户把要输入的数据拼装成JSON或JSONB,在函数中解开,生成动态SQL
pg_catalog | json_each_text | SETOF record | from_json json, OUT key text, OUT value text | normal
pg_catalog | jsonb_each_text | SETOF record | from_json jsonb, OUT key text, OUT value text | normal
1、测试表
create table test(id1 int, id2 int, info text, c1 int, c2 numeric, c3 float8, c4 int8, crt_time timestamp, primary key(id1,id2));
2、upsert动态函数
create or replace function gp_upsert(nsp name, tbl name, keys text[], icontent jsonb) returns void as $$ -- icontent中也必须包含PK的内容。
declare
k text;
v text;
sql1 text := 'update '||quote_ident(nsp)||'.'||quote_ident(tbl)||' set ';
sql2 text := 'insert into '||quote_ident(nsp)||'.'||quote_ident(tbl)||' (';
sql3 text := 'values (';
sql4 text := ' where ';
rcnt int := 0;
begin
for k,v in select * from jsonb_each_text(icontent)
loop
if (not array[k] && keys) then
sql1 := sql1||quote_ident(k)||'='||quote_nullable(v)||',';
else
sql4 := sql4||quote_ident(k)||'='||quote_nullable(v)||' and ';
end if;
end loop;
execute rtrim(sql1, ',') || rtrim(sql4, 'and ');
GET DIAGNOSTICS rcnt = ROW_COUNT;
if rcnt=0 then
for k,v in select * from jsonb_each_text(icontent)
loop
sql2 := sql2||quote_ident(k)||',';
sql3 := sql3||quote_nullable(v)||',';
end loop;
execute rtrim(sql2, ',') || ') ' || rtrim(sql3, ',') || ') ';
end if;
return;
exception when others then
execute rtrim(sql1, ',') || rtrim(sql4, 'and ');
return;
end;
$$ language plpgsql strict;
3、测试使用UPSERT动态函数,将数据upsert到test表
postgres=# truncate test;
TRUNCATE TABLE
postgres=#
postgres=# select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":2, "info":"digoal ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"}'::jsonb);
gp_upsert
-----------
(1 row)
postgres=# select ctid,* from test;
ctid | id1 | id2 | info | c1 | c2 | c3 | c4 | crt_time
-------+-----+-----+------------+-----+-----+--------+----+---------------------
(0,1) | 1 | 2 | digoal ab | 123 | 1.1 | 1.9999 | | 2018-01-01 10:10:10
(1 row)
postgres=# select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":2, "info":"digoal ab", "c1":123, "c3":2.9999, "crt_time":null}'::jsonb);
gp_upsert
-----------
(1 row)
postgres=# select ctid,* from test;
ctid | id1 | id2 | info | c1 | c2 | c3 | c4 | crt_time
-------+-----+-----+------------+-----+-----+--------+----+----------
(0,2) | 1 | 2 | digoal ab | 123 | 1.1 | 2.9999 | |
(1 row)
postgres=# select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":1, "info":"digoal ab", "c1":123, "c3":2.9999, "crt_time":null}'::jsonb);
gp_upsert
-----------
(1 row)
postgres=# select ctid,* from test;
ctid | id1 | id2 | info | c1 | c2 | c3 | c4 | crt_time
-------+-----+-----+------------+-----+-----+--------+----+----------
(0,2) | 1 | 2 | digoal ab | 123 | 1.1 | 2.9999 | |
(0,3) | 1 | 1 | digoal ab | 123 | | 2.9999 | |
(2 rows)
压测
1、脚本
vi test.sql
\set id1 random(1,1000)
\set id2 random(1,10000)
select gp_upsert('public', 'test', array['id1','id2'], '{"id1": :id1, "id2": :id2, :a}'::jsonb);
2、压测
pgbench -M simple -n -r -P 1 -f ./test.sql -c 96 -j 96 -T 120 -D a='"info":"digoal ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"'
transaction type: ./test.sql
scaling factor: 1
query mode: simple
number of clients: 96
number of threads: 96
duration: 120 s
number of transactions actually processed: 5274457
latency average = 2.184 ms
latency stddev = 3.000 ms
tps = 43946.985362 (including connections establishing)
tps = 43953.237428 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.006 \set id1 random(1,1000)
0.004 \set id2 random(1,10000)
2.176 select gp_upsert('public', 'test', array['id1','id2'], '{"id1": :id1, "id2": :id2, :a}'::jsonb);
3、统计信息
postgres=# select * from pg_stat_all_tables where relname ='test';
-[ RECORD 1 ]-------+------------------------------
relid | 219890
schemaname | public
relname | test
seq_scan | 25
seq_tup_read | 6201505
idx_scan | 11490223
idx_tup_fetch | 937792
n_tup_ins | 10552430
n_tup_upd | 937792
n_tup_del | 5
n_tup_hot_upd | 805144
n_live_tup | 10512335
n_dead_tup | 221195
n_mod_since_analyze | 697128
last_vacuum |
last_autovacuum | 2018-06-05 09:56:02.173636+08
last_analyze |
last_autoanalyze | 2018-06-05 10:03:03.646098+08
vacuum_count | 0
autovacuum_count | 1
analyze_count | 0
autoanalyze_count | 6
4、perf
Samples: 12M of event 'cpu-clock', Event count (approx.): 773892929467
Overhead Shared Object Symbol
31.18% libc-2.17.so [.] __mcount_internal
9.63% libc-2.17.so [.] _mcount
8.25% [kernel] [k] __do_softirq
2.91% postgres [.] GetSnapshotData
1.78% postgres [.] AllocSetAlloc
1.56% [kernel] [k] _raw_spin_unlock_irqrestore
1.52% postgres [.] LWLockAttemptLock
1.34% postgres [.] base_yyparse
1.34% postgres [.] SearchCatCache
1.23% libc-2.17.so [.] __memcpy_ssse3_back
1.06% postgres [.] LWLockRelease
0.96% postgres [.] hash_search_with_hash_value
Greenplum
用户把要输入的数据拼装成JSON或JSONB,在函数中解开,生成动态SQL
pg_catalog | json_each_text | SETOF record | from_json json, OUT key text, OUT value text | normal
1、测试表
create table test(id1 int, id2 int, info text, c1 int, c2 numeric, c3 float8, c4 int8, crt_time timestamp, primary key(id1,id2));
2、upsert动态函数
create or replace function gp_upsert(nsp name, tbl name, keys text[], icontent json, out rcnt int) returns int as $$
declare
k text;
v text;
sql1 text := 'update '||quote_ident(nsp)||'.'||quote_ident(tbl)||' set ';
sql2 text := 'insert into '||quote_ident(nsp)||'.'||quote_ident(tbl)||' (';
sql3 text := 'values (';
sql4 text := ' where ';
begin
rcnt := 0;
for k,v in select * from json_each_text(icontent)
loop
if (not array[k] && keys) then
sql1 := sql1||quote_ident(k)||'='||coalesce(quote_literal(v),'NULL')||',';
else
sql4 := sql4||quote_ident(k)||'='||coalesce(quote_literal(v),'NULL')||' and ';
end if;
end loop;
execute rtrim(sql1, ',') || rtrim(sql4, 'and ');
GET DIAGNOSTICS rcnt = ROW_COUNT;
if rcnt=0 then
for k,v in select * from json_each_text(icontent)
loop
sql2 := sql2||quote_ident(k)||',';
sql3 := sql3||coalesce(quote_literal(v),'NULL')||',';
end loop;
execute rtrim(sql2, ',') || ') ' || rtrim(sql3, ',') || ') ';
end if;
return;
exception when others then
execute rtrim(sql1, ',') || rtrim(sql4, 'and ');
return;
end;
$$ language plpgsql strict;
-- 返回0 表示insert, 返回1表示update
3、测试使用UPSERT动态函数,将数据upsert到test表
postgres=> truncate test;
TRUNCATE TABLE
postgres=> select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":2, "info":"digoal ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"}'::json);
gp_upsert
-----------
(1 row)
postgres=> select ctid,* from test;
ctid | id1 | id2 | info | c1 | c2 | c3 | c4 | crt_time
-------+-----+-----+------------+-----+-----+--------+----+---------------------
(0,1) | 1 | 2 | digoal ab | 123 | 1.1 | 1.9999 | | 2018-01-01 10:10:10
(1 row)
postgres=> select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":2, "info":"digoal ab", "c1":123, "c3":2.9999, "crt_time":null}'::json);
gp_upsert
-----------
(1 row)
postgres=> select ctid,* from test;
ctid | id1 | id2 | info | c1 | c2 | c3 | c4 | crt_time
-------+-----+-----+------------+-----+-----+--------+----+----------
(0,2) | 1 | 2 | digoal ab | 123 | 1.1 | 2.9999 | |
(1 row)
postgres=> select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":1, "info":"digoal ab", "c1":123, "c3":2.9999, "crt_time":null}'::json);
gp_upsert
-----------
(1 row)
postgres=> select ctid,* from test;
ctid | id1 | id2 | info | c1 | c2 | c3 | c4 | crt_time
-------+-----+-----+------------+-----+-----+--------+----+----------
(0,2) | 1 | 2 | digoal ab | 123 | 1.1 | 2.9999 | |
(0,3) | 1 | 1 | digoal ab | 123 | | 2.9999 | |
(2 rows)
压测
直连segment压测
vi test.sql
\set id1 random(1,1000)
\set id2 random(1,10000)
select gp_upsert('public', 'test', array['id1','id2'], '{"id1": :id1, "id2": :id2, :a}'::json);
PGOPTIONS='-c gp_session_role=utility' pgbench -M simple -n -r -P 1 -f ./test.sql -c 96 -j 96 -T 120 -D a='"info":"digoal ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"'
transaction type: ./test.sql
scaling factor: 1
query mode: simple
number of clients: 96
number of threads: 96
duration: 120 s
number of transactions actually processed: 786825
latency average = 14.637 ms
latency stddev = 8.285 ms
tps = 6555.649259 (including connections establishing)
tps = 6557.772914 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.001 \set id1 random(1,1000)
0.000 \set id2 random(1,10000)
14.633 select gp_upsert('public', 'test', array['id1','id2'], '{"id1": :id1, "id2": :id2, :a}'::json);
小结
结合直接读写SEGMENT,如果使用HEAP TABLE,可以实现高效实时的UPSERT。
用户提供四组信息即可:
1、schemaname
2、表名
3、分布键(由于GPDB支持多个字段作为分布键,所以这里使用数组来提供),(如果要直接将请求发送到SEGMENT执行,计算哈希时,请使用对应类型的HASH函数来计算。)
4、K-V组成的JSON或JSONB
如果输入的字符串中包含了特殊字符,可以使用unicode格式输入。
postgres=# select gp_upsert('public', 'test', array['id1','id2'], '{"id1":1, "id2":2, "info":"digoal d\u0061t\u0061 \u0061 ab", "c1":123, "c2":1.1, "c3":1.9999, "crt_time":"2018-01-01 10:10:10"}'::jsonb);
gp_upsert
-----------
(1 row)
postgres=# select * from test;
id1 | id2 | info | c1 | c2 | c3 | c4 | crt_time
-----+-----+------------------+-----+-----+--------+----+---------------------
1 | 2 | digoal data a ab | 123 | 1.1 | 1.9999 | | 2018-01-01 10:10:10
(1 row)
json_lex_string@src/backend/utils/adt/json.c
《PostgreSQL 转义、UNICODE、与SQL注入》
如果要提升性能,可以使用《PostgreSQL Oracle 兼容性之 - DBMS_SQL(存储过程动态SQL中使用绑定变量)》 , 但是请注意,如果每次操作的表的字段不尽相同,需要多次绑定不同的PS,所以可能会导致过多的PS,所以这样的情况下使用PS也不一定是个好方法。
本例只用了单行upsert的方式,下一篇为batch方式,把对单表操作的多次请求封装到一次数据库请求,使用json或jsonb数组提交请求的内容(不同的JSON元素可以包含不同的被操作字段)。
Greenplum, PostgreSQL性能诊断,部署参考
《Greenplum PostgreSQL –enable-profiling 产生gprof性能诊断代码》
《PostgreSQL on Linux 最佳部署手册 - 珍藏级》
《PostgreSQL 源码性能诊断(perf profiling)指南 - 珍藏级》
《PostgreSQL 代码性能诊断之 - OProfile & Systemtap》
参考
《让greenplum的oltp性能飞起来 - 直接读写数据节点》