PostgreSQL upsert功能(insert on conflict do)的用法

6 minute read

背景

PostgreSQL 9.5 引入了一项新功能,UPSERT(insert on conflict do),当插入遇到约束错误时,直接返回,或者改为执行UPDATE。

语法如下

Command:     INSERT  
Description: create new rows in a table  
Syntax:  
[ WITH [ RECURSIVE ] with_query [, ...] ]  
INSERT INTO table_name [ AS alias ] [ ( column_name [, ...] ) ]  
    { DEFAULT VALUES | VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }  
    [ ON CONFLICT [ conflict_target ] conflict_action ]  
    [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]  
  
where conflict_target can be one of:  
  
    ( { index_column_name | ( index_expression ) } [ COLLATE collation ] [ opclass ] [, ...] ) [ WHERE index_predicate ]  
    ON CONSTRAINT constraint_name  
  
and conflict_action is one of:  
  
    DO NOTHING  
    DO UPDATE SET { column_name = { expression | DEFAULT } |  
                    ( column_name [, ...] ) = ( { expression | DEFAULT } [, ...] ) |  
                    ( column_name [, ...] ) = ( sub-SELECT )  
                  } [, ...]  
              [ WHERE condition ]  

PostgreSQL 9.5以前的版本,可以通过函数,或者with语法来实现UPSERT类似的功能。

9.5+ UPSERT用法举例

创建一张测试表,其中一个字段为唯一键或者主键。

create table test(id int primary key, info text, crt_time timestamp);  

1. 不存在则插入,存在则更新

test03=# insert into test values (1,'test',now()) on conflict (id) do update set info=excluded.info,crt_time=excluded.crt_time;  
INSERT 0 1  
  
test03=# select * from test;  
 id | info |          crt_time            
----+------+----------------------------  
  1 | test | 2017-04-24 15:27:25.393948  
(1 row)  
  
test03=# insert into test values (1,'hello digoal',now()) on conflict (id) do update set info=excluded.info,crt_time=excluded.crt_time;  
INSERT 0 1  
  
test03=# select * from test;  
 id |     info     |          crt_time            
----+--------------+----------------------------  
  1 | hello digoal | 2017-04-24 15:27:39.140877  
(1 row)  

2. 不存在则插入,存在则直接返回(不做任何处理)

test03=# insert into test values (1,'hello digoal',now()) on conflict (id) do nothing;  
INSERT 0 0  
test03=# insert into test values (1,'pu',now()) on conflict (id) do nothing;  
INSERT 0 0  
test03=# insert into test values (2,'pu',now()) on conflict (id) do nothing;  
INSERT 0 1  
test03=# select * from test;  
 id |     info     |          crt_time            
----+--------------+----------------------------  
  1 | hello digoal | 2017-04-24 15:27:39.140877  
  2 | pu           | 2017-04-24 15:28:20.37392  
(2 rows)  

9.5- UPSERT用法举例

用户可以根据实际需求,使用不同的方法

1. 函数

test03=# create or replace function f_upsert(int,text,timestamp) returns void as $$  
declare  
  res int;  
begin  
  update test set info=$2,crt_time=$3 where id=$1;  
  if not found then  
    insert into test (id,info,crt_time) values ($1,$2,$3);  
  end if;  
  exception when others then  
    return;  
end;  
$$ language plpgsql strict;  
CREATE FUNCTION  
  
test03=# select f_upsert(1,'digoal',now()::timestamp);  
 f_upsert   
----------  
   
(1 row)  
  
test03=# select * from test;  
 id |  info  |          crt_time            
----+--------+----------------------------  
  2 | pu     | 2017-04-24 15:28:20.37392  
  1 | digoal | 2017-04-24 15:31:29.254325  
(2 rows)  
  
test03=# select f_upsert(1,'digoal001',now()::timestamp);  
 f_upsert   
----------  
   
(1 row)  
  
test03=# select * from test;  
 id |   info    |         crt_time            
----+-----------+---------------------------  
  2 | pu        | 2017-04-24 15:28:20.37392  
  1 | digoal001 | 2017-04-24 15:31:38.0529  
(2 rows)  
  
test03=# select f_upsert(3,'hello',now()::timestamp);  
 f_upsert   
----------  
   
(1 row)  
  
test03=# select * from test;  
 id |   info    |         crt_time            
----+-----------+---------------------------  
  2 | pu        | 2017-04-24 15:28:20.37392  
  1 | digoal001 | 2017-04-24 15:31:38.0529  
  3 | hello     | 2017-04-24 15:31:49.14291  
(3 rows)  

使用函数对数据进行upsert,如果要实现批量写入,可以创建variadic参数来实现。

《PostgreSQL plpgsql variadic argments , parameters - 可变参数个数》

create table batch (id int primary key, info text, crt_time timestamp);

create or replace function merge_batch(VARIADIC i batch[])
returns void as $$
declare var batch;
begin
  foreach var in array i loop
    update batch set info=var.info,crt_time=var.crt_time where id=var.id;
    if not found then
      insert into batch values (var.*);
    end if;
  end loop;
exception when others then
  return; 
end;
$$ language plpgsql strict;

select merge_batch((1,'abc','2000-01-01'),(1,'abcd','2017-02-02'));

2. WITH语法,用法1

create table test(id int primary key, info text, crt_time timestamp);  

存在则更新,不存在则插入。

with upsert as (update test set info=$info,crt_time=$crt_time where id=$id returning *) insert into test select $id,$info,$crt_time where not exists (select 1 from upsert where id=$id);    

替换变量,进行测试

with upsert as (update test set info='test',crt_time=now() where id=1 returning *) insert into test select 1,'test',now() where not exists (select 1 from upsert where id=1);    

同时插入一条不存在的值,只有一个会话成功,另一个会话会报PK约束错误。

3. WITH语法,用法2

即使表没有PK或者唯一约束,也能保证并发。

create table test(id int, info text, crt_time timestamp);  

3.1 对于记录不存在,可以保证只有一个session插入数据,对于同一条数据更新,先来的session会lock着记录,后来的session会wait。

with     
  w1 as(select ('x'||substr(md5('$id'),1,16))::bit(64)::bigint as tra_id),    
  upsert as (update test set info=$info,crt_time=$crt_time where id=$id returning *)    
  insert into test select $id, $info, $crt_time from w1     
    where pg_try_advisory_xact_lock(tra_id) and not exists (select 1 from upsert where id=$id);    

替换变量,进行测试

with     
  w1 as(select ('x'||substr(md5('1'),1,16))::bit(64)::bigint as tra_id),    
  upsert as (update test set info='digoal0123',crt_time=now() where id=1 returning *)    
  insert into test select 1, 'digoal0123', now() from w1     
    where pg_try_advisory_xact_lock(tra_id) and not exists (select 1 from upsert where id=1);    
  
INSERT 0 0  
  
test03=# select * from test;  
 id |    info    |         crt_time            
----+------------+---------------------------  
  2 | pu         | 2017-04-24 15:28:20.37392  
  3 | hello      | 2017-04-24 15:31:49.14291  
  1 | digoal0123 | 2017-04-24 15:31:38.0529  
(3 rows)  
  
with     
  w1 as(select ('x'||substr(md5('4'),1,16))::bit(64)::bigint as tra_id),    
  upsert as (update test set info='digoal0123',crt_time=now() where id=4 returning *)    
  insert into test select 4, 'digoal0123', now() from w1     
    where pg_try_advisory_xact_lock(tra_id) and not exists (select 1 from upsert where id=4);    
  
INSERT 0 1  
  
test03=# select * from test;  
 id |    info    |          crt_time            
----+------------+----------------------------  
  2 | pu         | 2017-04-24 15:28:20.37392  
  3 | hello      | 2017-04-24 15:31:49.14291  
  1 | digoal0123 | 2017-04-24 15:31:38.0529  
  4 | digoal0123 | 2017-04-24 15:38:39.801908  
(4 rows)  

3.2 对于记录不存在,可以保证只有一个session插入数据,对于同一条数据更新,先来的session会更新数据,后来的session不等待,直接失败。

with w1 as(select ('x'||substr(md5('$id'),1,16))::bit(64)::bigint as tra_id),    
  upsert as (update test set info=$info,crt_time=$crt_time from w1 where pg_try_advisory_xact_lock(tra_id) and id=$id returning *)    
  insert into test select $id,$info,$crt_time from w1   
    where pg_try_advisory_xact_lock(tra_id) and not exists (select 1 from upsert where id=$id);     

替换变量,进行测试

with w1 as(select ('x'||substr(md5('1'),1,16))::bit(64)::bigint as tra_id),    
  upsert as (update test set info='test',crt_time=now() from w1 where pg_try_advisory_xact_lock(tra_id) and id=1 returning *)    
  insert into test select 1,'test',now() from w1   
    where pg_try_advisory_xact_lock(tra_id) and not exists (select 1 from upsert where id=1);    
  
INSERT 0 0  
  
test03=# select * from test;  
 id |    info    |          crt_time            
----+------------+----------------------------  
  2 | pu         | 2017-04-24 15:28:20.37392  
  3 | hello      | 2017-04-24 15:31:49.14291  
  4 | digoal0123 | 2017-04-24 15:42:50.912887  
  1 | test       | 2017-04-24 15:44:44.245167  
(4 rows)  

4、RULE法

实际上PostgreSQL很早就支持RULE语法,可以在RULE中创建规则,存在则更新,不存在则插入。

务必使用volatile函数,不要直接使用exists(会当成immutable函数风格处理)。

存在则不插入(忽略),不存在则更新。

实现幂等写入(断点续传写入不出问题)。

postgres=# create table e(id int primary key, info text);
CREATE TABLE

创建volatile函数

CREATE OR REPLACE FUNCTION public.ff(integer)
 RETURNS boolean
 LANGUAGE sql
 STRICT
 volatile
AS $function$
  select true from e where id=$1 limit 1;
$function$;

创建规则

postgres=# create rule r1 as on insert to e where ff(NEW.id) do instead update e set info=NEW.info where id=NEW.id;
CREATE RULE

postgres=# insert into e values (1,'test'),(1,'test');
INSERT 0 0
postgres=# insert into e values (2,'test'),(2,'test');
INSERT 0 1
postgres=# insert into e values (3,'test'),(3,'test');
INSERT 0 1
postgres=# truncate e;
TRUNCATE TABLE
postgres=# select * from b;
 id | info 
----+------
  1 | a
  1 | b
(2 rows)

postgres=# insert into e select * from b;
INSERT 0 1
postgres=# \d+ b
                                     Table "public.b"
 Column |  Type   | Collation | Nullable | Default | Storage  | Stats target | Description 
--------+---------+-----------+----------+---------+----------+--------------+-------------
 id     | integer |           |          |         | plain    |              | 
 info   | text    |           |          |         | extended |              | 

性能压测

vi test.sql

\set id random(1,1000000)
insert into e values (:id, md5(random()::text));

pgbench -M prepared -n -r -P 1 -f ./test.sql -c 64 -j 64 -T 1000

progress: 90.0 s, 132056.5 tps, lat 0.220 ms stddev 0.055
progress: 91.0 s, 131656.9 tps, lat 0.220 ms stddev 0.040
progress: 92.0 s, 134941.0 tps, lat 0.215 ms stddev 0.090
progress: 93.0 s, 134324.5 tps, lat 0.216 ms stddev 0.076
progress: 94.0 s, 136699.4 tps, lat 0.212 ms stddev 0.070
progress: 95.0 s, 139291.4 tps, lat 0.208 ms stddev 0.067
progress: 96.0 s, 136073.2 tps, lat 0.213 ms stddev 0.076
progress: 97.0 s, 135804.6 tps, lat 0.214 ms stddev 0.076
progress: 98.0 s, 146037.6 tps, lat 0.199 ms stddev 0.069
progress: 99.0 s, 129619.5 tps, lat 0.224 ms stddev 0.049
progress: 100.0 s, 129230.0 tps, lat 0.224 ms stddev 0.047
progress: 101.0 s, 131048.4 tps, lat 0.221 ms stddev 0.055
progress: 102.0 s, 128808.0 tps, lat 0.225 ms stddev 0.048
progress: 103.0 s, 128954.6 tps, lat 0.225 ms stddev 0.048
progress: 104.0 s, 131227.9 tps, lat 0.221 ms stddev 0.042
progress: 105.0 s, 129604.0 tps, lat 0.224 ms stddev 0.057

rule法使用注意事项:

1、rule 中使用exists,用在这里并不完美,仅仅适合单条insert的语句(所以需要使用volatile 函数解决这个问题)。

postgres=# create table e(id int, info text);
CREATE TABLE
postgres=# create rule r1 as on insert to e where exists (select 1 from e t1 where t1.id=NEW.id limit 1) do instead nothing;
CREATE RULE
  
在一个语句中插入多条,如果多条中有重复,则在RULE中判断条件时仅判断一次(类似immutable函数)。  
  
postgres=# insert into e values (1,'test'),(1,'test');
INSERT 0 2
postgres=# select * from e;
 id | info 
----+------
  1 | test
  1 | test
(2 rows)

2、rule不支持COPY语句,也就是说COPY如果有重复,一样会导致问题。

不管怎么样,我始终建议需要UPSERT的表,必须有PK。

Flag Counter

digoal’s 大量PostgreSQL文章入口