PostgreSQL 批量、单步 写入 - row, statement 触发器(中间表)、CTE 几种用法性能对比

6 minute read

背景

数据库写入方式包括:

1、单条insert, autocommit

insert into xx values ();  

2、批量(单QUERY)

insert into xx values (),(),...();  

3、分组提交

begin;  
insert into xx values ();  
...  
insert into xx values ();  
end;  

INSERT协议包括extended, prepared, simple等。

当需要对每行写入的数据进行逻辑处理时,可以使用以下三种方法:

postgres=# \h create trigger  
Command:     CREATE TRIGGER  
Description: define a new trigger  
Syntax:  
CREATE [ CONSTRAINT ] TRIGGER name { BEFORE | AFTER | INSTEAD OF } { event [ OR ... ] }  
    ON table_name  
    [ FROM referenced_table_name ]  
    [ NOT DEFERRABLE | [ DEFERRABLE ] [ INITIALLY IMMEDIATE | INITIALLY DEFERRED ] ]  
    [ REFERENCING { { OLD | NEW } TABLE [ AS ] transition_relation_name } [ ... ] ]  
    [ FOR [ EACH ] { ROW | STATEMENT } ]  
    [ WHEN ( condition ) ]  
    EXECUTE PROCEDURE function_name ( arguments )  
  
where event can be one of:  
  
    INSERT  
    UPDATE [ OF column_name [, ... ] ]  
    DELETE  
    TRUNCATE  
  
postgres=# \h create rule  
Command:     CREATE RULE  
Description: define a new rewrite rule  
Syntax:  
CREATE [ OR REPLACE ] RULE name AS ON event  
    TO table_name [ WHERE condition ]  
    DO [ ALSO | INSTEAD ] { NOTHING | command | ( command ; command ... ) }  
  
where event can be one of:  
  
    SELECT | INSERT | UPDATE | DELETE  

1、行级触发器

2、语句级触发器

https://www.postgresql.org/docs/devel/static/plpgsql-trigger.html

《PostgreSQL 10.0 preview 功能增强 - 触发器函数内置中间表》

3、CTE写法

with tmp as (insert into xxx values xxx returning *),  
t1 as (逻辑处理1 from tmp where ...),  
...,  
tn as (逻辑处理n from tmp where ...)  
逻辑处理n+1 from tmp where ...;  

下面针对以上三种方法,分别介绍单步、批量插入时的性能差异,以便业务上针对性的做出选择。

项目背景参考

《PostgreSQL 流式处理应用实践 - 二手商品实时归类(异步消息notify/listen、阅后即焚)》

DEMO

1、建表

create table test(id int, info text, c1 int , c2 int, crt_time timestamp);  

2、逻辑处理的结果写入下面的表

create table t_result (like test);  

无触发器时,批量写入性能

postgres=# insert into test select 1,'test',random()*100,random()*100,now() from generate_series(1,1000000);  
INSERT 0 1000000  
Time: 1118.051 ms (00:01.118)  

行级触发器 - 多个触发器的批量写入性能

1、触发器函数

create or replace function tg1() returns trigger as $$  
declare  
begin  
  if NEW.c1<=1 and NEW.c2<=1 then   
    insert into t_result values (NEW.*);  
  end if;  
  return null;  
end;  
$$ language plpgsql strict;  

2、2个触发器的性能

create trigger tg1 after insert on test for each row execute procedure tg1();  
create trigger tg2 after insert on test for each row execute procedure tg1();  
  
  
postgres=# insert into test select 1,'test',random()*100,random()*100,now() from generate_series(1,1000000);  
INSERT 0 1000000  
Time: 4879.985 ms (00:04.880)  

3、4个触发器的性能

create trigger tg3 after insert on test for each row execute procedure tg1();  
create trigger tg4 after insert on test for each row execute procedure tg1();  
  
postgres=# insert into test select 1,'test',random()*100,random()*100,now() from generate_series(1,1000000);  
INSERT 0 1000000  
Time: 8776.416 ms (00:08.776)  

4、6个触发器的性能

create trigger tg5 after insert on test for each row execute procedure tg1();  
create trigger tg6 after insert on test for each row execute procedure tg1();  
  
postgres=# insert into test select 1,'test',random()*100,random()*100,now() from generate_series(1,1000000);  
INSERT 0 1000000  
Time: 12648.707 ms (00:12.649)  

5、将触发器合并成1个,所有逻辑放到一个触发器中,批量写入性能

drop trigger tg1 ON test;  
drop trigger tg2 ON test;  
drop trigger tg3 ON test;  
drop trigger tg4 ON test;  
drop trigger tg5 ON test;  
drop trigger tg6 ON test;  
  
  
create or replace function tg1() returns trigger as $$  
declare  
begin  
  if NEW.c1<=1 and NEW.c2<=1 then   
    insert into t_result values (NEW.*);  
  end if;  
  if NEW.c1<=1 and NEW.c2<=1 then   
    insert into t_result values (NEW.*);  
  end if;  
  if NEW.c1<=1 and NEW.c2<=1 then   
    insert into t_result values (NEW.*);  
  end if;  
  if NEW.c1<=1 and NEW.c2<=1 then   
    insert into t_result values (NEW.*);  
  end if;  
  if NEW.c1<=1 and NEW.c2<=1 then   
    insert into t_result values (NEW.*);  
  end if;  
  if NEW.c1<=1 and NEW.c2<=1 then   
    insert into t_result values (NEW.*);  
  end if;  
  return null;  
end;  
$$ language plpgsql strict;  
  
  
create trigger tg1 after insert on test for each row execute procedure tg1();  
  
  
postgres=# insert into test select 1,'test',random()*100,random()*100,now() from generate_series(1,1000000);  
INSERT 0 1000000  
Time: 5042.071 ms (00:05.042)  

语句级触发器 - 1个触发器,批量写入,6个规则的批量写入性能

https://www.postgresql.org/docs/devel/static/plpgsql-trigger.html

1、创建触发器函数,使用中间表,逻辑处理放到中间表中

create or replace function tg1() returns trigger as $$  
declare  
begin  
  insert into t_result select * from new_table where c1<=1 and c2<=1;  
  insert into t_result select * from new_table where c1<=1 and c2<=1;  
  insert into t_result select * from new_table where c1<=1 and c2<=1;  
  insert into t_result select * from new_table where c1<=1 and c2<=1;  
  insert into t_result select * from new_table where c1<=1 and c2<=1;  
  insert into t_result select * from new_table where c1<=1 and c2<=1;  
  return null;  
end;  
$$ language plpgsql strict;  
  
drop trigger tg1 ON test;  
  
create trigger tg1 after insert on test REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure tg1();  

2、6个规则的性能

postgres=# insert into test select 1,'test',random()*100,random()*100,now() from generate_series(1,1000000);  
INSERT 0 1000000  
Time: 1847.532 ms (00:01.848)  

CTE - 批量写入,6个规则性能

with new_table as (insert into test select 1,'test',random()*100,random()*100,now() from generate_series(1,1000000) returning *),  
t1 as (insert into t_result select * from new_table where c1<=1 and c2<=1),  
t2 as (insert into t_result select * from new_table where c1<=1 and c2<=1),  
t3 as (insert into t_result select * from new_table where c1<=1 and c2<=1),  
t4 as (insert into t_result select * from new_table where c1<=1 and c2<=1),  
t5 as (insert into t_result select * from new_table where c1<=1 and c2<=1)  
insert into t_result select * from new_table where c1<=1 and c2<=1;  
  
INSERT 0 222  
Time: 2833.217 ms (00:02.833)  

单条INSERT,“行级、语句级触发器、CTE用法”6个规则,性能对比

1、测试脚本

vi test.sql  
  
insert into test values (1,'test',random()*100,random()*100,now());  
  
  
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 28 -j 28 -T 120  

无触发器

transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 28  
number of threads: 28  
duration: 120 s  
number of transactions actually processed: 29783861  
latency average = 0.113 ms  
latency stddev = 0.329 ms  
tps = 248197.480858 (including connections establishing)  
tps = 248213.036221 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.113  insert into test values (1,'test',random()*100,random()*100,now());  

2、ROW触发器

create or replace function tg1() returns trigger as $$  
declare  
begin  
  if NEW.c1<=1 and NEW.c2<=1 then   
    insert into t_result values (NEW.*);  
  end if;  
  if NEW.c1<=1 and NEW.c2<=1 then   
    insert into t_result values (NEW.*);  
  end if;  
  if NEW.c1<=1 and NEW.c2<=1 then   
    insert into t_result values (NEW.*);  
  end if;  
  if NEW.c1<=1 and NEW.c2<=1 then   
    insert into t_result values (NEW.*);  
  end if;  
  if NEW.c1<=1 and NEW.c2<=1 then   
    insert into t_result values (NEW.*);  
  end if;  
  if NEW.c1<=1 and NEW.c2<=1 then   
    insert into t_result values (NEW.*);  
  end if;  
  return null;  
end;  
$$ language plpgsql strict;  
  
  
create trigger tg1 after insert on test for each row execute procedure tg1();  
  
  
  
  
transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 28  
number of threads: 28  
duration: 120 s  
number of transactions actually processed: 18398837  
latency average = 0.183 ms  
latency stddev = 0.335 ms  
tps = 153322.919798 (including connections establishing)  
tps = 153332.364931 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.183  insert into test values (1,'test',random()*100,random()*100,now());  

3、STATEMENT触发器

create or replace function tg1() returns trigger as $$  
declare  
begin  
  insert into t_result select * from new_table where c1<=1 and c2<=1;  
  insert into t_result select * from new_table where c1<=1 and c2<=1;  
  insert into t_result select * from new_table where c1<=1 and c2<=1;  
  insert into t_result select * from new_table where c1<=1 and c2<=1;  
  insert into t_result select * from new_table where c1<=1 and c2<=1;  
  insert into t_result select * from new_table where c1<=1 and c2<=1;  
  return null;  
end;  
$$ language plpgsql strict;  
  
drop trigger tg1 ON test;  
  
create trigger tg1 after insert on test REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure tg1();  
  
  
transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 28  
number of threads: 28  
duration: 120 s  
number of transactions actually processed: 16868185  
latency average = 0.199 ms  
latency stddev = 0.214 ms  
tps = 140567.379172 (including connections establishing)  
tps = 140576.126770 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.199  insert into test values (1,'test',random()*100,random()*100,now());  

性能总结

模式 规则个数 语句触发器 处理吞吐 行触发器 处理吞吐 无规则 处理吞吐
单条写入 单触发器6个规则 14万行/s 15.3万行/s 24.8万行/s
批量写入100万行 单触发器6个规则 54.1万行/s 19.8万行/s 89.4万行/s
批量写入100万行 2个触发器2个规则 - 20.5万行/s -
批量写入100万行 4个触发器4个规则 - 11.4万行/s -
批量写入100万行 6个触发器6个规则 - 7.9万行/s -
CTE语法批量写入100万行 6个规则 - - 35.3万行/s (含6规则)

小结

对于需要逻辑处理的数据表,使用语句级触发器,同时使用中间表的模式来进行逻辑处理,性能是比较均衡的。相比行级触发器,性能好很多。

另外,每增加一个触发器,性能会下降比较厉害。建议把处理逻辑放到一个触发器里面,而不要使用多个触发器。

参考

《PostgreSQL 流式处理应用实践 - 二手商品实时归类(异步消息notify/listen、阅后即焚)》

Flag Counter

digoal’s 大量PostgreSQL文章入口