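Implementing gapless auto-increment IDs in PostgreSQL - by advisory lock
Background
Most databases offer sequences, and PostgreSQL is no exception.
A sequence produces ever-increasing values, but it has one drawback: once a value has been handed out it is never reused, so users can end up with gaps in their IDs.
For example: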
postgres=# create table seq_test(id serial, info text);
CREATE TABLE
postgres=# \d+ seq_test
Table "public.seq_test"
Column | Type | Modifiers | Storage | Stats target | Description
--------+---------+-------------------------------------------------------+----------+--------------+-------------
id | integer | not null default nextval('seq_test_id_seq'::regclass) | plain | |
info | text | | extended | |
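Once a sequence value has been fetched it is consumed, and the sequence only ever moves forward.
So if a transaction rolls back along the way, a gap appears.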
postgres=# insert into seq_test (info) values ('test');
INSERT 0 1
postgres=# begin;
BEGIN
postgres=# insert into seq_test (info) values ('test');
INSERT 0 1
postgres=# rollback;
ROLLBACK
postgres=# insert into seq_test (info) values ('test');
INSERT 0 1
postgres=# select * from seq_test;
id | info
----+------
1 | test
3 | test
(2 rows)
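As a rough illustration, the missing values can be listed by comparing the table against a complete series of integers. This is only a sketch: it assumes the seq_test table from the session above and that IDs are expected to start at 1.

```sql
-- List every id between 1 and max(id) that has no row in seq_test.
select g.id as missing_id
from generate_series(1, (select max(id) from seq_test)) as g(id)
left join seq_test t on t.id = g.id
where t.id is null
order by g.id;
```

For the two rows shown above (ids 1 and 3), the query returns a single row with missing_id = 2.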
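Is there a way to get auto-increment values with no gaps at all?
This article presents one approach.
advisory lock
PostgreSQL provides a handy feature called the advisory lock. With it, gapless IDs can be generated by many sessions concurrently.
Advisory locks come in two flavors, session-level and transaction-level. For details, see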
https://www.postgresql.org/docs/9.6/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
Name | Return Type | Description |
---|---|---|
pg_advisory_lock(key bigint) | void | Obtain exclusive session level advisory lock |
pg_advisory_lock(key1 int, key2 int) | void | Obtain exclusive session level advisory lock |
pg_advisory_lock_shared(key bigint) | void | Obtain shared session level advisory lock |
pg_advisory_lock_shared(key1 int, key2 int) | void | Obtain shared session level advisory lock |
pg_advisory_unlock(key bigint) | boolean | Release an exclusive session level advisory lock |
pg_advisory_unlock(key1 int, key2 int) | boolean | Release an exclusive session level advisory lock |
pg_advisory_unlock_all() | void | Release all session level advisory locks held by the current session |
pg_advisory_unlock_shared(key bigint) | boolean | Release a shared session level advisory lock |
pg_advisory_unlock_shared(key1 int, key2 int) | boolean | Release a shared session level advisory lock |
pg_advisory_xact_lock(key bigint) | void | Obtain exclusive transaction level advisory lock |
pg_advisory_xact_lock(key1 int, key2 int) | void | Obtain exclusive transaction level advisory lock |
pg_advisory_xact_lock_shared(key bigint) | void | Obtain shared transaction level advisory lock |
pg_advisory_xact_lock_shared(key1 int, key2 int) | void | Obtain shared transaction level advisory lock |
pg_try_advisory_lock(key bigint) | boolean | Obtain exclusive session level advisory lock if available |
pg_try_advisory_lock(key1 int, key2 int) | boolean | Obtain exclusive session level advisory lock if available |
pg_try_advisory_lock_shared(key bigint) | boolean | Obtain shared session level advisory lock if available |
pg_try_advisory_lock_shared(key1 int, key2 int) | boolean | Obtain shared session level advisory lock if available |
pg_try_advisory_xact_lock(key bigint) | boolean | Obtain exclusive transaction level advisory lock if available |
pg_try_advisory_xact_lock(key1 int, key2 int) | boolean | Obtain exclusive transaction level advisory lock if available |
pg_try_advisory_xact_lock_shared(key bigint) | boolean | Obtain shared transaction level advisory lock if available |
pg_try_advisory_xact_lock_shared(key1 int, key2 int) | boolean | Obtain shared transaction level advisory lock if available |
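The difference between the two flavors is easiest to see side by side. The following is a minimal sketch using functions from the table above; the key 42 is an arbitrary value chosen for illustration.

```sql
-- Session-level: held until explicitly released or until the session ends.
select pg_advisory_lock(42);
select pg_advisory_unlock(42);          -- returns true if the lock was held

-- Transaction-level: released automatically at commit or rollback.
begin;
select pg_try_advisory_xact_lock(42);   -- true if acquired, false if another session holds it
-- While the transaction is open, the lock is visible in pg_locks.
select locktype, classid, objid, mode, granted
from pg_locks
where locktype = 'advisory';
commit;                                 -- the lock is released here
```

The pg_try_* variants return immediately instead of waiting, which is what makes a retry loop possible. The transaction-level form needs no explicit unlock, so a lock cannot be left behind by an error path.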
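Implementing a concurrent gapless auto-increment sequence
Put the logic in a function, as shown below. The ID column must be unique and its values strictly consecutive; after inserting the row, the function returns the ID to the client.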
postgres=# create table uniq_test(id int primary key, info text);
CREATE TABLE
create or replace function f_uniq(i_info text) returns int as $$
declare
  newid int;
  i int := 0;
  res int;
begin
  loop
    -- On every retry (but not on the first attempt) back off for a random interval of up to 0.2 s
    if i>0 then
      perform pg_sleep(0.2*random());
    else
      i := i+1;
    end if;
    -- Compute the current maximum ID + 1 (the ID about to be inserted)
    select max(id)+1 into newid from uniq_test;
    if newid is not null then
      -- Try to take the advisory lock keyed on that ID
      if pg_try_advisory_xact_lock(newid) then
        -- Insert the row
        insert into uniq_test (id,info) values (newid,i_info);
        -- Return the ID obtained by this call
        return newid;
      else
        -- Lock not acquired: go around the loop again
        continue;
      end if;
    else
      -- The table is empty, so this is the first row: take the advisory lock on key 1
      if pg_try_advisory_xact_lock(1) then
        insert into uniq_test (id, info) values (1, i_info);
        return 1;
      else
        continue;
      end if;
    end if;
  end loop;
-- If a transient race caused a primary key conflict, call the function again
exception when unique_violation then
  select f_uniq(i_info) into res;
  return res;
end;
$$ language plpgsql strict;
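A short session shows how the function behaves across a rollback. This is a sketch that assumes uniq_test is still empty and that no other session is calling f_uniq at the same time; the info values 'a', 'b' and 'c' are placeholders.

```sql
select f_uniq('a');   -- table is empty, so this inserts id 1

begin;
select f_uniq('b');   -- inserts id 2 inside the open transaction
rollback;             -- the row is discarded and the advisory lock on 2 is released

select f_uniq('c');   -- max(id) is 1 again, so id 2 is reused

select id, info from uniq_test order by id;
```

The final query should show ids 1 and 2 with info 'a' and 'c'. The lock is keyed on the candidate ID and held until the transaction ends, so a concurrent caller that computes the same candidate cannot claim it and retries instead. If the holder rolls back, the ID was never visible to anyone else, and the next caller picks it up. Two caveats: because the function is declared strict, calling it with NULL returns NULL without inserting a row; and advisory lock keys are shared across the whole database, so an application that uses advisory locks elsewhere may prefer the two-argument form to keep the keys apart.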
Concurrent stress test
$ vi test.sql
select f_uniq('test');
$ pgbench -M prepared -n -r -P 1 -f ./test.sql -c 164 -j 164 -T 10
progress: 1.0 s, 9526.0 tps, lat 13.759 ms stddev 69.983
progress: 2.0 s, 12305.9 tps, lat 13.554 ms stddev 67.042
progress: 3.0 s, 12378.7 tps, lat 13.206 ms stddev 65.303
progress: 4.0 s, 12277.0 tps, lat 12.969 ms stddev 68.373
progress: 5.0 s, 12332.3 tps, lat 13.535 ms stddev 71.023
progress: 6.0 s, 11852.9 tps, lat 13.715 ms stddev 70.337
progress: 7.0 s, 12168.1 tps, lat 13.582 ms stddev 71.053
progress: 8.0 s, 12174.8 tps, lat 13.390 ms stddev 69.684
progress: 9.0 s, 12145.0 tps, lat 13.393 ms stddev 70.059
progress: 10.0 s, 12240.7 tps, lat 13.642 ms stddev 68.665
transaction type: ./test.sql
scaling factor: 1
query mode: prepared
number of clients: 164
number of threads: 164
duration: 10 s
number of transactions actually processed: 119565
latency average = 13.791 ms
latency stddev = 70.055 ms
tps = 11729.522019 (including connections establishing)
tps = 11737.830312 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
13.791 select f_uniq('test');
Performance is decent: about 11,700 tps with 164 concurrent clients.
Verification
postgres=# select count(*),max(id) from uniq_test ;
count | max
--------+--------
119565 | 119565
(1 row)
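Since id is the primary key, a row count equal to max(id) leaves no room for a missing value, provided the smallest id is 1. For a more explicit check, the following sketch lists any gaps by comparing each id with its predecessor; it assumes the uniq_test table above.

```sql
-- Report each run of missing ids as a (gap_start, gap_end) pair.
select prev_id + 1 as gap_start, id - 1 as gap_end
from (
    select id, lag(id) over (order by id) as prev_id
    from uniq_test
) s
where id - prev_id > 1;

-- Confirm that numbering starts at 1 and that count and max agree.
select min(id) = 1 and count(*) = max(id) as gapless from uniq_test;
```

On a gapless table the first query returns no rows and the second returns true.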