advisory lock 实现高并发非堵塞式 业务锁
背景
某些业务会利用数据库来作为一种可靠的锁,例如任务调度系统,或者其他需要可靠的锁机制的系统。
通常他们可能会使用数据库的一条记录来实现锁的SLOT和状态信息。
例如
create table lock_test (
tid int primary key, -- 任务ID
state int default 1, -- 任务状态,1表示初始状态,-1表示正在处理, 0表示处理结束
retry int default -1, -- 重试次数
info text, -- 其他信息
crt_time timestamp -- 时间
);
任务处理系统到数据库获取任务
例如
update lock_test set state=-1 , retry=retry+1 where tid=? and state=1;
处理失败
update lock_test set state=1 where tid=? and state=-1;
处理成功
update lock_test set state=0 where tid=? and state=-1;
当多个客户端并行获得同一个任务时,就会引发冲突,导致等待(虽然等待时间可能不长,但是在大型任务调度系统中,一点点的等待都无法忍受)。
如何解决这个冲突等待呢?
advisory lock登场,实际上在秒杀业务中我们也看到了它的踪影。
《PostgreSQL 使用advisory lock实现行级读写堵塞》
《PostgreSQL 无缝自增ID的实现 - by advisory lock》
《PostgreSQL 使用advisory lock或skip locked消除行锁冲突, 提高几十倍并发更新效率》
advisory lock 实现高并发非堵塞式 业务锁
事务级或会话级,根据业务形态选择。
List of functions
Schema | Name | Result data type | Argument data types | Type
------------+----------------------------------+------------------+---------------------+--------
pg_catalog | pg_try_advisory_lock | boolean | bigint | normal
pg_catalog | pg_try_advisory_xact_lock | boolean | bigint | normal
SQL改造如下
开始处理任务
update lock_test set state=-1 , retry=retry+1 where tid=? and state=1 and pg_try_advisory_xact_lock(?) returning *;
处理失败
update lock_test set state=1 where tid=? and state=-1 and pg_try_advisory_xact_lock(?);
处理成功
update lock_test set state=0 where tid=? and state=-1 and pg_try_advisory_xact_lock(?);
性能压测对比
为了体现冲突的问题,我们使用一条记录来表示一个任务,大家都来抢一个任务的极端场景。
create table lock_test (
tid int primary key, -- 任务ID
state int default 1, -- 任务状态,1表示初始状态,-1表示正在处理, 0表示处理结束
retry int default -1, -- 重试次数
info text, -- 其他信息
crt_time timestamp -- 时间
);
insert into lock_test values (1, 1, -1, 'test', now());
1、传统模式压测
vi test1.sql
update lock_test set state=-1 , retry=retry+1 where tid=1 and state=1;
update lock_test set state=1 where tid=1 and state=-1;
pgbench -M prepared -n -r -P 1 -f ./test1.sql -c 64 -j 64 -T 120
query mode: prepared
number of clients: 64
number of threads: 64
duration: 120 s
number of transactions actually processed: 966106
latency average = 7.940 ms
latency stddev = 6.840 ms
tps = 8050.081170 (including connections establishing)
tps = 8054.812052 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
3.978 update lock_test set state=-1 , retry=retry+1 where tid=1 and state=1;
3.962 update lock_test set state=1 where tid=1 and state=-1;
2、advisory lock模式压测
vi test2.sql
update lock_test set state=-1 , retry=retry+1 where tid=1 and state=1 and pg_try_advisory_xact_lock(1) returning *;
update lock_test set state=1 where tid=1 and state=-1 and pg_try_advisory_xact_lock(1);
pgbench -M prepared -n -r -P 1 -f ./test2.sql -c 64 -j 64 -T 120
query mode: prepared
number of clients: 64
number of threads: 64
duration: 120 s
number of transactions actually processed: 23984594
latency average = 0.320 ms
latency stddev = 0.274 ms
tps = 199855.983575 (including connections establishing)
tps = 199962.502494 (excluding connections establishing)
script statistics:
- statement latencies in milliseconds:
0.163 update lock_test set state=-1 , retry=retry+1 where tid=1 and state=1 and pg_try_advisory_xact_lock(1) returning *;
0.156 update lock_test set state=1 where tid=1 and state=-1 and pg_try_advisory_xact_lock(1);
8000 TPS提升到20万 TPS。开不开心、意不意外。
advisory lock锁机制简介
advisory lock的范围是(数据库、锁ID,会话或事务级别)。
src/backend/utils/adt/lockfuncs.c
/*
* Functions for manipulating advisory locks
*
* We make use of the locktag fields as follows:
*
* field1: MyDatabaseId ... ensures locks are local to each database
* field2: first of 2 int4 keys, or high-order half of an int8 key
* field3: second of 2 int4 keys, or low-order half of an int8 key
* field4: 1 if using an int8 key, 2 if using 2 int4 keys
*/
#define SET_LOCKTAG_INT64(tag, key64) \
SET_LOCKTAG_ADVISORY(tag, \
MyDatabaseId, \
(uint32) ((key64) >> 32), \
(uint32) (key64), \
1)
#define SET_LOCKTAG_INT32(tag, key1, key2) \
SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, key1, key2, 2)
pg_try_advisory_xact_lock锁是如何实现的?
/*
* pg_try_advisory_xact_lock(int8) - acquire xact scoped
* exclusive lock on an int8 key, no wait
*
* Returns true if successful, false if lock not available
*/
Datum
pg_try_advisory_xact_lock_int8(PG_FUNCTION_ARGS)
{
int64 key = PG_GETARG_INT64(0);
LOCKTAG tag;
LockAcquireResult res;
PreventAdvisoryLocksInParallelMode();
SET_LOCKTAG_INT64(tag, key);
res = LockAcquire(&tag, ExclusiveLock, false, true);
PG_RETURN_BOOL(res != LOCKACQUIRE_NOT_AVAIL);
}
pg_try_advisory_xact_lock锁申请方法,如果遇到冲突,则不等待,返回FALSE。这也是高效的原因。
src/backend/storage/lmgr/lock.c
/*
* LockAcquire -- Check for lock conflicts, sleep if conflict found,
* set lock if/when no conflicts.
*
* Inputs:
* locktag: unique identifier for the lockable object
* lockmode: lock mode to acquire
* sessionLock: if true, acquire lock for session not current transaction
* dontWait: if true, don't wait to acquire lock
*
* Returns one of:
* LOCKACQUIRE_NOT_AVAIL lock not available, and dontWait=true
* LOCKACQUIRE_OK lock successfully acquired
* LOCKACQUIRE_ALREADY_HELD incremented count for lock already held
*
* In the normal case where dontWait=false and the caller doesn't need to
* distinguish a freshly acquired lock from one already taken earlier in
* this same transaction, there is no need to examine the return value.
*
* Side Effects: The lock is acquired and recorded in lock tables.
*
* NOTE: if we wait for the lock, there is no way to abort the wait
* short of aborting the transaction.
*/
LockAcquireResult
LockAcquire(const LOCKTAG *locktag,
LOCKMODE lockmode,
bool sessionLock,
bool dontWait)
{
return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);
}
/*
* LockAcquireExtended - allows us to specify additional options
*
* reportMemoryError specifies whether a lock request that fills the
* lock table should generate an ERROR or not. This allows a priority
* caller to note that the lock table is full and then begin taking
* extreme action to reduce the number of other lock holders before
* retrying the action.
*/
LockAcquireResult
LockAcquireExtended(const LOCKTAG *locktag,
LOCKMODE lockmode,
bool sessionLock,
bool dontWait,
bool reportMemoryError)
{
检测锁冲突
/*
* If lock requested conflicts with locks requested by waiters, must join
* wait queue. Otherwise, check for conflict with already-held locks.
* (That's last because most complex check.)
*/
if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
status = STATUS_FOUND;
else
status = LockCheckConflicts(lockMethodTable, lockmode,
lock, proclock);
不冲突,则赋予锁
if (status == STATUS_OK)
{
/* No conflict with held or previously requested locks */
GrantLock(lock, proclock, lockmode);
GrantLockLocal(locallock, owner);
}
否则等待、或跳过锁等待。pg_try_advisory_xact_lock_int8为跳过锁等待。
赋予锁,更新lock, proclock数据结构,表示锁被该进程获取。
/*
* GrantLock -- update the lock and proclock data structures to show
* the lock request has been granted.
*
* NOTE: if proc was blocked, it also needs to be removed from the wait list
* and have its waitLock/waitProcLock fields cleared. That's not done here.
*
* NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
* table entry; but since we may be awaking some other process, we can't do
* that here; it's done by GrantLockLocal, instead.
*/
void
GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
{
lock->nGranted++;
lock->granted[lockmode]++;
lock->grantMask |= LOCKBIT_ON(lockmode);
if (lock->granted[lockmode] == lock->requested[lockmode])
lock->waitMask &= LOCKBIT_OFF(lockmode);
proclock->holdMask |= LOCKBIT_ON(lockmode);
LOCK_PRINT("GrantLock", lock, lockmode);
Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
Assert(lock->nGranted <= lock->nRequested);
}
小结
1、使用advisory lock时,需要注意一点,因为它是库级别的轻量级锁,所以对于不同的业务(无需相互堵塞的业务),建议设计不同的advisory lock的ID空间,例如A业务的LOCK空间是1-1000000, B业务的LOCK空间是1000001-2000000的空间。诸如此类等等。
2、update, insert, delete都带returning语法,可以返回NEW, OLD value。
3、advisory 的其他应用:
《PostgreSQL 使用advisory lock实现行级读写堵塞》
《PostgreSQL 无缝自增ID的实现 - by advisory lock》
《PostgreSQL 使用advisory lock或skip locked消除行锁冲突, 提高几十倍并发更新效率》
4、advisory lock的级别分事务级和会话级,根据业务的需求进行选择。