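pg_clog consistency and asynchronous commit
Background
With asynchronous commit, a transaction returns to the client without waiting for its WAL records to be fsynced to disk, and the commit status is set in the CLOG buffers without waiting for that WAL (XLOG) to become durable either.
pg_clog and pg_xlog are stored separately, so consider this case: the pg_clog page of a committed transaction has already reached disk, its XLOG has not, and the database crashes at exactly that moment.
During recovery the transaction's XLOG is missing, so its changes cannot be replayed, yet pg_clog reports the transaction as committed. The two are now inconsistent.
Therefore, for asynchronously committed transactions, a CLOG page must not be written to disk until the XLOG of those transactions has been flushed.
How does PostgreSQL track the relationship between a transaction and the LSN of the XLOG it produced?
It is not a one-to-one mapping: many transactions share a single recorded LSN.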
src/backend/access/transam/clog.c
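Transactions are divided into LSN groups of 32 consecutive XIDs, and only the highest commit LSN seen in each group is recorded.
According to the source comments, the grouping is there to conserve storage.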
/* We store the latest async LSN for each group of transactions */
#define CLOG_XACTS_PER_LSN_GROUP 32 /* keep this a power of 2 */
The number of LSN groups per CLOG page:
#define CLOG_LSNS_PER_PAGE (CLOG_XACTS_PER_PAGE / CLOG_XACTS_PER_LSN_GROUP)
#define GetLSNIndex(slotno, xid) ((slotno) * CLOG_LSNS_PER_PAGE + \
    ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_LSN_GROUP)
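To make the index arithmetic concrete, here is a minimal sketch that re-implements GetLSNIndex as a plain function. It assumes the default 8 kB BLCKSZ and two status bits per transaction (so four transactions per byte), as I recall them from clog.c; lsn_group_index is a hypothetical name used only for this illustration.

```c
#include <assert.h>
#include <stdint.h>

/* Assumption: default 8 kB block size; the real value is a build-time setting. */
#define BLCKSZ 8192
/* Assumption: values as defined in clog.c (two bits per transaction). */
#define CLOG_BITS_PER_XACT 2
#define CLOG_XACTS_PER_BYTE 4
#define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE)
#define CLOG_XACTS_PER_LSN_GROUP 32
#define CLOG_LSNS_PER_PAGE (CLOG_XACTS_PER_PAGE / CLOG_XACTS_PER_LSN_GROUP)

typedef uint32_t TransactionId;

/*
 * Hypothetical helper: same arithmetic as the GetLSNIndex macro.
 * slotno selects the buffer slot; the XID's position within its page
 * selects one of CLOG_LSNS_PER_PAGE groups inside that slot.
 */
int
lsn_group_index(int slotno, TransactionId xid)
{
    assert(slotno >= 0);
    return slotno * CLOG_LSNS_PER_PAGE +
        (int) ((xid % (TransactionId) CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_LSN_GROUP);
}
```

Under these assumptions a page holds 32768 transactions in 1024 groups, so XIDs 0 to 31 share group 0 of their slot, XID 32 starts group 1, and XID 32768 wraps around to group 0 of the next page.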
The group LSNs are stored in the following data structure:
src/include/access/slru.h
/*
* Shared-memory state
*/
typedef struct SlruSharedData
{
    ......
    /*
     * Optional array of WAL flush LSNs associated with entries in the SLRU
     * pages. If not zero/NULL, we must flush WAL before writing pages (true
     * for pg_clog, false for multixact, pg_subtrans, pg_notify). group_lsn[]
     * has lsn_groups_per_page entries per buffer slot, each containing the
     * highest LSN known for a contiguous group of SLRU entries on that slot's
     * page. (Annotation: only pg_clog records group_lsn.)
     */
    XLogRecPtr *group_lsn; // array holding, for each group of 32 transactions, the highest LSN in that group
    int         lsn_groups_per_page;
    ......
src/backend/access/transam/clog.c
* lsn must be the WAL location of the commit record when recording an async
* commit. For a synchronous commit it can be InvalidXLogRecPtr, since the
* caller guarantees the commit record is already flushed in that case. It
* should be InvalidXLogRecPtr for abort cases, too.
void
TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
{
......
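When a transaction's status is updated, the LSN of its group is raised to the new LSN if that one is higher. All of this happens in the CLOG buffers.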
/*
* Sets the commit status of a single transaction.
*
* Must be called with CLogControlLock held
*/
static void
TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, int slotno)
{
......
/*
* Update the group LSN if the transaction completion LSN is higher.
*
* Note: lsn will be invalid when supplied during InRecovery processing,
* so we don't need to do anything special to avoid LSN updates during
* recovery. After recovery completes the next clog change will set the
* LSN correctly.
*/
if (!XLogRecPtrIsInvalid(lsn))
{
    int lsnindex = GetLSNIndex(slotno, xid);

    if (ClogCtl->shared->group_lsn[lsnindex] < lsn) // raise the group LSN
        ClogCtl->shared->group_lsn[lsnindex] = lsn;
}
......
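The update rule above can be sketched in isolation as follows. This is a toy model, not PostgreSQL code: it assumes XLogRecPtr is a 64-bit integer with 0 as the invalid value (consistent with the plain `<` comparison in the excerpt), and toy_group_lsn, TOY_GROUPS and toy_record_commit are hypothetical names.

```c
#include <assert.h>
#include <stdint.h>

/* Assumption: an LSN is a 64-bit integer and 0 means "invalid". */
typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical: a tiny stand-in for the shared group_lsn[] array. */
#define TOY_GROUPS 4
XLogRecPtr toy_group_lsn[TOY_GROUPS];

/*
 * Record a commit LSN for one LSN group.  An invalid LSN (synchronous
 * commit, abort, or recovery) leaves the group untouched; otherwise the
 * group keeps the highest LSN it has seen so far.
 */
void
toy_record_commit(int lsnindex, XLogRecPtr lsn)
{
    assert(lsnindex >= 0 && lsnindex < TOY_GROUPS);
    if (lsn == InvalidXLogRecPtr)
        return;
    if (toy_group_lsn[lsnindex] < lsn)
        toy_group_lsn[lsnindex] = lsn;
}
```

Because the stored value only ever grows, flushing WAL up to a group's LSN covers the commit record of every asynchronously committed transaction recorded in that group, at the cost of sometimes flushing slightly more than strictly needed.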
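Marking a transaction as committed: the asynchronous variant takes an extra LSN argument, which is used to update the highest LSN of the transaction's group.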
/*
* TransactionIdCommitTree
* Marks the given transaction and children as committed
*
* "xid" is a toplevel transaction commit, and the xids array contains its
* committed subtransactions.
*
* This commit operation is not guaranteed to be atomic, but if not, subxids
* are correctly marked subcommit first.
*/
void
TransactionIdCommitTree(TransactionId xid, int nxids, TransactionId *xids)
{
    TransactionIdSetTreeStatus(xid, nxids, xids,
                               TRANSACTION_STATUS_COMMITTED,
                               InvalidXLogRecPtr);
}
/*
* TransactionIdAsyncCommitTree
* Same as above, but for async commits. The commit record LSN is needed.
*/
void
TransactionIdAsyncCommitTree(TransactionId xid, int nxids, TransactionId *xids,
XLogRecPtr lsn)
{
    TransactionIdSetTreeStatus(xid, nxids, xids,
                               TRANSACTION_STATUS_COMMITTED, lsn);
}
/*
* TransactionIdAbortTree
* Marks the given transaction and children as aborted.
*
* "xid" is a toplevel transaction commit, and the xids array contains its
* committed subtransactions.
*
* We don't need to worry about the non-atomic behavior, since any onlookers
* will consider all the xacts as not-yet-committed anyway.
*/
void
TransactionIdAbortTree(TransactionId xid, int nxids, TransactionId *xids)
{
    TransactionIdSetTreeStatus(xid, nxids, xids,
                               TRANSACTION_STATUS_ABORTED, InvalidXLogRecPtr);
}
Looking up the LSN that corresponds to an XID. Note that for a special (non-normal) XID, such as the frozen XID, the function returns InvalidXLogRecPtr.
src/backend/access/transam/transam.c
/*
* TransactionIdGetCommitLSN
*
* This function returns an LSN that is late enough to be able
* to guarantee that if we flush up to the LSN returned then we
* will have flushed the transaction's commit record to disk.
*
* The result is not necessarily the exact LSN of the transaction's
* commit record! For example, for long-past transactions (those whose
* clog pages already migrated to disk), we'll return InvalidXLogRecPtr.
* Also, because we group transactions on the same clog page to conserve
* storage, we might return the LSN of a later transaction that falls into
* the same group.
*/
XLogRecPtr
TransactionIdGetCommitLSN(TransactionId xid)
{
    XLogRecPtr result;

    /*
     * Currently, all uses of this function are for xids that were just
     * reported to be committed by TransactionLogFetch, so we expect that
     * checking TransactionLogFetch's cache will usually succeed and avoid an
     * extra trip to shared memory.
     */
    if (TransactionIdEquals(xid, cachedFetchXid))
        return cachedCommitLSN;

    /* Special XIDs are always known committed */
    if (!TransactionIdIsNormal(xid))
        return InvalidXLogRecPtr;

    /*
     * Get the transaction status.
     */
    (void) TransactionIdGetStatus(xid, &result);

    return result;
}
/*
* Interrogate the state of a transaction in the commit log.
*
* Aside from the actual commit status, this function returns (into *lsn)
* an LSN that is late enough to be able to guarantee that if we flush up to
* that LSN then we will have flushed the transaction's commit record to disk.
* The result is not necessarily the exact LSN of the transaction's commit
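* record! For example, for long-past transactions (those whose clog pages // annotation: "long-past" means the clog page has already been written out to disk; special XIDs such as the frozen XID are handled separately in TransactionIdGetCommitLSN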
* already migrated to disk), we'll return InvalidXLogRecPtr. Also, because
* we group transactions on the same clog page to conserve storage, we might
* return the LSN of a later transaction that falls into the same group.
*
* NB: this is a low-level routine and is NOT the preferred entry point
* for most uses; TransactionLogFetch() in transam.c is the intended caller.
*/
XidStatus
TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
{
    int         pageno = TransactionIdToPage(xid);
    int         byteno = TransactionIdToByte(xid);
    int         bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
    int         slotno;
    int         lsnindex;
    char       *byteptr;
    XidStatus   status;

    /* lock is acquired by SimpleLruReadPage_ReadOnly */
    slotno = SimpleLruReadPage_ReadOnly(ClogCtl, pageno, xid);
    byteptr = ClogCtl->shared->page_buffer[slotno] + byteno;

    status = (*byteptr >> bshift) & CLOG_XACT_BITMASK;

    lsnindex = GetLSNIndex(slotno, xid);
    *lsn = ClogCtl->shared->group_lsn[lsnindex];

    LWLockRelease(CLogControlLock);

    return status;
}
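The byte and bit arithmetic in TransactionIdGetStatus can be tried out with a small standalone model. The sketch below assumes the default 8 kB BLCKSZ and the two-bit status encoding from clog.h as I recall it (0 in progress, 1 committed, 2 aborted, 3 sub-committed). It models a single page only and ignores the page number, locking and the SLRU buffer pool; every toy_ name is hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Assumption: default block size and clog.c constants. */
#define BLCKSZ 8192
#define CLOG_BITS_PER_XACT 2
#define CLOG_XACTS_PER_BYTE 4
#define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE)
#define CLOG_XACT_BITMASK ((1 << CLOG_BITS_PER_XACT) - 1)

typedef uint32_t TransactionId;

/* Hypothetical: one in-memory clog page, all transactions "in progress". */
unsigned char toy_page[BLCKSZ];

/* Which page an XID lives on. */
int
toy_xid_to_page(TransactionId xid)
{
    return (int) (xid / (TransactionId) CLOG_XACTS_PER_PAGE);
}

/* Byte offset of the XID's status within its page. */
int
toy_xid_to_byte(TransactionId xid)
{
    return (int) ((xid % (TransactionId) CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_BYTE);
}

/* Bit shift of the XID's two status bits within that byte. */
int
toy_xid_to_bshift(TransactionId xid)
{
    return (int) (xid % (TransactionId) CLOG_XACTS_PER_BYTE) * CLOG_BITS_PER_XACT;
}

/* Overwrite the two status bits of one XID, leaving its neighbours alone. */
void
toy_set_status(TransactionId xid, int status)
{
    unsigned char *byteptr = toy_page + toy_xid_to_byte(xid);
    int            bshift = toy_xid_to_bshift(xid);

    assert(status >= 0 && status <= CLOG_XACT_BITMASK);
    *byteptr = (unsigned char) ((*byteptr & ~(CLOG_XACT_BITMASK << bshift)) |
                                (status << bshift));
}

/* Read the two status bits back, as in TransactionIdGetStatus. */
int
toy_get_status(TransactionId xid)
{
    const unsigned char *byteptr = toy_page + toy_xid_to_byte(xid);

    return (*byteptr >> toy_xid_to_bshift(xid)) & CLOG_XACT_BITMASK;
}
```

For example, XID 5 lives in byte 1 at bit shift 2, so marking it committed (status 1) sets that byte to 4 while XIDs 4, 6 and 7 in the same byte still read as in progress.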
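Everything above operates on the CLOG buffers. Consistency only becomes an issue when a buffer is written to disk: before a CLOG page is written, the XLOG produced by the transactions on that page must already have been flushed.
This is where the highest LSN recorded for each LSN group comes in. The code: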
src/backend/access/transam/slru.c
/*
* Physical write of a page from a buffer slot
*
* On failure, we cannot just ereport(ERROR) since caller has put state in
* shared memory that must be undone. So, we return FALSE and save enough
* info in static variables to let SlruReportIOError make the report.
*
* For now, assume it's not worth keeping a file pointer open across
* independent read/write operations. We do batch operations during
* SimpleLruFlush, though.
*
* fdata is NULL for a standalone write, pointer to open-file info during
* SimpleLruFlush.
*/
static bool
SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata)
{
    SlruShared  shared = ctl->shared;
    int         segno = pageno / SLRU_PAGES_PER_SEGMENT;
    int         rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
    int         offset = rpageno * BLCKSZ;
    char        path[MAXPGPATH];
    int         fd = -1;

    /*
     * Honor the write-WAL-before-data rule, if appropriate, so that we do not
     * write out data before associated WAL records. This is the same action
     * performed during FlushBuffer() in the main buffer manager.
     */
    if (shared->group_lsn != NULL)
    {
        /*
         * We must determine the largest async-commit LSN for the page. This
         * is a bit tedious, but since this entire function is a slow path
         * anyway, it seems better to do this here than to maintain a per-page
         * LSN variable (which'd need an extra comparison in the
         * transaction-commit path).
         */
        XLogRecPtr  max_lsn;
        int         lsnindex,
                    lsnoff;

        lsnindex = slotno * shared->lsn_groups_per_page;
        max_lsn = shared->group_lsn[lsnindex++];
        for (lsnoff = 1; lsnoff < shared->lsn_groups_per_page; lsnoff++)
        {
            XLogRecPtr  this_lsn = shared->group_lsn[lsnindex++];

            if (max_lsn < this_lsn)
                max_lsn = this_lsn;
        }

        // Annotation: a valid max_lsn means XLogFlush must first flush WAL up to
        // and including that LSN, which enforces the write-WAL-before-data rule.
        if (!XLogRecPtrIsInvalid(max_lsn))
        {
            /*
             * As noted above, elog(ERROR) is not acceptable here, so if
             * XLogFlush were to fail, we must PANIC. This isn't much of a
             * restriction because XLogFlush is just about all critical
             * section anyway, but let's make sure.
             */
            START_CRIT_SECTION();
            XLogFlush(max_lsn);
            END_CRIT_SECTION();
        }
    }
......
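The flush-before-write step can be modelled without any of the real WAL machinery. In the sketch below the WAL is reduced to a single "flushed up to" counter, XLogRecPtr is assumed to be a 64-bit integer with 0 as the invalid value, and each page has only four LSN groups to keep the example small; all demo_ names are hypothetical stand-ins, not PostgreSQL functions.

```c
#include <assert.h>
#include <stdint.h>

/* Assumption: an LSN is a 64-bit integer and 0 means "invalid". */
typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical sizes: 2 buffer slots, 4 LSN groups per page. */
#define DEMO_SLOTS 2
#define DEMO_GROUPS_PER_PAGE 4

XLogRecPtr demo_group_lsn[DEMO_SLOTS * DEMO_GROUPS_PER_PAGE];
XLogRecPtr demo_wal_flushed_upto = 0;   /* mock: WAL durable up to here */
int        demo_wal_flush_calls = 0;    /* mock: how often we "fsynced" */

/* Mock stand-in for XLogFlush: make WAL durable up to the given LSN. */
void
demo_xlog_flush(XLogRecPtr upto)
{
    demo_wal_flush_calls++;
    if (demo_wal_flushed_upto < upto)
        demo_wal_flushed_upto = upto;
}

/* Highest group LSN among all groups of the page held in one slot. */
XLogRecPtr
demo_page_max_lsn(const XLogRecPtr *group_lsn, int slotno)
{
    int        base = slotno * DEMO_GROUPS_PER_PAGE;
    XLogRecPtr max_lsn = group_lsn[base];
    int        i;

    for (i = 1; i < DEMO_GROUPS_PER_PAGE; i++)
    {
        if (max_lsn < group_lsn[base + i])
            max_lsn = group_lsn[base + i];
    }
    return max_lsn;
}

/* Flush WAL first (if any async commit touched the page), then "write". */
void
demo_write_clog_page(const XLogRecPtr *group_lsn, int slotno)
{
    XLogRecPtr max_lsn = demo_page_max_lsn(group_lsn, slotno);

    if (max_lsn != InvalidXLogRecPtr)
        demo_xlog_flush(max_lsn);

    /* The page write itself would happen here; WAL must already cover it. */
    assert(demo_wal_flushed_upto >= max_lsn);
}
```

A page whose groups are all invalid (no asynchronous commits recorded) is written without any WAL flush, while a page with group LSNs 100, 300 and 700 forces a flush up to 700 before the write.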
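Summary
How is the write-WAL-before-data rule upheld for asynchronously committed transactions?
pg_clog groups transactions in sets of 32 and keeps the highest commit LSN of each group in the group_lsn array of SlruSharedData.
Before a clog buffer is written to disk, XLOG is flushed up to the highest group LSN recorded for that clog page.
References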
src/backend/access/transam/clog.c
src/include/access/slru.h
src/backend/access/transam/transam.c
src/backend/access/transam/slru.c