PostgreSQL 批量、单步 写入 - row, statement 触发器(中间表)、CTE 几种用法性能对比
背景
数据库写入方式包括:
1、单条insert, autocommit
insert into xx values ();
2、批量(单QUERY)
insert into xx values (),(),...();
3、分组提交
begin;
insert into xx values ();
...
insert into xx values ();
end;
INSERT协议包括extended, prepared, simple等。
当需要对每行写入的数据进行逻辑处理时,可以使用以下三种方法:
postgres=# \h create trigger
Command: CREATE TRIGGER
Description: define a new trigger
Syntax:
CREATE [ CONSTRAINT ] TRIGGER name { BEFORE | AFTER | INSTEAD OF } { event [ OR ... ] }
ON table_name
[ FROM referenced_table_name ]
[ NOT DEFERRABLE | [ DEFERRABLE ] [ INITIALLY IMMEDIATE | INITIALLY DEFERRED ] ]
[ REFERENCING { { OLD | NEW } TABLE [ AS ] transition_relation_name } [ ... ] ]
[ FOR [ EACH ] { ROW | STATEMENT } ]
[ WHEN ( condition ) ]
EXECUTE PROCEDURE function_name ( arguments )
where event can be one of:
INSERT
UPDATE [ OF column_name [, ... ] ]
DELETE
TRUNCATE
postgres=# \h create rule
Command: CREATE RULE
Description: define a new rewrite rule
Syntax:
CREATE [ OR REPLACE ] RULE name AS ON event
TO table_name [ WHERE condition ]
DO [ ALSO | INSTEAD ] { NOTHING | command | ( command ; command ... ) }
where event can be one of:
SELECT | INSERT | UPDATE | DELETE
1、行级触发器
2、语句级触发器
https://www.postgresql.org/docs/devel/static/plpgsql-trigger.html
《PostgreSQL 10.0 preview 功能增强 - 触发器函数内置中间表》
3、CTE写法
with tmp as (insert into xxx values xxx returning *),
t1 as (逻辑处理1 from tmp where ...),
...,
tn as (逻辑处理n from tmp where ...)
逻辑处理n+1 from tmp where ...;
下面针对以上三种方法,分别介绍单步、批量插入时的性能差异,以便业务上针对性的做出选择。
项目背景参考
《PostgreSQL 流式处理应用实践 - 二手商品实时归类(异步消息notify/listen、阅后即焚)》
DEMO
1、建表
create table test(id int, info text, c1 int , c2 int, crt_time timestamp);
2、逻辑处理的结果写入下面的表
create table t_result (like test);
无触发器时,批量写入性能
postgres=# insert into test select 1,'test',random()*100,random()*100,now() from generate_series(1,1000000);
INSERT 0 1000000
Time: 1118.051 ms (00:01.118)
行级触发器 - 多个触发器的批量写入性能
1、触发器函数
create or replace function tg1() returns trigger as $$
declare
begin
if NEW.c1<=1 and NEW.c2<=1 then
insert into t_result values (NEW.*);
end if;
return null;
end;
$$ language plpgsql strict;
2、2个触发器的性能
create trigger tg1 after insert on test for each row execute procedure tg1();
create trigger tg2 after insert on test for each row execute procedure tg1();
postgres=# insert into test select 1,'test',random()*100,random()*100,now() from generate_series(1,1000000);
INSERT 0 1000000
Time: 4879.985 ms (00:04.880)
3、4个触发器的性能
create trigger tg3 after insert on test for each row execute procedure tg1();
create trigger tg4 after insert on test for each row execute procedure tg1();
postgres=# insert into test select 1,'test',random()*100,random()*100,now() from generate_series(1,1000000);
INSERT 0 1000000
Time: 8776.416 ms (00:08.776)
4、6个触发器的性能
create trigger tg5 after insert on test for each row execute procedure tg1();
create trigger tg6 after insert on test for each row execute procedure tg1();
postgres=# insert into test select 1,'test',random()*100,random()*100,now() from generate_series(1,1000000);
INSERT 0 1000000
Time: 12648.707 ms (00:12.649)
5、将触发器合并成1个,所有逻辑放到一个触发器中,批量写入性能
drop trigger tg1 ON test;
drop trigger tg2 ON test;
drop trigger tg3 ON test;
drop trigger tg4 ON test;
drop trigger tg5 ON test;
drop trigger tg6 ON test;
create or replace function tg1() returns trigger as $$
declare
begin
if NEW.c1<=1 and NEW.c2<=1 then
insert into t_result values (NEW.*);
end if;
if NEW.c1<=1 and NEW.c2<=1 then
insert into t_result values (NEW.*);
end if;
if NEW.c1<=1 and NEW.c2<=1 then
insert into t_result values (NEW.*);
end if;
if NEW.c1<=1 and NEW.c2<=1 then
insert into t_result values (NEW.*);
end if;
if NEW.c1<=1 and NEW.c2<=1 then
insert into t_result values (NEW.*);
end if;
if NEW.c1<=1 and NEW.c2<=1 then
insert into t_result values (NEW.*);
end if;
return null;
end;
$$ language plpgsql strict;
create trigger tg1 after insert on test for each row execute procedure tg1();
postgres=# insert into test select 1,'test',random()*100,random()*100,now() from generate_series(1,1000000);
INSERT 0 1000000
Time: 5042.071 ms (00:05.042)
语句级触发器 - 1个触发器,批量写入,6个规则的批量写入性能
https://www.postgresql.org/docs/devel/static/plpgsql-trigger.html
1、创建触发器函数,使用中间表,逻辑处理放到中间表中
create or replace function tg1() returns trigger as $$
declare
begin
insert into t_result select * from new_table where c1<=1 and c2<=1;
insert into t_result select * from new_table where c1<=1 and c2<=1;
insert into t_result select * from new_table where c1<=1 and c2<=1;
insert into t_result select * from new_table where c1<=1 and c2<=1;
insert into t_result select * from new_table where c1<=1 and c2<=1;
insert into t_result select * from new_table where c1<=1 and c2<=1;
return null;
end;
$$ language plpgsql strict;
drop trigger tg1 ON test;
create trigger tg1 after insert on test REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure tg1();
2、6个规则的性能
postgres=# insert into test select 1,'test',random()*100,random()*100,now() from generate_series(1,1000000);
INSERT 0 1000000
Time: 1847.532 ms (00:01.848)
CTE - 批量写入,6个规则性能
with new_table as (insert into test select 1,'test',random()*100,random()*100,now() from generate_series(1,1000000) returning *),
t1 as (insert into t_result select * from new_table where c1<=1 and c2<=1),
t2 as (insert into t_result select * from new_table where c1<=1 and c2<=1),
t3 as (insert into t_result select * from new_table where c1<=1 and c2<=1),
t4 as (insert into t_result select * from new_table where c1<=1 and c2<=1),
t5 as (insert into t_result select * from new_table where c1<=1 and c2<=1)
insert into t_result select * from new_table where c1<=1 and c2<=1;
INSERT 0 222
Time: 2833.217 ms (00:02.833)
单条INSERT,“行级、语句级触发器、CTE用法”6个规则,性能对比
1、测试脚本
vi test.sql
insert into test values (1,'test',random()*100,random()*100,now());
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 28 -j 28 -T 120
无触发器
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 28
number of threads: 28
duration: 120 s
number of transactions actually processed: 29783861
latency average = 0.113 ms
latency stddev = 0.329 ms
tps = 248197.480858 (including connections establishing)
tps = 248213.036221 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.113 insert into test values (1,'test',random()*100,random()*100,now());
2、ROW触发器
create or replace function tg1() returns trigger as $$
declare
begin
if NEW.c1<=1 and NEW.c2<=1 then
insert into t_result values (NEW.*);
end if;
if NEW.c1<=1 and NEW.c2<=1 then
insert into t_result values (NEW.*);
end if;
if NEW.c1<=1 and NEW.c2<=1 then
insert into t_result values (NEW.*);
end if;
if NEW.c1<=1 and NEW.c2<=1 then
insert into t_result values (NEW.*);
end if;
if NEW.c1<=1 and NEW.c2<=1 then
insert into t_result values (NEW.*);
end if;
if NEW.c1<=1 and NEW.c2<=1 then
insert into t_result values (NEW.*);
end if;
return null;
end;
$$ language plpgsql strict;
create trigger tg1 after insert on test for each row execute procedure tg1();
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 28
number of threads: 28
duration: 120 s
number of transactions actually processed: 18398837
latency average = 0.183 ms
latency stddev = 0.335 ms
tps = 153322.919798 (including connections establishing)
tps = 153332.364931 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.183 insert into test values (1,'test',random()*100,random()*100,now());
3、STATEMENT触发器
create or replace function tg1() returns trigger as $$
declare
begin
insert into t_result select * from new_table where c1<=1 and c2<=1;
insert into t_result select * from new_table where c1<=1 and c2<=1;
insert into t_result select * from new_table where c1<=1 and c2<=1;
insert into t_result select * from new_table where c1<=1 and c2<=1;
insert into t_result select * from new_table where c1<=1 and c2<=1;
insert into t_result select * from new_table where c1<=1 and c2<=1;
return null;
end;
$$ language plpgsql strict;
drop trigger tg1 ON test;
create trigger tg1 after insert on test REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure tg1();
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 28
number of threads: 28
duration: 120 s
number of transactions actually processed: 16868185
latency average = 0.199 ms
latency stddev = 0.214 ms
tps = 140567.379172 (including connections establishing)
tps = 140576.126770 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.199 insert into test values (1,'test',random()*100,random()*100,now());
性能总结
模式 | 规则个数 | 语句触发器 处理吞吐 | 行触发器 处理吞吐 | 无规则 处理吞吐 |
---|---|---|---|---|
单条写入 | 单触发器6个规则 | 14万行/s | 15.3万行/s | 24.8万行/s |
批量写入100万行 | 单触发器6个规则 | 54.1万行/s | 19.8万行/s | 89.4万行/s |
批量写入100万行 | 2个触发器2个规则 | - | 20.5万行/s | - |
批量写入100万行 | 4个触发器4个规则 | - | 11.4万行/s | - |
批量写入100万行 | 6个触发器6个规则 | - | 7.9万行/s | - |
CTE语法批量写入100万行 | 6个规则 | - | - | 35.3万行/s (含6规则) |
小结
对于需要逻辑处理的数据表,使用语句级触发器,同时使用中间表的模式来进行逻辑处理,性能是比较均衡的。相比行级触发器,性能好很多。
另外,每增加一个触发器,性能会下降比较厉害。建议把处理逻辑放到一个触发器里面,而不要使用多个触发器。
参考
《PostgreSQL 流式处理应用实践 - 二手商品实时归类(异步消息notify/listen、阅后即焚)》