Using advisory locks to implement a high-concurrency, non-blocking business lock
create table lock_test (
  tid int primary key,   -- task ID
  state int default 1,   -- task state: 1 = initial, -1 = in progress, 0 = finished
  retry int default -1,  -- retry count
  info text,             -- additional information
  crt_time timestamp     -- creation time
);
update lock_test set state=-1 , retry=retry+1 where tid=? and state=1;
update lock_test set state=1 where tid=? and state=-1;
update lock_test set state=0 where tid=? and state=-1;
Enter the advisory lock. We have in fact already seen it at work in flash-sale workloads.
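"PostgreSQL: implementing row-level read/write blocking with advisory locks"
"PostgreSQL: gapless auto-increment IDs, by advisory lock"
"PostgreSQL: using advisory locks or SKIP LOCKED to eliminate row-lock conflicts and improve concurrent update throughput by tens of times"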
Using advisory locks to implement a high-concurrency, non-blocking business lock
                                List of functions
   Schema   |           Name            | Result data type | Argument data types |  Type
------------+---------------------------+------------------+---------------------+--------
 pg_catalog | pg_try_advisory_lock      | boolean          | bigint              | normal
 pg_catalog | pg_try_advisory_xact_lock | boolean          | bigint              | normal
update lock_test set state=-1 , retry=retry+1 where tid=? and state=1 and pg_try_advisory_xact_lock(?) returning *;
update lock_test set state=1 where tid=? and state=-1 and pg_try_advisory_xact_lock(?);
update lock_test set state=0 where tid=? and state=-1 and pg_try_advisory_xact_lock(?);
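The three statements above can be read as one worker's claim/release cycle. The following is a minimal sketch of that cycle for a single task, assuming the `lock_test` table defined earlier, task ID 1, and the task ID reused as the advisory lock key (as the statements above do). The explicit `BEGIN`/`COMMIT` is illustrative only; under autocommit each statement is its own transaction.

```sql
-- Claim task 1. The UPDATE changes a row only if the task is still in its
-- initial state AND no other transaction currently holds advisory lock 1.
BEGIN;
UPDATE lock_test
   SET state = -1, retry = retry + 1
 WHERE tid = 1
   AND state = 1
   AND pg_try_advisory_xact_lock(1)
RETURNING *;
-- One row returned: this session now owns the task.
-- Zero rows returned: move on to other work; the statement did not wait.
COMMIT;  -- a transaction-level advisory lock is released at transaction end

-- Mark the task as finished once processing succeeds.
UPDATE lock_test
   SET state = 0
 WHERE tid = 1
   AND state = -1
   AND pg_try_advisory_xact_lock(1);
```

The order in which PostgreSQL evaluates the conditions of a `WHERE` clause is not guaranteed, so the lock function may be called even for a row that another condition rejects. With a transaction-level lock this is harmless, because any lock taken is released when the transaction ends.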
create table lock_test (
  tid int primary key,   -- task ID
  state int default 1,   -- task state: 1 = initial, -1 = in progress, 0 = finished
  retry int default -1,  -- retry count
  info text,             -- additional information
  crt_time timestamp     -- creation time
);
insert into lock_test values (1, 1, -1, 'test', now());
vi test1.sql
update lock_test set state=-1 , retry=retry+1 where tid=1 and state=1;
update lock_test set state=1 where tid=1 and state=-1;
pgbench -M prepared -n -r -P 1 -f ./test1.sql -c 64 -j 64 -T 120
query mode: prepared
number of clients: 64
number of threads: 64
duration: 120 s
number of transactions actually processed: 966106
latency average = 7.940 ms
latency stddev = 6.840 ms
tps = 8050.081170 (including connections establishing)
tps = 8054.812052 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
3.978 update lock_test set state=-1 , retry=retry+1 where tid=1 and state=1;
3.962 update lock_test set state=1 where tid=1 and state=-1;
2. Benchmark in advisory lock mode
vi test2.sql
update lock_test set state=-1 , retry=retry+1 where tid=1 and state=1 and pg_try_advisory_xact_lock(1) returning *;
update lock_test set state=1 where tid=1 and state=-1 and pg_try_advisory_xact_lock(1);
pgbench -M prepared -n -r -P 1 -f ./test2.sql -c 64 -j 64 -T 120
query mode: prepared
number of clients: 64
number of threads: 64
duration: 120 s
number of transactions actually processed: 23984594
latency average = 0.320 ms
latency stddev = 0.274 ms
tps = 199855.983575 (including connections establishing)
tps = 199962.502494 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.163 update lock_test set state=-1 , retry=retry+1 where tid=1 and state=1 and pg_try_advisory_xact_lock(1) returning *;
0.156 update lock_test set state=1 where tid=1 and state=-1 and pg_try_advisory_xact_lock(1);
Throughput rises from about 8,000 TPS to about 200,000 TPS, roughly a 25x improvement. Pleased? Surprised?
A brief introduction to the advisory lock mechanism
An advisory lock is scoped by (database, lock ID, session level or transaction level).
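To make the scope concrete, here is a small sketch contrasting the session-level and transaction-level variants. The lock ID 100 is an arbitrary value chosen for illustration.

```sql
-- Session level: held until explicitly released or until the session ends.
SELECT pg_try_advisory_lock(100);    -- true if acquired, false if another session holds it
SELECT pg_advisory_unlock(100);      -- true if this session held the lock and released it

-- Transaction level: released automatically at COMMIT or ROLLBACK;
-- there is no explicit unlock function for this variant.
BEGIN;
SELECT pg_try_advisory_xact_lock(100);
COMMIT;

-- The same lock ID taken while connected to a different database does not
-- conflict, because the database is part of the lock's identity.
```

Both `pg_try_*` functions return immediately. Their counterparts without `try` (`pg_advisory_lock`, `pg_advisory_xact_lock`) wait until the lock becomes available.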
/*
 * Functions for manipulating advisory locks
 *
 * We make use of the locktag fields as follows:
 *
 *  field1: MyDatabaseId ... ensures locks are local to each database
 *  field2: first of 2 int4 keys, or high-order half of an int8 key
 *  field3: second of 2 int4 keys, or low-order half of an int8 key
 *  field4: 1 if using an int8 key, 2 if using 2 int4 keys
 */
#define SET_LOCKTAG_INT64(tag, key64) \
    SET_LOCKTAG_ADVISORY(tag, \
                         MyDatabaseId, \
                         (uint32) ((key64) >> 32), \
                         (uint32) (key64), \
                         1)
#define SET_LOCKTAG_INT32(tag, key1, key2) \
    SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, key1, key2, 2)
/*
 * pg_try_advisory_xact_lock(int8) - acquire xact scoped
 * exclusive lock on an int8 key, no wait
 *
 * Returns true if successful, false if lock not available
 */
    int64       key = PG_GETARG_INT64(0);
    LOCKTAG     tag;
    LockAcquireResult res;
    SET_LOCKTAG_INT64(tag, key);
    res = LockAcquire(&tag, ExclusiveLock, false, true);
    PG_RETURN_BOOL(res != LOCKACQUIRE_NOT_AVAIL);
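The locktag layout described in the source comment is visible from SQL through the `pg_locks` view. The sketch below takes one lock with a bigint key and one with a pair of int4 keys, then lists them. The key values are arbitrary and chosen only so that the two halves are easy to recognize.

```sql
BEGIN;
-- 4294967298 = 2^32 + 2, so the high-order half is 1 and the low-order half is 2.
SELECT pg_try_advisory_xact_lock(4294967298);
-- Two-int4 form: first key 7, second key 8.
SELECT pg_try_advisory_xact_lock(7, 8);

-- According to the pg_locks documentation, a bigint key is shown with
-- classid = high-order half, objid = low-order half, objsubid = 1;
-- a two-int4 key is shown with classid = key1, objid = key2, objsubid = 2.
SELECT locktype, database, classid, objid, objsubid, mode, granted
  FROM pg_locks
 WHERE locktype = 'advisory'
   AND pid = pg_backend_pid();
COMMIT;
```

Because field4 differs between the two forms, a bigint key and a two-int4 key never conflict with each other, even when their 64 bits are identical.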
/*
 * LockAcquire -- Check for lock conflicts, sleep if conflict found,
 *      set lock if/when no conflicts.
 *
 * Inputs:
 *  locktag: unique identifier for the lockable object
 *  lockmode: lock mode to acquire
 *  sessionLock: if true, acquire lock for session not current transaction
 *  dontWait: if true, don't wait to acquire lock
 *
 * Returns one of:
 *      LOCKACQUIRE_NOT_AVAIL       lock not available, and dontWait=true
 *      LOCKACQUIRE_OK              lock successfully acquired
 *      LOCKACQUIRE_ALREADY_HELD    incremented count for lock already held
 *
 * In the normal case where dontWait=false and the caller doesn't need to
 * distinguish a freshly acquired lock from one already taken earlier in
 * this same transaction, there is no need to examine the return value.
 *
 * Side Effects: The lock is acquired and recorded in lock tables.
 *
 * NOTE: if we wait for the lock, there is no way to abort the wait
 * short of aborting the transaction.
 */
LockAcquireResult
LockAcquire(const LOCKTAG *locktag,
            LOCKMODE lockmode,
            bool sessionLock,
            bool dontWait)
{
    return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);
}
/*
 * LockAcquireExtended - allows us to specify additional options
 *
 * reportMemoryError specifies whether a lock request that fills the
 * lock table should generate an ERROR or not. This allows a priority
 * caller to note that the lock table is full and then begin taking
 * extreme action to reduce the number of other lock holders before
 * retrying the action.
 */
LockAcquireResult
LockAcquireExtended(const LOCKTAG *locktag,
                    LOCKMODE lockmode,
                    bool sessionLock,
                    bool dontWait,
                    bool reportMemoryError)
    /*
     * If lock requested conflicts with locks requested by waiters, must join
     * wait queue. Otherwise, check for conflict with already-held locks.
     * (That's last because most complex check.)
     */
    if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
        status = STATUS_FOUND;
    else
        status = LockCheckConflicts(lockMethodTable, lockmode,
                                    lock, proclock);
    if (status == STATUS_OK)
    {
        /* No conflict with held or previously requested locks */
        GrantLock(lock, proclock, lockmode);
        GrantLockLocal(locallock, owner);
    }
Granting the lock updates the lock and proclock data structures to record that the lock is now held by this process.
/*
 * GrantLock -- update the lock and proclock data structures to show
 *      the lock request has been granted.
 *
 * NOTE: if proc was blocked, it also needs to be removed from the wait list
 * and have its waitLock/waitProcLock fields cleared. That's not done here.
 *
 * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
 * table entry; but since we may be awaking some other process, we can't do
 * that here; it's done by GrantLockLocal, instead.
 */
void
GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
{
    lock->nGranted++;
    lock->granted[lockmode]++;
    lock->grantMask |= LOCKBIT_ON(lockmode);
    if (lock->granted[lockmode] == lock->requested[lockmode])
        lock->waitMask &= LOCKBIT_OFF(lockmode);
    proclock->holdMask |= LOCKBIT_ON(lockmode);
    LOCK_PRINT("GrantLock", lock, lockmode);
    Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
    Assert(lock->nGranted <= lock->nRequested);
}
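1. One thing to watch when using advisory locks: they are lightweight locks scoped to the whole database, so every application in the same database shares one lock ID space. For different businesses that do not need to block one another, design separate advisory lock ID spaces. For example, business A uses lock IDs 1 to 1000000, business B uses 1000001 to 2000000, and so on.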
2. UPDATE, INSERT, and DELETE all support the RETURNING clause. INSERT and UPDATE return the new row values, and DELETE returns the old (deleted) row.
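3. Other applications of advisory locks:
"PostgreSQL: implementing row-level read/write blocking with advisory locks"
"PostgreSQL: gapless auto-increment IDs, by advisory lock"
"PostgreSQL: using advisory locks or SKIP LOCKED to eliminate row-lock conflicts and improve concurrent update throughput by tens of times"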
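4. Advisory locks come in transaction-level and session-level variants; choose according to business needs. A transaction-level lock is released automatically when the transaction ends, while a session-level lock is held until it is explicitly released or the session ends.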
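As an alternative to carving up a single bigint range (point 1), the two-argument form `pg_try_advisory_xact_lock(int, int)` can carry the business in the first key and the task in the second. The sketch below assumes a hypothetical convention in which business A is numbered 1 and business B is numbered 2. These numbers are illustrative and do not come from the benchmark above.

```sql
-- Business A (hypothetical namespace 1) claims task 1.
UPDATE lock_test
   SET state = -1, retry = retry + 1
 WHERE tid = 1
   AND state = 1
   AND pg_try_advisory_xact_lock(1, tid)   -- key1 = business, key2 = task ID
RETURNING *;

-- Business B (hypothetical namespace 2) can lock its own object with ID 1
-- at the same time, because the lock (2, 1) is distinct from (1, 1).
SELECT pg_try_advisory_xact_lock(2, 1);
```

This keeps each business free to use its full int4 range for task IDs, at the cost of limiting both keys to 32 bits.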