pg_clog的一致性 & 异步事务提交

6 minute read

背景

异步提交是指不需要等待事务对应的wal buffer fsync到磁盘,即返回,而且写CLOG时也不需要等待XLOG落盘。

而pg_clog和pg_xlog是两部分存储的,那么我们想一想,如果一个已提交事务的pg_clog已经落盘,而XLOG没有落盘,刚好此时数据库CRASH了。

数据库恢复时,由于该事务对应的XLOG缺失,数据无法恢复到最终状态,但是PG_CLOG却显示该事务已提交,这就出问题了。

所以对于异步事务,CLOG在write前,务必等待该事务对应的XLOG已经FLUSH到磁盘。

PostgreSQL如何记录事务和它产生的XLOG的LSN的关系呢?

其实不是一一对应的关系,而是记录了多事务对一个LSN的关系。

src/backend/access/transam/clog.c

LSN组,每32个事务,记录它们对应的最大LSN。

也就是32个事务,只记录最大的LSN。节约空间?

/* We store the latest async LSN for each group of transactions */  
#define CLOG_XACTS_PER_LSN_GROUP        32      /* keep this a power of 2 */  

每个CLOG页需要分成多少个LSN组。

#define CLOG_LSNS_PER_PAGE      (CLOG_XACTS_PER_PAGE / CLOG_XACTS_PER_LSN_GROUP)  
  
#define GetLSNIndex(slotno, xid)        ((slotno) * CLOG_LSNS_PER_PAGE + \  
        ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_LSN_GROUP)  

LSN被存储在这个数据结构中

src/include/access/slru.h

/*  
 * Shared-memory state  
 */  
typedef struct SlruSharedData  
{  
......  
	/*  
         * Optional array of WAL flush LSNs associated with entries in the SLRU  
         * pages.  If not zero/NULL, we must flush WAL before writing pages (true  
         * for pg_clog, false for multixact, pg_subtrans, pg_notify).  group_lsn[]  
         * has lsn_groups_per_page entries per buffer slot, each containing the  
         * highest LSN known for a contiguous group of SLRU entries on that slot's  
         * page.  仅仅pg_clog需要记录group_lsn  
         */  
        XLogRecPtr *group_lsn;  // 一个数组,存储32个事务组成的组中最大的LSN号。  
        int                     lsn_groups_per_page;  
......  

src/backend/access/transam/clog.c

 * lsn must be the WAL location of the commit record when recording an async  
 * commit.  For a synchronous commit it can be InvalidXLogRecPtr, since the  
 * caller guarantees the commit record is already flushed in that case.  It  
 * should be InvalidXLogRecPtr for abort cases, too.  
  
void  
TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,  
                                        TransactionId *subxids, XidStatus status, XLogRecPtr lsn)  
{  
......  

更新事务状态时,同时更新对应LSN组的LSN为最大LSN值。(CLOG BUFFER中的操作)

/*  
 * Sets the commit status of a single transaction.  
 *  
 * Must be called with CLogControlLock held  
 */  
static void  
TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, int slotno)  
{  
......  
        /*  
         * Update the group LSN if the transaction completion LSN is higher.  
         *  
         * Note: lsn will be invalid when supplied during InRecovery processing,  
         * so we don't need to do anything special to avoid LSN updates during  
         * recovery. After recovery completes the next clog change will set the  
         * LSN correctly.  
         */  
        if (!XLogRecPtrIsInvalid(lsn))  
        {  
                int                     lsnindex = GetLSNIndex(slotno, xid);  
  
                if (ClogCtl->shared->group_lsn[lsnindex] < lsn)  // 更新组LSN  
                        ClogCtl->shared->group_lsn[lsnindex] = lsn;  
        }  
......  

将事务标记为commit状态,对于异步事务,多一个LSN参数,用于修改事务组的最大LSN。

/*  
 * TransactionIdCommitTree  
 *              Marks the given transaction and children as committed  
 *  
 * "xid" is a toplevel transaction commit, and the xids array contains its  
 * committed subtransactions.  
 *  
 * This commit operation is not guaranteed to be atomic, but if not, subxids  
 * are correctly marked subcommit first.  
 */  
void  
TransactionIdCommitTree(TransactionId xid, int nxids, TransactionId *xids)  
{  
        TransactionIdSetTreeStatus(xid, nxids, xids,  
                                                           TRANSACTION_STATUS_COMMITTED,  
                                                           InvalidXLogRecPtr);  
}  
  
/*  
 * TransactionIdAsyncCommitTree  
 *              Same as above, but for async commits.  The commit record LSN is needed.  
 */  
void  
TransactionIdAsyncCommitTree(TransactionId xid, int nxids, TransactionId *xids,  
                                                         XLogRecPtr lsn)  
{  
        TransactionIdSetTreeStatus(xid, nxids, xids,  
                                                           TRANSACTION_STATUS_COMMITTED, lsn);  
}  
  
/*  
 * TransactionIdAbortTree  
 *              Marks the given transaction and children as aborted.  
 *  
 * "xid" is a toplevel transaction commit, and the xids array contains its  
 * committed subtransactions.  
 *  
 * We don't need to worry about the non-atomic behavior, since any onlookers  
 * will consider all the xacts as not-yet-committed anyway.  
 */  
void  
TransactionIdAbortTree(TransactionId xid, int nxids, TransactionId *xids)  
{  
        TransactionIdSetTreeStatus(xid, nxids, xids,  
                                                           TRANSACTION_STATUS_ABORTED, InvalidXLogRecPtr);  
}  

从XID号,获取它对应的LSN,需要注意的是,这个XID如果是一个FROZEN XID,则返回一个(XLogRecPtr) invalid lsn。

src/backend/access/transam/transam.c

/*  
 * TransactionIdGetCommitLSN  
 *  
 * This function returns an LSN that is late enough to be able  
 * to guarantee that if we flush up to the LSN returned then we  
 * will have flushed the transaction's commit record to disk.  
 *  
 * The result is not necessarily the exact LSN of the transaction's  
 * commit record!  For example, for long-past transactions (those whose  
 * clog pages already migrated to disk), we'll return InvalidXLogRecPtr.  
 * Also, because we group transactions on the same clog page to conserve  
 * storage, we might return the LSN of a later transaction that falls into  
 * the same group.  
 */  
XLogRecPtr  
TransactionIdGetCommitLSN(TransactionId xid)  
{  
        XLogRecPtr      result;  
  
        /*  
         * Currently, all uses of this function are for xids that were just  
         * reported to be committed by TransactionLogFetch, so we expect that  
         * checking TransactionLogFetch's cache will usually succeed and avoid an  
         * extra trip to shared memory.  
         */  
        if (TransactionIdEquals(xid, cachedFetchXid))  
                return cachedCommitLSN;  
  
        /* Special XIDs are always known committed */  
        if (!TransactionIdIsNormal(xid))  
                return InvalidXLogRecPtr;  
  
        /*  
         * Get the transaction status.  
         */  
        (void) TransactionIdGetStatus(xid, &result);  
  
        return result;  
}  
  
  
/*  
 * Interrogate the state of a transaction in the commit log.  
 *  
 * Aside from the actual commit status, this function returns (into *lsn)  
 * an LSN that is late enough to be able to guarantee that if we flush up to  
 * that LSN then we will have flushed the transaction's commit record to disk.  
 * The result is not necessarily the exact LSN of the transaction's commit  
 * record!      For example, for long-past transactions (those whose clog pages  // long-past事务,指非标准事务号。例如frozen xid。  
 * already migrated to disk), we'll return InvalidXLogRecPtr.  Also, because  
 * we group transactions on the same clog page to conserve storage, we might  
 * return the LSN of a later transaction that falls into the same group.  
 *  
 * NB: this is a low-level routine and is NOT the preferred entry point  
 * for most uses; TransactionLogFetch() in transam.c is the intended caller.  
 */  
XidStatus  
TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)  
{  
        int                     pageno = TransactionIdToPage(xid);  
        int                     byteno = TransactionIdToByte(xid);  
        int                     bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;  
        int                     slotno;  
        int                     lsnindex;  
        char       *byteptr;  
        XidStatus       status;  
  
        /* lock is acquired by SimpleLruReadPage_ReadOnly */  
  
        slotno = SimpleLruReadPage_ReadOnly(ClogCtl, pageno, xid);  
        byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;  
  
        status = (*byteptr >> bshift) & CLOG_XACT_BITMASK;  
  
        lsnindex = GetLSNIndex(slotno, xid);  
        *lsn = ClogCtl->shared->group_lsn[lsnindex];  
  
        LWLockRelease(CLogControlLock);  
  
        return status;  
}  

前面所涉及的都是CLOG BUFFER中的操作,如果要将buffer写到磁盘,则真正需要涉及到一致性的问题,即在将CLOG write到磁盘前,必须先确保对应的事务产生的XLOG已经flush到磁盘。

那么这里就需要用到前面每个LSN组中记录的max LSN了。代码如下:

src/backend/access/transam/slru.c

/*  
 * Physical write of a page from a buffer slot  
 *  
 * On failure, we cannot just ereport(ERROR) since caller has put state in  
 * shared memory that must be undone.  So, we return FALSE and save enough  
 * info in static variables to let SlruReportIOError make the report.  
 *  
 * For now, assume it's not worth keeping a file pointer open across  
 * independent read/write operations.  We do batch operations during  
 * SimpleLruFlush, though.  
 *  
 * fdata is NULL for a standalone write, pointer to open-file info during  
 * SimpleLruFlush.  
 */  
static bool  
SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata)  
{  
        SlruShared      shared = ctl->shared;  
        int                     segno = pageno / SLRU_PAGES_PER_SEGMENT;  
        int                     rpageno = pageno % SLRU_PAGES_PER_SEGMENT;  
        int                     offset = rpageno * BLCKSZ;  
        char            path[MAXPGPATH];  
        int                     fd = -1;  
  
        /*  
         * Honor the write-WAL-before-data rule, if appropriate, so that we do not  
         * write out data before associated WAL records.  This is the same action  
         * performed during FlushBuffer() in the main buffer manager.  
         */  
        if (shared->group_lsn != NULL)  
        {  
                /*  
                 * We must determine the largest async-commit LSN for the page. This  
                 * is a bit tedious, but since this entire function is a slow path  
                 * anyway, it seems better to do this here than to maintain a per-page  
                 * LSN variable (which'd need an extra comparison in the  
                 * transaction-commit path).  
                 */  
                XLogRecPtr      max_lsn;  
                int                     lsnindex,  
                                        lsnoff;  
  
                lsnindex = slotno * shared->lsn_groups_per_page;  
                max_lsn = shared->group_lsn[lsnindex++];  
                for (lsnoff = 1; lsnoff < shared->lsn_groups_per_page; lsnoff++)  
                {  
                        XLogRecPtr      this_lsn = shared->group_lsn[lsnindex++];  
  
                        if (max_lsn < this_lsn)  
                                max_lsn = this_lsn;  
                }  
  
                if (!XLogRecPtrIsInvalid(max_lsn))  // 判断max_lsn是不是一个有效的LSN,如果是有效的LSN,说明需要先调用xlogflush将wal buffer中小于该LSN以及以前的buffer写入磁盘。  
		                                                   //  确保write-WAL-before-data规则。  
                {  
                        /*  
                         * As noted above, elog(ERROR) is not acceptable here, so if  
                         * XLogFlush were to fail, we must PANIC.  This isn't much of a  
                         * restriction because XLogFlush is just about all critical  
                         * section anyway, but let's make sure.  
                         */  
                        START_CRIT_SECTION();  
                        XLogFlush(max_lsn);  
                        END_CRIT_SECTION();  
                }  
        }  
......  

小结

对于异步事务,如何保证write-WAL-before-data规则?

pg_clog将32个事务分为一组,存储这些事务的最大LSN。存储在SlruSharedData结构中。

在将clog buffer write到磁盘前,需要确保该clog page对应事务的xlog LSN已经flush到磁盘。

参考

src/backend/access/transam/clog.c

src/include/access/slru.h

src/backend/access/transam/transam.c

src/backend/access/transam/slru.c

Flag Counter

digoal’s 大量PostgreSQL文章入口