PostgreSQL streaming replication: asynchronous xlog send


Background

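Most readers are probably familiar with PostgreSQL streaming replication. Currently, however, PG keeps the primary firmly in charge and treats it as the authority on everything. This extends to sending WAL: the primary must already have flushed the WAL before it can be sent to a standby.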

This does add a little latency. For now the delay is negligible, but as hardware keeps improving, this model may stop being a good fit.

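So can we let the primary send a WAL record to the standby as soon as write() has been called on it, or even as soon as it has been copied into the WAL buffers, i.e. send WAL asynchronously?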

Yes, it can be done. Let's look at the code.

Source code

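GetFlushRecPtr() can be changed to return the write position, or the insert position, to make WAL sending asynchronous. In the code below it is called in WalSndWaitForWal() and in XLogSendPhysical(). Mind the caveats spelled out in the comment in XLogSendPhysical(): XLogRead() can only read WAL that has already been written out, and a standby must not apply WAL that the primary could lose if it crashes and restarts.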
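To make the three candidate bounds concrete, here is a standalone C sketch (not PostgreSQL code). `Lsn`, `WalPositions`, `SendLimitMode`, `send_limit()` and `extra_sendable()` are hypothetical names; the sketch only assumes the usual ordering on a primary, flush <= write <= insert. In the real source the values would come from GetFlushRecPtr(), a write position (for example GetXLogWriteRecPtr()), or an insert position such as GetInsertRecPtr().

```c
#include <stdint.h>

/* Stand-in for XLogRecPtr: a 64-bit byte position in the WAL stream. */
typedef uint64_t Lsn;

/* Hypothetical snapshot of a primary's WAL positions.
 * Expected ordering: flush <= write <= insert. */
typedef struct
{
    Lsn insert;  /* copied into WAL buffers */
    Lsn write;   /* passed to write(), may still sit in the OS cache */
    Lsn flush;   /* fsync'd to disk */
} WalPositions;

/* Hypothetical policy: which position bounds what may be sent. */
typedef enum
{
    SEND_UPTO_FLUSH,   /* stock behaviour (GetFlushRecPtr) */
    SEND_UPTO_WRITE,   /* send after write() */
    SEND_UPTO_INSERT   /* send once the record is in WAL buffers */
} SendLimitMode;

/* Return the highest position the walsender may send under 'mode'. */
static Lsn
send_limit(const WalPositions *pos, SendLimitMode mode)
{
    switch (mode)
    {
        case SEND_UPTO_WRITE:
            return pos->write;
        case SEND_UPTO_INSERT:
            return pos->insert;
        case SEND_UPTO_FLUSH:
        default:
            return pos->flush;
    }
}

/* Bytes that become sendable earlier than with the stock flush bound. */
static Lsn
extra_sendable(const WalPositions *pos, SendLimitMode mode)
{
    return send_limit(pos, mode) - pos->flush;
}
```

For example, with flush = 1000, write = 2000 and insert = 3000, switching from SEND_UPTO_FLUSH to SEND_UPTO_WRITE makes 1000 more bytes sendable.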

Related article: 《PostgreSQL xlog positions》

src/backend/replication/walsender.c

/*
 * Wait till WAL < loc is flushed to disk so it can be safely read.
 */
static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)
{
        int                     wakeEvents;
        static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;


        /*
         * Fast path to avoid acquiring the spinlock in case we already know we
         * have enough WAL available. This is particularly interesting if we're
         * far behind.
         */
        if (RecentFlushPtr != InvalidXLogRecPtr &&
                loc <= RecentFlushPtr)
                return RecentFlushPtr;

        /* Get a more recent flush pointer. */
        if (!RecoveryInProgress())
                RecentFlushPtr = GetFlushRecPtr();  // get the position already flushed to disk
        else
                RecentFlushPtr = GetXLogReplayRecPtr(NULL);

        for (;;)
        {
                long            sleeptime;
                TimestampTz now;

                /*
                 * Emergency bailout if postmaster has died.  This is to avoid the
                 * necessity for manual cleanup of all postmaster children.
                 */
                if (!PostmasterIsAlive())
                        exit(1);

                /* Clear any already-pending wakeups */
                ResetLatch(MyLatch);

                CHECK_FOR_INTERRUPTS();

                /* Process any requests or signals received recently */
                if (got_SIGHUP)
                {
                        got_SIGHUP = false;
                        ProcessConfigFile(PGC_SIGHUP);
                        SyncRepInitConfig();
                }

                /* Check for input from the client */
                ProcessRepliesIfAny();

                /* Update our idea of the currently flushed position. */
                if (!RecoveryInProgress())
                        RecentFlushPtr = GetFlushRecPtr();  // get the position already flushed to disk
                else
                        RecentFlushPtr = GetXLogReplayRecPtr(NULL);

                /*
                 * If postmaster asked us to stop, don't wait here anymore. This will
                 * cause the xlogreader to return without reading a full record, which
                 * is the fastest way to reach the mainloop which then can quit.
                 *
                 * It's important to do this check after the recomputation of
                 * RecentFlushPtr, so we can send all remaining data before shutting
                 * down.
                 */
                if (walsender_ready_to_stop)
                        break;

                /*
                 * We only send regular messages to the client for full decoded
                 * transactions, but a synchronous replication and walsender shutdown
                 * possibly are waiting for a later location. So we send pings
                 * containing the flush location every now and then.
                 */
                if (MyWalSnd->flush < sentPtr &&
                        MyWalSnd->write < sentPtr &&
                        !waiting_for_ping_response)
                {
                        WalSndKeepalive(false);
                        waiting_for_ping_response = true;
                }

                /* check whether we're done */
                if (loc <= RecentFlushPtr)
                        break;

                /* Waiting for new WAL. Since we need to wait, we're now caught up. */
                WalSndCaughtUp = true;

                /*
                 * Try to flush pending output to the client. Also wait for the socket
                 * becoming writable, if there's still pending output after an attempt
                 * to flush. Otherwise we might just sit on output data while waiting
                 * for new WAL being generated.
                 */
                if (pq_flush_if_writable() != 0)
                        WalSndShutdown();

                now = GetCurrentTimestamp();

                /* die if timeout was reached */
                WalSndCheckTimeOut(now);

                /* Send keepalive if the time has come */
                WalSndKeepaliveIfNecessary(now);
                sleeptime = WalSndComputeSleeptime(now);

                wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
                        WL_SOCKET_READABLE | WL_TIMEOUT;

                if (pq_is_send_pending())
                        wakeEvents |= WL_SOCKET_WRITEABLE;

                /* Sleep until something happens or we time out */
                WaitLatchOrSocket(MyLatch, wakeEvents,
                                                  MyProcPort->sock, sleeptime);
        }

        /* reactivate latch so WalSndLoop knows to continue */
        SetLatch(MyLatch);
        return RecentFlushPtr;
}
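Stripped of signal handling, keepalives and timeouts, the control flow of WalSndWaitForWal() above is a cached-position fast path plus a poll-and-wait loop, and the asynchronous-send idea only changes which function supplies the position. A simplified standalone C model with hypothetical names (`wait_for_wal`, `LsnGetter`, `fake_advancing_ptr`); it busy-polls where the real code sleeps in WaitLatchOrSocket():

```c
#include <stdint.h>

typedef uint64_t Lsn;  /* stand-in for XLogRecPtr */

/* Supplies the "safe to read up to" position: the flush position in the
 * stock code, a write (or insert) position in the async-send variant. */
typedef Lsn (*LsnGetter)(void);

static Lsn recent_ptr = 0;  /* cached value; 0 plays InvalidXLogRecPtr */
static int polls = 0;       /* how many times the getter was consulted */

/* Return once WAL up to 'loc' is available according to get_ptr(). */
static Lsn
wait_for_wal(Lsn loc, LsnGetter get_ptr)
{
    /* Fast path: an earlier call already saw enough WAL. */
    if (recent_ptr != 0 && loc <= recent_ptr)
        return recent_ptr;

    for (;;)
    {
        recent_ptr = get_ptr();
        polls++;
        if (loc <= recent_ptr)
            break;
        /* The real loop sleeps on the latch/socket here. */
    }
    return recent_ptr;
}

/* Fake WAL producer for the demo: each poll exposes 100 more bytes. */
static Lsn fake_pos = 0;

static Lsn
fake_advancing_ptr(void)
{
    fake_pos += 100;
    return fake_pos;
}
```

Starting from nothing, waiting for position 250 polls three times (100, 200, 300) and returns 300; a following wait for 280 is answered from the cache without polling.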
static void
XLogSendPhysical(void)
{
......
        /* Figure out how far we can safely send the WAL. */
        if (sendTimeLineIsHistoric)
        {
......
        }
        else if (am_cascading_walsender)
        {
......
        }
        else
        {
                /*
                 * Streaming the current timeline on a master.
                 *
                 * Attempt to send all data that's already been written out and
                 * fsync'd to disk.  We cannot go further than what's been written out
                 * given the current implementation of XLogRead().  And in any case
                 * it's unsafe to send WAL that is not securely down to disk on the
                 * master: if the master subsequently crashes and restarts, slaves
                 * must not have applied any WAL that gets lost on the master.
                 */
                SendRqstPtr = GetFlushRecPtr(); 
        }
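The comment in XLogSendPhysical() names two separate constraints, and a modified send bound has to respect both. A hypothetical C sketch of the two checks (illustrative names, not PostgreSQL functions): the request is clamped to what has been written, because XLogRead() reads WAL from segment files, and anything sent beyond the flush position is WAL the primary could still lose in a crash.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;  /* stand-in for XLogRecPtr */

/* Clamp a requested send position to what has been written out: a
 * file-based reader (like XLogRead()) cannot return WAL that is still
 * only in WAL buffers. */
static Lsn
clamp_to_written(Lsn requested, Lsn written)
{
    return requested < written ? requested : written;
}

/* True if sending up to 'send_upto' ships WAL that is not yet fsync'd on
 * the primary, i.e. WAL that a primary crash could still lose. */
static bool
ships_unflushed_wal(Lsn send_upto, Lsn flushed)
{
    return send_upto > flushed;
}
```

With 2000 bytes written and 1000 flushed, an insert-bounded request for 3000 is clamped to 2000, and ships_unflushed_wal(2000, 1000) is true, which is exactly the situation the comment calls unsafe.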

src/backend/access/transam/xlog.c

/*
 * Return the current Redo pointer from shared memory.
 *
 * As a side-effect, the local RedoRecPtr copy is updated.
 */
XLogRecPtr
GetRedoRecPtr(void)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile XLogCtlData *xlogctl = XLogCtl;
	XLogRecPtr	ptr;

	/*
	 * The possibly not up-to-date copy in XlogCtl is enough. Even if we
	 * grabbed a WAL insertion lock to read the master copy, someone might
	 * update it just after we've released the lock.
	 */
	SpinLockAcquire(&xlogctl->info_lck);
	ptr = xlogctl->RedoRecPtr;
	SpinLockRelease(&xlogctl->info_lck);

	if (RedoRecPtr < ptr)
		RedoRecPtr = ptr;

	return RedoRecPtr;
}
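GetRedoRecPtr() combines a spinlock-protected read of shared memory with a backend-local copy that only moves forward, since the local copy may already be newer than the value in XLogCtl. A minimal C11 sketch of that pattern, assuming a test-and-set `atomic_flag` spinlock as a stand-in for SpinLockAcquire()/SpinLockRelease(); `shared_redo`, `local_redo` and `get_redo_ptr()` are hypothetical:

```c
#include <stdatomic.h>
#include <stdint.h>

typedef uint64_t Lsn;  /* stand-in for XLogRecPtr */

/* Shared state (like XLogCtl->info_lck and XLogCtl->RedoRecPtr). */
static atomic_flag info_lck = ATOMIC_FLAG_INIT;
static Lsn shared_redo = 0;

/* Backend-local copy (like the static RedoRecPtr in xlog.c). */
static Lsn local_redo = 0;

/* Test-and-set spinlock standing in for SpinLockAcquire/Release. */
static void
spin_acquire(atomic_flag *lck)
{
    while (atomic_flag_test_and_set_explicit(lck, memory_order_acquire))
        ;  /* spin until the previous holder clears the flag */
}

static void
spin_release(atomic_flag *lck)
{
    atomic_flag_clear_explicit(lck, memory_order_release);
}

static Lsn
get_redo_ptr(void)
{
    Lsn ptr;

    spin_acquire(&info_lck);
    ptr = shared_redo;
    spin_release(&info_lck);

    /* Only move the local copy forward; it may already be newer. */
    if (local_redo < ptr)
        local_redo = ptr;
    return local_redo;
}
```

If the local copy is already at 800 while shared memory still says 500, the call returns 800; once shared memory reaches 900, the local copy catches up.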

/*
 * GetInsertRecPtr -- Returns the current insert position.
 *
 * NOTE: The value *actually* returned is the position of the last full
 * xlog page. It lags behind the real insert position by at most 1 page.
 * For that, we don't need to scan through WAL insertion locks, and an
 * approximation is enough for the current usage of this function.
 */
XLogRecPtr
GetInsertRecPtr(void)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile XLogCtlData *xlogctl = XLogCtl;
	XLogRecPtr	recptr;

	SpinLockAcquire(&xlogctl->info_lck);
	recptr = xlogctl->LogwrtRqst.Write;
	SpinLockRelease(&xlogctl->info_lck);

	return recptr;
}
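GetInsertRecPtr() trades accuracy for cheapness: it reads LogwrtRqst.Write instead of scanning the WAL insertion locks, so it can trail the true insert position by up to one page. That matters if it is used as the send bound. A simplified C model, assuming XLOG_BLCKSZ is the default 8192 bytes and that the shared value is only advanced (to the record's end) when an insertion crosses a page boundary; `InsertState`, `insert_record()` and `insert_lag()` are hypothetical:

```c
#include <stdint.h>

typedef uint64_t Lsn;  /* stand-in for XLogRecPtr */

#define PAGE_SZ 8192   /* assumed XLOG_BLCKSZ (the default) */

/* Hypothetical model of the two positions involved. */
typedef struct
{
    Lsn insert;      /* true end of inserted WAL */
    Lsn rqst_write;  /* what GetInsertRecPtr() would report */
} InsertState;

/* Append a record of 'len' bytes. The cheap shared value is refreshed
 * only when the record crosses into a new page (simplifying assumption). */
static void
insert_record(InsertState *s, Lsn len)
{
    Lsn start = s->insert;
    Lsn end = start + len;

    s->insert = end;
    if (start / PAGE_SZ != end / PAGE_SZ && s->rqst_write < end)
        s->rqst_write = end;
}

/* How far the cheap value trails the real insert position. */
static Lsn
insert_lag(const InsertState *s)
{
    return s->insert - s->rqst_write;
}
```

Between page crossings the reported value stands still, so in this model the lag stays below one page, in line with the "at most 1 page" note.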

/*
 * GetFlushRecPtr -- Returns the current flush position, ie, the last WAL
 * position known to be fsync'd to disk.
 */
XLogRecPtr
GetFlushRecPtr(void)
{
	/* use volatile pointer to prevent code rearrangement */
	volatile XLogCtlData *xlogctl = XLogCtl;
	XLogRecPtr	recptr;

	SpinLockAcquire(&xlogctl->info_lck);
	recptr = xlogctl->LogwrtResult.Flush;
	SpinLockRelease(&xlogctl->info_lck);

	return recptr;
}
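A write-position counterpart of GetFlushRecPtr() would have the same shape: take info_lck, copy one field of LogwrtResult, release. A standalone C11 sketch, with a test-and-set `atomic_flag` standing in for the spinlock; `logwrt_result`, `get_write_ptr()` and `advance()` are hypothetical, not PostgreSQL APIs (the real tree already has GetXLogWriteRecPtr() for the write position):

```c
#include <stdatomic.h>
#include <stdint.h>

typedef uint64_t Lsn;  /* stand-in for XLogRecPtr */

/* Stand-ins for XLogCtl->info_lck and XLogCtl->LogwrtResult. */
static atomic_flag info_lck = ATOMIC_FLAG_INIT;
static struct
{
    Lsn Write;
    Lsn Flush;
} logwrt_result = { 0, 0 };

static void
spin_acquire(atomic_flag *lck)
{
    while (atomic_flag_test_and_set_explicit(lck, memory_order_acquire))
        ;  /* spin until the previous holder clears the flag */
}

static void
spin_release(atomic_flag *lck)
{
    atomic_flag_clear_explicit(lck, memory_order_release);
}

/* Same shape as GetFlushRecPtr(). */
static Lsn
get_flush_ptr(void)
{
    Lsn ptr;

    spin_acquire(&info_lck);
    ptr = logwrt_result.Flush;
    spin_release(&info_lck);
    return ptr;
}

/* Hypothetical write-position twin: same lock, Write instead of Flush. */
static Lsn
get_write_ptr(void)
{
    Lsn ptr;

    spin_acquire(&info_lck);
    ptr = logwrt_result.Write;
    spin_release(&info_lck);
    return ptr;
}

/* Simulated WAL writer publishing new positions (write first, fsync later). */
static void
advance(Lsn new_write, Lsn new_flush)
{
    spin_acquire(&info_lck);
    logwrt_result.Write = new_write;
    logwrt_result.Flush = new_flush;
    spin_release(&info_lck);
}
```

After advance(8192, 4096), a walsender bounded by get_write_ptr() could send 4096 bytes more than one bounded by get_flush_ptr().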


digoal's index of PostgreSQL articles