PostgreSQL 如何实现upsert与新旧数据自动分离
背景
很多业务也行有这样的需求,新的数据会不断的插入,并且可能会有更新。
对于更新的数据,需要记录更新前的记录到历史表。
这个需求有点类似于审计需求,即需要对记录变更前后做审计。
我以前有写过使用hstore和触发器来满足审计需求的文档,有兴趣的同学可以参考
http://blog.163.com/digoal@126/blog/static/163877040201252575529358/
本文的目的并不是审计,而且也可能不期望使用触发器。
还有什么方法呢?
with语法,将插入和更新合并到一条SQL完成。
正文
PostgreSQL 这么高大上,当然有,而且还能在一句SQL里面完成,看法宝。
创建一张当前状态表,一张历史记录表。
postgres=# create table tbl(id int primary key, price int);
CREATE TABLE
postgres=# create table tbl_history (id int not null, price int);
CREATE TABLE
插入一条不存在的记录,不会触发插入历史表的行为。
注意替代变量
id = $1 = 2
price = $2 = 7
postgres=# with old as (select * from tbl where id= $1),
postgres-# new as (insert into tbl values ($1, $2) on conflict (id) do update set price=excluded.price where tbl.price<>excluded.price returning *)
postgres-# insert into tbl_history select old.* from old,new where old.id=new.id;
INSERT 0 0
postgres=# select tableoid,ctid,* from tbl union all select tableoid,ctid,* from tbl_history ;
tableoid | ctid | id | price
----------+-------+----+-------
18243 | (0,1) | 2 | 7
(1 row)
插入一条不存在的记录,不会触发插入历史表的行为。
id = $1 = 1
price = $2 = 1
postgres=# with old as (select * from tbl where id= $1),
new as (insert into tbl values ($1, $2) on conflict (id) do update set price=excluded.price where tbl.price<>excluded.price returning *)
insert into tbl_history select old.* from old,new where old.id=new.id;
INSERT 0 0
postgres=# select tableoid,ctid,* from tbl union all select tableoid,ctid,* from tbl_history ;
tableoid | ctid | id | price
----------+-------+----+-------
18243 | (0,1) | 2 | 7
18243 | (0,2) | 1 | 1
(2 rows)
插入一条已存在的记录,并且有数据的变更,触发数据插入历史表的行为。
id = $1 = 1
price = $2 = 2
postgres=# with old as (select * from tbl where id= $1),
new as (insert into tbl values ($1, $2) on conflict (id) do update set price=excluded.price where tbl.price<>excluded.price returning *)
insert into tbl_history select old.* from old,new where old.id=new.id;
INSERT 0 1
postgres=# select tableoid,ctid,* from tbl union all select tableoid,ctid,* from tbl_history ;
tableoid | ctid | id | price
----------+-------+----+-------
18243 | (0,1) | 2 | 7
18243 | (0,3) | 1 | 2
18251 | (0,1) | 1 | 1
(3 rows)
插入一条已存在的记录,并且已存在的记录值和老值一样,不会触发将数据插入历史表的行为。
id = $1 = 1
price = $2 = 2
postgres=# with old as (select * from tbl where id= $1),
new as (insert into tbl values ($1, $2) on conflict (id) do update set price=excluded.price where tbl.price<>excluded.price returning *)
insert into tbl_history select old.* from old,new where old.id=new.id;
INSERT 0 0
postgres=# select tableoid,ctid,* from tbl union all select tableoid,ctid,* from tbl_history ;
tableoid | ctid | id | price
----------+-------+----+-------
18243 | (0,1) | 2 | 7
18243 | (0,3) | 1 | 2
18251 | (0,1) | 1 | 1
(3 rows)
执行计划
postgres=# explain with old as (select * from tbl where id= $1),
new as (insert into tbl values ($1, $2) on conflict (id) do update set price=excluded.price where tbl.price<>excluded.price returning *)
insert into tbl_history select old.* from old,new where old.id=new.id;
QUERY PLAN
----------------------------------------------------------------------------
Insert on tbl_history (cost=2.17..2.23 rows=1 width=8)
CTE old
-> Index Scan using tbl_pkey on tbl (cost=0.14..2.16 rows=1 width=8)
Index Cond: (id = 1)
CTE new
-> Insert on tbl tbl_1 (cost=0.00..0.01 rows=1 width=8)
Conflict Resolution: UPDATE
Conflict Arbiter Indexes: tbl_pkey
Conflict Filter: (tbl_1.price <> excluded.price)
-> Result (cost=0.00..0.01 rows=1 width=8)
-> Nested Loop (cost=0.00..0.05 rows=1 width=8)
Join Filter: (old.id = new.id)
-> CTE Scan on old (cost=0.00..0.02 rows=1 width=8)
-> CTE Scan on new (cost=0.00..0.02 rows=1 width=4)
(14 rows)
在不支持insert on conflict语法的PostgreSQL中(小于9.5的版本),SQL可以调整为:
id = $1 = 1
price = $2 = 2
with new as (update tbl set price=$2 where id=$1 and price<>$2)
insert into tbl select $1, $2 where not exists (select 1 from tbl where id=$1);
更多upset参考
https://yq.aliyun.com/articles/36103
小于9.5的版本,实现本文的场景,需要这样写。
id = $1 = 1
price = $2 = 2
with
old as (select * from tbl where id=$1),
new_upd as (update tbl set price=$2 where id=$1 and price<>$2 returning *),
new_ins as (insert into tbl select $1, $2 where not exists (select 1 from tbl where id=$1) returning *)
insert into tbl_history
select old.* from old left outer join new_upd on (old.id=new_upd.id) where new_upd.* is not null;
RULE法
实际上PostgreSQL很早就支持RULE语法,可以在RULE中创建规则,存在则更新,不存在则插入。
但是使用时,务必参考后面的注意事项,使用volatile函数,不要直接使用exists(会当成immutable函数风格处理)。
postgres=# create table d(id int primary key, info text, crt_time timestamp);
postgres=# create rule r1 as on insert to d where (exists (select 1 from d where d.id=NEW.id)) do instead update d set info=NEW.info,crt_time=NEW.crt_time where id=NEW.id;
postgres=# insert into d values (1,'test',now());
INSERT 0 1
postgres=# select * from d;
id | info | crt_time
----+------+----------------------------
1 | test | 2017-08-10 14:12:20.053353
(1 row)
postgres=# insert into d values (1,'test123',now());
INSERT 0 0
postgres=# select * from d;
id | info | crt_time
----+---------+----------------------------
1 | test123 | 2017-08-10 14:12:26.964074
(1 row)
存在则不插入(忽略),不存在则更新。实现幂等写入(断点续传写入不出问题)。
postgres=# create table d(id int primary key, info text, crt_time timestamp);
postgres=# create rule r1 as on insert to d where (exists (select 1 from d where d.id=NEW.id)) do instead nothing;
CREATE RULE
postgres=# insert into d values (1,'test123',now());
INSERT 0 0
postgres=# insert into d values (1,'test123',now());
INSERT 0 0
postgres=# insert into d values (1,'test123',now());
INSERT 0 0
postgres=# insert into d values (0,'test123',now());
INSERT 0 1
性能压测
vi test.sql
\set id random(1,1000000)
insert into d values (:id, md5(random()::text), now());
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 64 -j 64 -T 1000
progress: 90.0 s, 132056.5 tps, lat 0.220 ms stddev 0.055
progress: 91.0 s, 131656.9 tps, lat 0.220 ms stddev 0.040
progress: 92.0 s, 134941.0 tps, lat 0.215 ms stddev 0.090
progress: 93.0 s, 134324.5 tps, lat 0.216 ms stddev 0.076
progress: 94.0 s, 136699.4 tps, lat 0.212 ms stddev 0.070
progress: 95.0 s, 139291.4 tps, lat 0.208 ms stddev 0.067
progress: 96.0 s, 136073.2 tps, lat 0.213 ms stddev 0.076
progress: 97.0 s, 135804.6 tps, lat 0.214 ms stddev 0.076
progress: 98.0 s, 146037.6 tps, lat 0.199 ms stddev 0.069
progress: 99.0 s, 129619.5 tps, lat 0.224 ms stddev 0.049
progress: 100.0 s, 129230.0 tps, lat 0.224 ms stddev 0.047
progress: 101.0 s, 131048.4 tps, lat 0.221 ms stddev 0.055
progress: 102.0 s, 128808.0 tps, lat 0.225 ms stddev 0.048
progress: 103.0 s, 128954.6 tps, lat 0.225 ms stddev 0.048
progress: 104.0 s, 131227.9 tps, lat 0.221 ms stddev 0.042
progress: 105.0 s, 129604.0 tps, lat 0.224 ms stddev 0.057
rule法使用注意事项:
1、rule 中使用exists,用在这里并不完美,仅仅适合单条insert的语句(使用volatile 函数解决这个问题),否则需要约束来保证唯一性。
postgres=# create table e(id int, info text);
CREATE TABLE
postgres=# create rule r1 as on insert to e where exists (select 1 from e t1 where t1.id=NEW.id limit 1) do instead nothing;
CREATE RULE
在一个语句中插入多条,如果多条中有重复,则在RULE中判断条件时仅判断一次(类似immutable函数)。
postgres=# insert into e values (1,'test'),(1,'test');
INSERT 0 2
postgres=# select * from e;
id | info
----+------
1 | test
1 | test
(2 rows)
解决方法,使用volatile函数
postgres=# drop rule r1 on e;
DROP RULE
CREATE OR REPLACE FUNCTION public.ff(integer)
RETURNS boolean
LANGUAGE sql
STRICT
volatile
AS $function$
select true from e where id=$1 limit 1;
$function$;
postgres=# create rule r1 as on insert to e where ff(NEW.id) do instead nothing;
CREATE RULE
postgres=# insert into e values (1,'test'),(1,'test');
INSERT 0 0
postgres=# insert into e values (2,'test'),(2,'test');
INSERT 0 1
postgres=# insert into e values (3,'test'),(3,'test');
INSERT 0 1
postgres=# truncate e;
TRUNCATE TABLE
postgres=# select * from b;
id | info
----+------
1 | a
1 | b
(2 rows)
postgres=# insert into e select * from b;
INSERT 0 1
postgres=# \d+ b
Table "public.b"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
--------+---------+-----------+----------+---------+----------+--------------+-------------
id | integer | | | | plain | |
info | text | | | | extended | |
2、rule不支持COPY语句,也就是说COPY如果有重复,一样会导致问题。
不管怎么样,我始终建议需要UPSERT的表,必须有PK。