Using advisory locks to implement high-concurrency, non-blocking business locks


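Background

Some applications use the database as a reliable lock, for example task scheduling systems and other systems that need a dependable locking mechanism.

They typically use one row of a table to hold the lock slot and its state.

For example: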

create table lock_test (  
  tid int primary key,   -- task ID
  state int default 1,   -- task state: 1 = initial, -1 = processing, 0 = finished
  retry int default -1,   -- retry count
  info text,   -- other information
  crt_time timestamp  -- creation time
);  

The task processing system claims a task from the database.

For example:

update lock_test set state=-1 , retry=retry+1 where tid=? and state=1;  

On failure:

update lock_test set state=1 where tid=? and state=-1;  

On success:

update lock_test set state=0 where tid=? and state=-1;  
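The three statements above form a small state machine (1 → -1 → 0, or back to 1 on failure). The following is a minimal single-connection sketch of it. It uses Python's built-in sqlite3 module as a stand-in for PostgreSQL, and the helper names `make_db`, `claim`, and `finish` are hypothetical. It shows only the state transitions, not the concurrent row-lock waits discussed next.

```python
import sqlite3


def make_db():
    """Create an in-memory stand-in for lock_test holding one pending task."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        create table lock_test (
          tid integer primary key,   -- task ID
          state integer default 1,   -- 1 = initial, -1 = processing, 0 = finished
          retry integer default -1,  -- retry count
          info text,
          crt_time text
        )
        """
    )
    conn.execute("insert into lock_test (tid, info) values (1, 'test')")
    conn.commit()
    return conn


def claim(conn, tid):
    """Try to move the task from state 1 to -1; True if this caller got it."""
    cur = conn.execute(
        "update lock_test set state=-1, retry=retry+1 where tid=? and state=1",
        (tid,),
    )
    conn.commit()
    return cur.rowcount == 1


def finish(conn, tid, ok):
    """Move a task being processed to 0 (success) or back to 1 (failure)."""
    new_state = 0 if ok else 1
    cur = conn.execute(
        "update lock_test set state=? where tid=? and state=-1",
        (new_state, tid),
    )
    conn.commit()
    return cur.rowcount == 1
```

A caller treats an affected-row count of zero as "someone else owns this task" and moves on to another one.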

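When several clients try to claim the same task in parallel, their UPDATEs conflict on the same row, and every client except the first has to wait for the row lock. The wait may be short, but in a large task scheduling system even a small wait is unacceptable.

How can this lock wait be eliminated?

This is where advisory locks come in. We have already seen them at work in flash-sale workloads: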

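"PostgreSQL: implementing row-level read/write blocking with advisory lock"

"PostgreSQL: implementing gapless auto-increment IDs - by advisory lock"

"PostgreSQL: using advisory lock or skip locked to eliminate row-lock conflicts and raise concurrent update throughput by dozens of times"

"The technology behind Double Eleven - a different kind of flash-sale technique, the raw flash sale"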

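Implementing a high-concurrency, non-blocking business lock with advisory locks

Advisory locks can be held at transaction level or session level. Choose according to the shape of the business.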

                                        List of functions  
   Schema   |               Name               | Result data type | Argument data types |  Type    
------------+----------------------------------+------------------+---------------------+--------  
 pg_catalog | pg_try_advisory_lock             | boolean          | bigint              | normal  
 pg_catalog | pg_try_advisory_xact_lock        | boolean          | bigint              | normal  

The SQL is rewritten as follows.

Start processing a task:

update lock_test set state=-1 , retry=retry+1 where tid=? and state=1 and pg_try_advisory_xact_lock(?) returning *;  

On failure:

update lock_test set state=1 where tid=? and state=-1 and pg_try_advisory_xact_lock(?);  

On success:

update lock_test set state=0 where tid=? and state=-1 and pg_try_advisory_xact_lock(?);  

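Benchmark comparison

To expose the conflict, we use a single row to represent a single task and have every client compete for it. This is the extreme case.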

create table lock_test (  
  tid int primary key,   -- task ID
  state int default 1,   -- task state: 1 = initial, -1 = processing, 0 = finished
  retry int default -1,   -- retry count
  info text,   -- other information
  crt_time timestamp  -- creation time
);  
  
insert into lock_test values (1, 1, -1, 'test', now());  

1. Benchmark of the traditional approach

vi test1.sql  
  
update lock_test set state=-1 , retry=retry+1 where tid=1 and state=1;  
update lock_test set state=1 where tid=1 and state=-1;  
  
pgbench -M prepared -n -r -P 1 -f ./test1.sql -c 64 -j 64 -T 120  
  
query mode: prepared  
number of clients: 64  
number of threads: 64  
duration: 120 s  
number of transactions actually processed: 966106  
latency average = 7.940 ms  
latency stddev = 6.840 ms  
tps = 8050.081170 (including connections establishing)  
tps = 8054.812052 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         3.978  update lock_test set state=-1 , retry=retry+1 where tid=1 and state=1;  
         3.962  update lock_test set state=1 where tid=1 and state=-1;  

2. Benchmark of the advisory lock approach

vi test2.sql  
  
update lock_test set state=-1 , retry=retry+1 where tid=1 and state=1 and pg_try_advisory_xact_lock(1) returning *;  
update lock_test set state=1 where tid=1 and state=-1 and pg_try_advisory_xact_lock(1);  
  
pgbench -M prepared -n -r -P 1 -f ./test2.sql -c 64 -j 64 -T 120  
  
query mode: prepared  
number of clients: 64  
number of threads: 64  
duration: 120 s  
number of transactions actually processed: 23984594  
latency average = 0.320 ms  
latency stddev = 0.274 ms  
tps = 199855.983575 (including connections establishing)  
tps = 199962.502494 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.163  update lock_test set state=-1 , retry=retry+1 where tid=1 and state=1 and pg_try_advisory_xact_lock(1) returning *;  
         0.156  update lock_test set state=1 where tid=1 and state=-1 and pg_try_advisory_xact_lock(1);  

Throughput jumps from about 8,000 TPS to about 200,000 TPS, roughly 25 times higher. Pleasantly surprised?

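A brief look at the advisory lock mechanism

An advisory lock is identified by the database and the lock ID, and it is held at either session level or transaction level.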

src/backend/utils/adt/lockfuncs.c

/*  
 * Functions for manipulating advisory locks  
 *  
 * We make use of the locktag fields as follows:  
 *  
 *      field1: MyDatabaseId ... ensures locks are local to each database  
 *      field2: first of 2 int4 keys, or high-order half of an int8 key  
 *      field3: second of 2 int4 keys, or low-order half of an int8 key  
 *      field4: 1 if using an int8 key, 2 if using 2 int4 keys  
 */  
#define SET_LOCKTAG_INT64(tag, key64) \  
        SET_LOCKTAG_ADVISORY(tag, \  
                                                 MyDatabaseId, \  
                                                 (uint32) ((key64) >> 32), \  
                                                 (uint32) (key64), \  
                                                 1)  
#define SET_LOCKTAG_INT32(tag, key1, key2) \  
        SET_LOCKTAG_ADVISORY(tag, MyDatabaseId, key1, key2, 2)  
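To make the field layout described in the comment concrete, here is a small Python sketch that mirrors the two macros. The function names `locktag_int64` and `locktag_int32` and the database OID 16384 used in the usage note are hypothetical. The tuple is only an illustration of the four fields, not PostgreSQL's actual LOCKTAG struct.

```python
MASK32 = 0xFFFFFFFF
MASK64 = 0xFFFFFFFFFFFFFFFF


def locktag_int64(database_oid, key64):
    """Mirror SET_LOCKTAG_INT64: split one int8 key into two uint32 halves."""
    if not -(2**63) <= key64 < 2**63:
        raise ValueError("key64 must fit in a signed 64-bit integer")
    bits = key64 & MASK64          # two's-complement bit pattern of the key
    high = (bits >> 32) & MASK32   # field2: high-order half
    low = bits & MASK32            # field3: low-order half
    return (database_oid, high, low, 1)  # field4 = 1 marks an int8 key


def locktag_int32(database_oid, key1, key2):
    """Mirror SET_LOCKTAG_INT32: two int4 keys, stored as uint32 fields."""
    return (database_oid, key1 & MASK32, key2 & MASK32, 2)  # field4 = 2
```

For example, `locktag_int64(16384, (7 << 32) | 9)` and `locktag_int32(16384, 7, 9)` share the same field2 and field3 but differ in field4, so the single-key and two-key forms do not collide. The same key in two databases differs in field1.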

How is pg_try_advisory_xact_lock implemented?

/*  
 * pg_try_advisory_xact_lock(int8) - acquire xact scoped  
 * exclusive lock on an int8 key, no wait  
 *  
 * Returns true if successful, false if lock not available  
 */  
Datum  
pg_try_advisory_xact_lock_int8(PG_FUNCTION_ARGS)  
{  
        int64           key = PG_GETARG_INT64(0);  
        LOCKTAG         tag;  
        LockAcquireResult res;  
  
        PreventAdvisoryLocksInParallelMode();  
        SET_LOCKTAG_INT64(tag, key);  
  
        res = LockAcquire(&tag, ExclusiveLock, false, true);  
  
        PG_RETURN_BOOL(res != LOCKACQUIRE_NOT_AVAIL);  
}  

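pg_try_advisory_xact_lock requests the lock with dontWait set to true. On a conflict it does not wait and returns FALSE, so the UPDATE's WHERE clause is not satisfied and the statement returns immediately without touching the row. This is why the approach is so efficient.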
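The no-wait behaviour can be modelled with a toy lock table. This is only a single-threaded sketch of the semantics seen by callers, assuming a hypothetical `ToyLockTable` class. PostgreSQL's real lock manager lives in shared memory and is considerably more involved.

```python
class ToyLockTable:
    """Toy model of exclusive try-locks keyed by lock ID (not thread-safe)."""

    def __init__(self):
        self._owner = {}  # lock key -> owner currently holding it

    def try_lock(self, owner, key):
        """Return True if owner now holds key; never waits."""
        holder = self._owner.get(key)
        if holder is None:
            self._owner[key] = owner   # free: grant immediately
            return True
        # Held already: succeeds only for the same owner, mirroring
        # `res != LOCKACQUIRE_NOT_AVAIL` (ALREADY_HELD also counts as true).
        return holder == owner

    def release_all(self, owner):
        """Drop every lock held by owner, like the end of a transaction."""
        for key in [k for k, o in self._owner.items() if o == owner]:
            del self._owner[key]
```

A second owner asking for a held key gets False straight away and can pick another task. Once the first owner calls `release_all`, the key is free again.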

src/backend/storage/lmgr/lock.c

/*  
 * LockAcquire -- Check for lock conflicts, sleep if conflict found,  
 *              set lock if/when no conflicts.  
 *  
 * Inputs:  
 *      locktag: unique identifier for the lockable object  
 *      lockmode: lock mode to acquire  
 *      sessionLock: if true, acquire lock for session not current transaction  
 *      dontWait: if true, don't wait to acquire lock  
 *  
 * Returns one of:  
 *              LOCKACQUIRE_NOT_AVAIL           lock not available, and dontWait=true  
 *              LOCKACQUIRE_OK                          lock successfully acquired  
 *              LOCKACQUIRE_ALREADY_HELD        incremented count for lock already held  
 *  
 * In the normal case where dontWait=false and the caller doesn't need to  
 * distinguish a freshly acquired lock from one already taken earlier in  
 * this same transaction, there is no need to examine the return value.  
 *  
 * Side Effects: The lock is acquired and recorded in lock tables.  
 *  
 * NOTE: if we wait for the lock, there is no way to abort the wait  
 * short of aborting the transaction.  
 */  
LockAcquireResult  
LockAcquire(const LOCKTAG *locktag,  
                        LOCKMODE lockmode,  
                        bool sessionLock,  
                        bool dontWait)  
{  
        return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);  
}  
  
  
/*  
 * LockAcquireExtended - allows us to specify additional options  
 *  
 * reportMemoryError specifies whether a lock request that fills the  
 * lock table should generate an ERROR or not. This allows a priority  
 * caller to note that the lock table is full and then begin taking  
 * extreme action to reduce the number of other lock holders before  
 * retrying the action.  
 */  
LockAcquireResult  
LockAcquireExtended(const LOCKTAG *locktag,  
                                        LOCKMODE lockmode,  
                                        bool sessionLock,  
                                        bool dontWait,  
                                        bool reportMemoryError)  
{  
  
/* Check for lock conflicts */
  
        /*  
         * If lock requested conflicts with locks requested by waiters, must join  
         * wait queue.  Otherwise, check for conflict with already-held locks.  
         * (That's last because most complex check.)  
         */  
        if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)  
                status = STATUS_FOUND;  
        else  
                status = LockCheckConflicts(lockMethodTable, lockmode,  
                                                                        lock, proclock);  
  
/* No conflict: grant the lock */
  
        if (status == STATUS_OK)  
        {  
                /* No conflict with held or previously requested locks */  
                GrantLock(lock, proclock, lockmode);  
                GrantLockLocal(locallock, owner);  
        }  
  
  
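/* Otherwise wait, or skip the wait. pg_try_advisory_xact_lock_int8 skips the wait. */
  
Granting the lock updates the lock and proclock data structures to record that the lock is now held by this process.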

/*
 * GrantLock -- update the lock and proclock data structures to show
 *              the lock request has been granted.
 *
 * NOTE: if proc was blocked, it also needs to be removed from the wait list
 * and have its waitLock/waitProcLock fields cleared.  That's not done here.
 *
 * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
 * table entry; but since we may be awaking some other process, we can't do
 * that here; it's done by GrantLockLocal, instead.
 */
void
GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
{
        lock->nGranted++;
        lock->granted[lockmode]++;
        lock->grantMask |= LOCKBIT_ON(lockmode);
        if (lock->granted[lockmode] == lock->requested[lockmode])
                lock->waitMask &= LOCKBIT_OFF(lockmode);
        proclock->holdMask |= LOCKBIT_ON(lockmode);
        LOCK_PRINT("GrantLock", lock, lockmode);
        Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
        Assert(lock->nGranted <= lock->nRequested);
}

Summary

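1. One thing to keep in mind when using advisory locks: they are lightweight locks scoped to the whole database. Businesses that do not need to block each other should therefore use separate advisory lock ID spaces. For example, business A uses lock IDs 1 to 1000000, business B uses 1000001 to 2000000, and so on.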

2. UPDATE, INSERT, and DELETE all support the RETURNING clause, which can return the NEW or OLD row values.

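3. Other applications of advisory locks:

"PostgreSQL: implementing row-level read/write blocking with advisory lock"

"PostgreSQL: implementing gapless auto-increment IDs - by advisory lock"

"PostgreSQL: using advisory lock or skip locked to eliminate row-lock conflicts and raise concurrent update throughput by dozens of times"

"The technology behind Double Eleven - a different kind of flash-sale technique, the raw flash sale"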

4. Advisory locks come in transaction-level and session-level forms. Choose according to the needs of the business.
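The ID-space partitioning suggested in point 1 can be sketched as a small helper that maps a business name and a per-business task number onto a lock ID. The names `KEY_RANGES` and `advisory_key` are hypothetical. The two ranges are the ones used as an example above.

```python
# Inclusive lock ID range reserved for each business (example ranges).
KEY_RANGES = {
    "A": (1, 1_000_000),
    "B": (1_000_001, 2_000_000),
}


def advisory_key(business, task_no):
    """Map a 1-based task number to a lock ID inside the business's range."""
    low, high = KEY_RANGES[business]
    key = low + task_no - 1
    if task_no < 1 or key > high:
        raise ValueError(
            f"task_no {task_no} does not fit in the ID space of business {business}"
        )
    return key
```

The resulting integer would then be passed as the argument of pg_try_advisory_xact_lock. Rejecting out-of-range task numbers keeps one business from silently spilling into another's ID space.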


digoal's index of PostgreSQL articles