Implementing Gapless Auto-Increment IDs in PostgreSQL with Advisory Locks

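Background

Most databases provide sequences, and PostgreSQL is no exception.

A sequence produces ever-increasing values. Its drawback is that a value, once fetched, is consumed and never handed out again, so the IDs a user ends up with may contain gaps.

For example: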

postgres=# create table seq_test(id serial, info text);
CREATE TABLE

postgres=# \d+ seq_test
                                             Table "public.seq_test"
 Column |  Type   |                       Modifiers                       | Storage  | Stats target | Description 
--------+---------+-------------------------------------------------------+----------+--------------+-------------
 id     | integer | not null default nextval('seq_test_id_seq'::regclass) | plain    |              | 
 info   | text    |                                                       | extended |              | 

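Once a sequence value has been fetched it is consumed, and the sequence only moves forward.

So if a transaction that fetched a value is rolled back, a gap appears.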

postgres=# insert into seq_test (info) values ('test');
INSERT 0 1
postgres=# begin;
BEGIN
postgres=# insert into seq_test (info) values ('test');
INSERT 0 1
postgres=# rollback;
ROLLBACK
postgres=# insert into seq_test (info) values ('test');
INSERT 0 1
postgres=# select * from seq_test;
 id | info 
----+------
  1 | test
  3 | test
(2 rows)
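
The same effect can be observed on the sequence itself, without going through the table. A minimal sketch, assuming the seq_test table above exists so that its implicit sequence seq_test_id_seq is available:

```sql
begin;
-- nextval() hands out the next value; a rollback does not give it back
select nextval('seq_test_id_seq');
rollback;

-- This call returns a larger value than the one fetched above,
-- even though the transaction that fetched it was rolled back
select nextval('seq_test_id_seq');
```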

Is there a way to obtain auto-increment values with no gaps at all?

This article presents one approach.

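Advisory locks

PostgreSQL offers a very useful feature called advisory locks: application-defined locks on arbitrary integer keys. They make it possible to generate IDs safely from many concurrent sessions.

Advisory locks come in two kinds, session-level and transaction-level. For details, see: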

https://www.postgresql.org/docs/9.6/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS

Name                                                 | Return Type | Description
-----------------------------------------------------+-------------+------------------------------------------------------------
pg_advisory_lock(key bigint)                         | void        | Obtain exclusive session level advisory lock
pg_advisory_lock(key1 int, key2 int)                 | void        | Obtain exclusive session level advisory lock
pg_advisory_lock_shared(key bigint)                  | void        | Obtain shared session level advisory lock
pg_advisory_lock_shared(key1 int, key2 int)          | void        | Obtain shared session level advisory lock
pg_advisory_unlock(key bigint)                       | boolean     | Release an exclusive session level advisory lock
pg_advisory_unlock(key1 int, key2 int)               | boolean     | Release an exclusive session level advisory lock
pg_advisory_unlock_all()                             | void        | Release all session level advisory locks held by the current session
pg_advisory_unlock_shared(key bigint)                | boolean     | Release a shared session level advisory lock
pg_advisory_unlock_shared(key1 int, key2 int)        | boolean     | Release a shared session level advisory lock
pg_advisory_xact_lock(key bigint)                    | void        | Obtain exclusive transaction level advisory lock
pg_advisory_xact_lock(key1 int, key2 int)            | void        | Obtain exclusive transaction level advisory lock
pg_advisory_xact_lock_shared(key bigint)             | void        | Obtain shared transaction level advisory lock
pg_advisory_xact_lock_shared(key1 int, key2 int)     | void        | Obtain shared transaction level advisory lock
pg_try_advisory_lock(key bigint)                     | boolean     | Obtain exclusive session level advisory lock if available
pg_try_advisory_lock(key1 int, key2 int)             | boolean     | Obtain exclusive session level advisory lock if available
pg_try_advisory_lock_shared(key bigint)              | boolean     | Obtain shared session level advisory lock if available
pg_try_advisory_lock_shared(key1 int, key2 int)      | boolean     | Obtain shared session level advisory lock if available
pg_try_advisory_xact_lock(key bigint)                | boolean     | Obtain exclusive transaction level advisory lock if available
pg_try_advisory_xact_lock(key1 int, key2 int)        | boolean     | Obtain exclusive transaction level advisory lock if available
pg_try_advisory_xact_lock_shared(key bigint)         | boolean     | Obtain shared transaction level advisory lock if available
pg_try_advisory_xact_lock_shared(key1 int, key2 int) | boolean     | Obtain shared transaction level advisory lock if available
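
To make the difference between the two kinds concrete, here is a small sketch using functions from the table above. The key 42 is an arbitrary value chosen for illustration; any integer agreed on by the application would do.

```sql
-- Session-level: held until explicitly unlocked or until the session ends
select pg_advisory_lock(42);
select pg_advisory_unlock(42);

-- Transaction-level: released automatically at commit or rollback
begin;
-- Returns true if the lock was acquired, false if another session holds it
select pg_try_advisory_xact_lock(42);
-- Advisory locks currently held show up in pg_locks
select locktype, classid, objid, objsubid, mode, granted, pid
from pg_locks
where locktype = 'advisory';
commit;
```

The try variants return immediately instead of waiting, which is what lets a caller retry rather than block.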

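Implementing a concurrent gapless auto-increment sequence

The logic goes into a function, shown below. The ID column must be unique and strictly sequential; after inserting the row, the function returns the ID to the client.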

postgres=# create table uniq_test(id int primary key, info text);
CREATE TABLE

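create or replace function f_uniq(i_info text) returns int as $$
declare
  newid int;
  i int := 0;
  res int;
begin
  loop
    -- On every retry (not on the first pass), sleep for a random interval of up to 0.2 s
    if i>0 then
      perform pg_sleep(0.2*random());
    else
      i := i+1;
    end if;

    -- Compute the current maximum ID + 1 (the ID about to be inserted)
    select max(id)+1 into newid from uniq_test;
    if newid is not null then
      -- Try to take the transaction-level advisory lock on that ID
      if pg_try_advisory_xact_lock(newid) then
        -- Insert the row
        insert into uniq_test (id,info) values (newid,i_info);
        -- Return the ID obtained by this call
        return newid;
      else
        -- Lock not acquired: loop again
        continue;
      end if;
    else
      -- The table is empty, so this is the first row: take the advisory lock on key 1
      if pg_try_advisory_xact_lock(1) then
        insert into uniq_test (id, info) values (1, i_info);
        return 1;
      else
        continue;
      end if;
    end if;
  end loop;

  -- If a transient race caused a primary-key conflict, call the function again.
  -- Only unique_violation is caught, so unrelated errors are not retried forever.
  exception when unique_violation then
    select f_uniq(i_info) into res;
    return res;
end;
$$ language plpgsql strict;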
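
A quick way to see the difference from the serial example is to repeat the rollback experiment with this function. This is only a sketch and assumes uniq_test is still empty; the info strings are placeholders.

```sql
select f_uniq('first');          -- inserts id 1

begin;
select f_uniq('rolled back');    -- inserts id 2 inside the transaction
rollback;                        -- the row with id 2 disappears again

select f_uniq('second');         -- max(id) is 1 again, so this call reuses id 2

select * from uniq_test order by id;
```

Because the next ID is derived from the rows actually present in the table rather than from a counter, the rolled-back ID is reused and the final query shows ids 1 and 2 with no gap. Note that the function is declared strict, so calling it with a NULL argument returns NULL without inserting anything.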

Concurrent stress test

$ vi test.sql
select f_uniq('test');

$ pgbench -M prepared -n -r -P 1 -f ./test.sql -c 164 -j 164 -T 10
progress: 1.0 s, 9526.0 tps, lat 13.759 ms stddev 69.983
progress: 2.0 s, 12305.9 tps, lat 13.554 ms stddev 67.042
progress: 3.0 s, 12378.7 tps, lat 13.206 ms stddev 65.303
progress: 4.0 s, 12277.0 tps, lat 12.969 ms stddev 68.373
progress: 5.0 s, 12332.3 tps, lat 13.535 ms stddev 71.023
progress: 6.0 s, 11852.9 tps, lat 13.715 ms stddev 70.337
progress: 7.0 s, 12168.1 tps, lat 13.582 ms stddev 71.053
progress: 8.0 s, 12174.8 tps, lat 13.390 ms stddev 69.684
progress: 9.0 s, 12145.0 tps, lat 13.393 ms stddev 70.059
progress: 10.0 s, 12240.7 tps, lat 13.642 ms stddev 68.665
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 164
number of threads: 164
duration: 10 s
number of transactions actually processed: 119565
latency average = 13.791 ms
latency stddev = 70.055 ms
tps = 11729.522019 (including connections establishing)
tps = 11737.830312 (excluding connections establishing)
script statistics:
 - statement latencies in milliseconds:
        13.791  select f_uniq('test');

Performance is decent: roughly 11,700 tps with 164 concurrent clients.

Verification

postgres=# select count(*),max(id) from uniq_test ;
 count  |  max   
--------+--------
 119565 | 119565
(1 row)
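
The row count equals the maximum ID, and id is the primary key, so the IDs run from 1 to 119565 without gaps. A more direct check can be sketched as follows; the column aliases are arbitrary names chosen for illustration.

```sql
-- List every place where the next ID is not exactly id + 1;
-- an empty result means there are no gaps between existing rows
select id as gap_after, next_id as gap_before
from (
  select id, lead(id) over (order by id) as next_id
  from uniq_test
) t
where next_id <> id + 1;

-- Single-row summary: true when IDs start at 1 and are contiguous
select min(id) = 1 and count(*) = max(id) as gapless
from uniq_test;
```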
