PostgreSQL 流复制xlog异步send
背景
PostgreSQL的流复制相比大家并不陌生,但是目前PG为了保证主的高度统治地位,一切以主库为准。包括SEND WAL时,也要求主已经FLUSH才能发给备库。
这实际上会导致些许的延迟,当然这个延迟目前来看可以忽略不计,但是随着硬件的发展,将来这个模式可能就会不适应。
那么能不能让主库的WAL record已经调用write或者已经写入wal buffer就允许发给备库,实现一步的wal send呢。
当然是可以的,来看一下。
源码
GetFlushRecPtr()可以修改为write位置,或者Insert的位置,实现异步的send。
src/backend/replication/walsender.c
/*
* Wait till WAL < loc is flushed to disk so it can be safely read.
*/
static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)
{
int wakeEvents;
static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
/*
* Fast path to avoid acquiring the spinlock in the we already know we
* have enough WAL available. This is particularly interesting if we're
* far behind.
*/
if (RecentFlushPtr != InvalidXLogRecPtr &&
loc <= RecentFlushPtr)
return RecentFlushPtr;
/* Get a more recent flush pointer. */
if (!RecoveryInProgress())
RecentFlushPtr = GetFlushRecPtr(); // 获取已flush位点
else
RecentFlushPtr = GetXLogReplayRecPtr(NULL);
for (;;)
{
long sleeptime;
TimestampTz now;
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
*/
if (!PostmasterIsAlive())
exit(1);
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Process any requests or signals received recently */
if (got_SIGHUP)
{
got_SIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
SyncRepInitConfig();
}
/* Check for input from the client */
ProcessRepliesIfAny();
/* Update our idea of the currently flushed position. */
if (!RecoveryInProgress())
RecentFlushPtr = GetFlushRecPtr(); // 获取已flush位点
else
RecentFlushPtr = GetXLogReplayRecPtr(NULL);
/*
* If postmaster asked us to stop, don't wait here anymore. This will
* cause the xlogreader to return without reading a full record, which
* is the fastest way to reach the mainloop which then can quit.
*
* It's important to do this check after the recomputation of
* RecentFlushPtr, so we can send all remaining data before shutting
* down.
*/
if (walsender_ready_to_stop)
break;
/*
* We only send regular messages to the client for full decoded
* transactions, but a synchronous replication and walsender shutdown
* possibly are waiting for a later location. So we send pings
* containing the flush location every now and then.
*/
if (MyWalSnd->flush < sentPtr &&
MyWalSnd->write < sentPtr &&
!waiting_for_ping_response)
{
WalSndKeepalive(false);
waiting_for_ping_response = true;
}
/* check whether we're done */
if (loc <= RecentFlushPtr)
break;
/* Waiting for new WAL. Since we need to wait, we're now caught up. */
WalSndCaughtUp = true;
/*
* Try to flush pending output to the client. Also wait for the socket
* becoming writable, if there's still pending output after an attempt
* to flush. Otherwise we might just sit on output data while waiting
* for new WAL being generated.
*/
if (pq_flush_if_writable() != 0)
WalSndShutdown();
now = GetCurrentTimestamp();
/* die if timeout was reached */
WalSndCheckTimeOut(now);
/* Send keepalive if the time has come */
WalSndKeepaliveIfNecessary(now);
sleeptime = WalSndComputeSleeptime(now);
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
WL_SOCKET_READABLE | WL_TIMEOUT;
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
/* Sleep until something happens or we time out */
WaitLatchOrSocket(MyLatch, wakeEvents,
MyProcPort->sock, sleeptime);
}
/* reactivate latch so WalSndLoop knows to continue */
SetLatch(MyLatch);
return RecentFlushPtr;
}
static void
XLogSendPhysical(void)
{
......
/* Figure out how far we can safely send the WAL. */
if (sendTimeLineIsHistoric)
{
......
}
else if (am_cascading_walsender)
{
......
}
else
{
/*
* Streaming the current timeline on a master.
*
* Attempt to send all data that's already been written out and
* fsync'd to disk. We cannot go further than what's been written out
* given the current implementation of XLogRead(). And in any case
* it's unsafe to send WAL that is not securely down to disk on the
* master: if the master subsequently crashes and restarts, slaves
* must not have applied any WAL that gets lost on the master.
*/
SendRqstPtr = GetFlushRecPtr();
}
src/backend/access/transam/xlog.c
/*
* Return the current Redo pointer from shared memory.
*
* As a side-effect, the local RedoRecPtr copy is updated.
*/
XLogRecPtr
GetRedoRecPtr(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr ptr;
/*
* The possibly not up-to-date copy in XlogCtl is enough. Even if we
* grabbed a WAL insertion lock to read the master copy, someone might
* update it just after we've released the lock.
*/
SpinLockAcquire(&xlogctl->info_lck);
ptr = xlogctl->RedoRecPtr;
SpinLockRelease(&xlogctl->info_lck);
if (RedoRecPtr < ptr)
RedoRecPtr = ptr;
return RedoRecPtr;
}
/*
* GetInsertRecPtr -- Returns the current insert position.
*
* NOTE: The value *actually* returned is the position of the last full
* xlog page. It lags behind the real insert position by at most 1 page.
* For that, we don't need to scan through WAL insertion locks, and an
* approximation is enough for the current usage of this function.
*/
XLogRecPtr
GetInsertRecPtr(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr recptr;
SpinLockAcquire(&xlogctl->info_lck);
recptr = xlogctl->LogwrtRqst.Write;
SpinLockRelease(&xlogctl->info_lck);
return recptr;
}
/*
* GetFlushRecPtr -- Returns the current flush position, ie, the last WAL
* position known to be fsync'd to disk.
*/
XLogRecPtr
GetFlushRecPtr(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr recptr;
SpinLockAcquire(&xlogctl->info_lck);
recptr = xlogctl->LogwrtResult.Flush;
SpinLockRelease(&xlogctl->info_lck);
return recptr;
}