PostgreSQL 如何实现upsert与新旧数据自动分离

6 minute read

背景

很多业务也行有这样的需求,新的数据会不断的插入,并且可能会有更新。

对于更新的数据,需要记录更新前的记录到历史表。

pic

这个需求有点类似于审计需求,即需要对记录变更前后做审计。

我以前有写过使用hstore和触发器来满足审计需求的文档,有兴趣的同学可以参考

http://blog.163.com/digoal@126/blog/static/163877040201252575529358/

本文的目的并不是审计,而且也可能不期望使用触发器。

还有什么方法呢?

with语法,将插入和更新合并到一条SQL完成。

正文

PostgreSQL 这么高大上,当然有,而且还能在一句SQL里面完成,看法宝。

创建一张当前状态表,一张历史记录表。

postgres=# create table tbl(id int primary key, price int);  
CREATE TABLE  
postgres=# create table tbl_history (id int not null, price int);  
CREATE TABLE  

插入一条不存在的记录,不会触发插入历史表的行为。

注意替代变量

id = $1 = 2  
price = $2 = 7  
  
postgres=# with old as (select * from tbl where id= $1),   
postgres-# new as (insert into tbl values ($1, $2) on conflict (id) do update set price=excluded.price where tbl.price<>excluded.price returning *)   
postgres-# insert into tbl_history select old.* from old,new where old.id=new.id;  
INSERT 0 0  
  
postgres=# select tableoid,ctid,* from tbl union all select tableoid,ctid,* from tbl_history ;  
 tableoid | ctid  | id | price   
----------+-------+----+-------  
    18243 | (0,1) |  2 |     7  
(1 row)  

插入一条不存在的记录,不会触发插入历史表的行为。

id = $1 = 1  
price = $2 = 1  
  
postgres=# with old as (select * from tbl where id= $1),   
new as (insert into tbl values ($1, $2) on conflict (id) do update set price=excluded.price where tbl.price<>excluded.price returning *)   
insert into tbl_history select old.* from old,new where old.id=new.id;  
INSERT 0 0  
postgres=# select tableoid,ctid,* from tbl union all select tableoid,ctid,* from tbl_history ;  
 tableoid | ctid  | id | price   
----------+-------+----+-------  
    18243 | (0,1) |  2 |     7  
    18243 | (0,2) |  1 |     1  
(2 rows)  

插入一条已存在的记录,并且有数据的变更,触发数据插入历史表的行为。

id = $1 = 1  
price = $2 = 2  
  
postgres=# with old as (select * from tbl where id= $1),   
new as (insert into tbl values ($1, $2) on conflict (id) do update set price=excluded.price where tbl.price<>excluded.price returning *)   
insert into tbl_history select old.* from old,new where old.id=new.id;  
INSERT 0 1  
postgres=# select tableoid,ctid,* from tbl union all select tableoid,ctid,* from tbl_history ;  
 tableoid | ctid  | id | price   
----------+-------+----+-------  
    18243 | (0,1) |  2 |     7  
    18243 | (0,3) |  1 |     2  
    18251 | (0,1) |  1 |     1  
(3 rows)  

插入一条已存在的记录,并且已存在的记录值和老值一样,不会触发将数据插入历史表的行为。

id = $1 = 1  
price = $2 = 2  
  
postgres=# with old as (select * from tbl where id= $1),   
new as (insert into tbl values ($1, $2) on conflict (id) do update set price=excluded.price where tbl.price<>excluded.price returning *)   
insert into tbl_history select old.* from old,new where old.id=new.id;  
INSERT 0 0  
postgres=# select tableoid,ctid,* from tbl union all select tableoid,ctid,* from tbl_history ;  
 tableoid | ctid  | id | price   
----------+-------+----+-------  
    18243 | (0,1) |  2 |     7  
    18243 | (0,3) |  1 |     2  
    18251 | (0,1) |  1 |     1  
(3 rows)  

执行计划

postgres=# explain with old as (select * from tbl where id= $1),   
new as (insert into tbl values ($1, $2) on conflict (id) do update set price=excluded.price where tbl.price<>excluded.price returning *)   
insert into tbl_history select old.* from old,new where old.id=new.id;  
                                 QUERY PLAN                                   
----------------------------------------------------------------------------  
 Insert on tbl_history  (cost=2.17..2.23 rows=1 width=8)  
   CTE old  
     ->  Index Scan using tbl_pkey on tbl  (cost=0.14..2.16 rows=1 width=8)  
           Index Cond: (id = 1)  
   CTE new  
     ->  Insert on tbl tbl_1  (cost=0.00..0.01 rows=1 width=8)  
           Conflict Resolution: UPDATE  
           Conflict Arbiter Indexes: tbl_pkey  
           Conflict Filter: (tbl_1.price <> excluded.price)  
           ->  Result  (cost=0.00..0.01 rows=1 width=8)  
   ->  Nested Loop  (cost=0.00..0.05 rows=1 width=8)  
         Join Filter: (old.id = new.id)  
         ->  CTE Scan on old  (cost=0.00..0.02 rows=1 width=8)  
         ->  CTE Scan on new  (cost=0.00..0.02 rows=1 width=4)  
(14 rows)  

在不支持insert on conflict语法的PostgreSQL中(小于9.5的版本),SQL可以调整为:

id = $1 = 1  
price = $2 = 2  
  
with new as (update tbl set price=$2 where id=$1 and price<>$2)   
  insert into tbl select $1, $2 where not exists (select 1 from tbl where id=$1);  

更多upset参考

https://yq.aliyun.com/articles/36103

小于9.5的版本,实现本文的场景,需要这样写。

id = $1 = 1  
price = $2 = 2  
  
with   
old as (select * from tbl where id=$1),  
new_upd as (update tbl set price=$2 where id=$1 and price<>$2 returning *),  
new_ins as (insert into tbl select $1, $2 where not exists (select 1 from tbl where id=$1) returning *)  
insert into tbl_history   
select old.* from old left outer join new_upd on (old.id=new_upd.id) where new_upd.* is not null;  

RULE法

实际上PostgreSQL很早就支持RULE语法,可以在RULE中创建规则,存在则更新,不存在则插入。

但是使用时,务必参考后面的注意事项,使用volatile函数,不要直接使用exists(会当成immutable函数风格处理)。

postgres=# create table d(id int primary key, info text, crt_time timestamp);

postgres=# create rule r1 as on insert to d where (exists (select 1 from d where d.id=NEW.id)) do instead update d set info=NEW.info,crt_time=NEW.crt_time where id=NEW.id;

postgres=# insert into d values (1,'test',now());
INSERT 0 1
postgres=# select * from d;
 id | info |          crt_time          
----+------+----------------------------
  1 | test | 2017-08-10 14:12:20.053353
(1 row)

postgres=# insert into d values (1,'test123',now());
INSERT 0 0
postgres=# select * from d;
 id |  info   |          crt_time          
----+---------+----------------------------
  1 | test123 | 2017-08-10 14:12:26.964074
(1 row)

存在则不插入(忽略),不存在则更新。实现幂等写入(断点续传写入不出问题)。

postgres=# create table d(id int primary key, info text, crt_time timestamp);

postgres=# create rule r1 as on insert to d where (exists (select 1 from d where d.id=NEW.id)) do instead nothing;
CREATE RULE
postgres=# insert into d values (1,'test123',now());
INSERT 0 0
postgres=# insert into d values (1,'test123',now());
INSERT 0 0
postgres=# insert into d values (1,'test123',now());
INSERT 0 0
postgres=# insert into d values (0,'test123',now());
INSERT 0 1

性能压测

vi test.sql

\set id random(1,1000000)
insert into d values (:id, md5(random()::text), now());

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 64 -j 64 -T 1000

progress: 90.0 s, 132056.5 tps, lat 0.220 ms stddev 0.055
progress: 91.0 s, 131656.9 tps, lat 0.220 ms stddev 0.040
progress: 92.0 s, 134941.0 tps, lat 0.215 ms stddev 0.090
progress: 93.0 s, 134324.5 tps, lat 0.216 ms stddev 0.076
progress: 94.0 s, 136699.4 tps, lat 0.212 ms stddev 0.070
progress: 95.0 s, 139291.4 tps, lat 0.208 ms stddev 0.067
progress: 96.0 s, 136073.2 tps, lat 0.213 ms stddev 0.076
progress: 97.0 s, 135804.6 tps, lat 0.214 ms stddev 0.076
progress: 98.0 s, 146037.6 tps, lat 0.199 ms stddev 0.069
progress: 99.0 s, 129619.5 tps, lat 0.224 ms stddev 0.049
progress: 100.0 s, 129230.0 tps, lat 0.224 ms stddev 0.047
progress: 101.0 s, 131048.4 tps, lat 0.221 ms stddev 0.055
progress: 102.0 s, 128808.0 tps, lat 0.225 ms stddev 0.048
progress: 103.0 s, 128954.6 tps, lat 0.225 ms stddev 0.048
progress: 104.0 s, 131227.9 tps, lat 0.221 ms stddev 0.042
progress: 105.0 s, 129604.0 tps, lat 0.224 ms stddev 0.057

rule法使用注意事项:

1、rule 中使用exists,用在这里并不完美,仅仅适合单条insert的语句(使用volatile 函数解决这个问题),否则需要约束来保证唯一性。

postgres=# create table e(id int, info text);
CREATE TABLE
postgres=# create rule r1 as on insert to e where exists (select 1 from e t1 where t1.id=NEW.id limit 1) do instead nothing;
CREATE RULE

在一个语句中插入多条,如果多条中有重复,则在RULE中判断条件时仅判断一次(类似immutable函数)。

postgres=# insert into e values (1,'test'),(1,'test');
INSERT 0 2
postgres=# select * from e;
 id | info
----+------
  1 | test
  1 | test
(2 rows)

解决方法,使用volatile函数
postgres=# drop rule r1 on e;
DROP RULE
CREATE OR REPLACE FUNCTION public.ff(integer)
 RETURNS boolean
 LANGUAGE sql
 STRICT
 volatile
AS $function$
  select true from e where id=$1 limit 1;
$function$;

postgres=# create rule r1 as on insert to e where ff(NEW.id) do instead nothing;
CREATE RULE
postgres=# insert into e values (1,'test'),(1,'test');
INSERT 0 0
postgres=# insert into e values (2,'test'),(2,'test');
INSERT 0 1
postgres=# insert into e values (3,'test'),(3,'test');
INSERT 0 1
postgres=# truncate e;
TRUNCATE TABLE
postgres=# select * from b;
 id | info
----+------
  1 | a
  1 | b
(2 rows)

postgres=# insert into e select * from b;
INSERT 0 1
postgres=# \d+ b
                                     Table "public.b"
 Column |  Type   | Collation | Nullable | Default | Storage  | Stats target | Description
--------+---------+-----------+----------+---------+----------+--------------+-------------
 id     | integer |           |          |         | plain    |              |
 info   | text    |           |          |         | extended |              |

2、rule不支持COPY语句,也就是说COPY如果有重复,一样会导致问题。

不管怎么样,我始终建议需要UPSERT的表,必须有PK。

Flag Counter

digoal’s 大量PostgreSQL文章入口