PostgreSQL pg_stat_replication sent_location, write_location, flush_location, replay_location的差别

5 minute read

背景

PostgreSQL 的流复制统计信息中记录了4个WAL日志的位置信息。其中write、flush、replay这3个位置是standby的walreceiver进程反馈给primary的walsender进程的,sent位置则由primary的walsender进程自己记录。

统计视图如下:

pipeline=# \d+ pg_stat_replication  
                      View "pg_catalog.pg_stat_replication"  
      Column      |           Type           | Modifiers | Storage  | Description   
------------------+--------------------------+-----------+----------+-------------  
 pid              | integer                  |           | plain    |   
 usesysid         | oid                      |           | plain    |   
 usename          | name                     |           | plain    |   
 application_name | text                     |           | extended |   
 client_addr      | inet                     |           | main     |   
 client_hostname  | text                     |           | extended |   
 client_port      | integer                  |           | plain    |   
 backend_start    | timestamp with time zone |           | plain    |   
 backend_xmin     | xid                      |           | plain    |   
 state            | text                     |           | extended |   
 sent_location    | pg_lsn                   |           | plain    |   
 write_location   | pg_lsn                   |           | plain    |   
 flush_location   | pg_lsn                   |           | plain    |   
 replay_location  | pg_lsn                   |           | plain    |   
 sync_priority    | integer                  |           | plain    |   
 sync_state       | text                     |           | extended |   
View definition:  
 SELECT s.pid,  
    s.usesysid,  
    u.rolname AS usename,  
    s.application_name,  
    s.client_addr,  
    s.client_hostname,  
    s.client_port,  
    s.backend_start,  
    s.backend_xmin,  
    w.state,  
    w.sent_location,  
    w.write_location,  
    w.flush_location,  
    w.replay_location,  
    w.sync_priority,  
    w.sync_state  
   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, waiting, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin),  
    pg_authid u,  
    pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state)  
  WHERE s.usesysid = u.oid AND s.pid = w.pid;  

实际上这几个位置信息是从函数pg_stat_get_wal_senders获取到的,

这个函数的信息如下

pipeline=# \df+ pg_stat_get_wal_senders  
List of functions  
-[ RECORD 1 ]-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
Schema              | pg_catalog  
Name                | pg_stat_get_wal_senders  
Result data type    | SETOF record  
Argument data types | OUT pid integer, OUT state text, OUT sent_location pg_lsn, OUT write_location pg_lsn, OUT flush_location pg_lsn, OUT replay_location pg_lsn, OUT sync_priority integer, OUT sync_state text  
Type                | normal  
Security            | invoker  
Volatility          | stable  
Owner               | postgres  
Language            | internal  
Source code         | pg_stat_get_wal_senders  
Description         | statistics: information about currently active replication  

pg_stat_get_wal_senders对应的源码如下

src/backend/replication/walsender.c

/*  
 * Returns activity of walsenders, including pids and xlog locations sent to  
 * standby servers.  
 */  
Datum  
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)  
{  
...  
                        /* use volatile pointer to prevent code rearrangement */  
                        volatile WalSnd *walsnd = MyWalSnd;  
...  
                state = walsnd->state;  // 4个位置信息的值是这样得来的  
                write = walsnd->write;  
                flush = walsnd->flush;  
                apply = walsnd->apply;  
...  

walsnd的定义如下

src/include/replication/walsender_private.h

/*  
 * Each walsender has a WalSnd struct in shared memory.  
 *  
 * pg_stat_get_wal_senders() reads state, write, flush and apply from this  
 * struct, and ProcessStandbyReplyMessage() stores the standby-reported  
 * positions into it.  
 */  
typedef struct WalSnd  
{  
        pid_t           pid;                    /* this walsender's process id, or 0 */  
        WalSndState state;                      /* this walsender's state */  
        XLogRecPtr      sentPtr;                /* WAL has been sent up to this point */  
        bool            needreload;             /* does currently-open file need to be  
                                                                 * reloaded? */  
  
        /*  
         * The xlog locations that have been written, flushed, and applied by  
         * standby-side. These may be invalid if the standby-side has not offered  
         * values yet.  
         *  
         * ProcessStandbyReplyMessage() overwrites all three, while holding  
         * mutex, with the writePtr/flushPtr/applyPtr values read from the  
         * standby's reply message.  
         */  
        XLogRecPtr      write;  
        XLogRecPtr      flush;  
        XLogRecPtr      apply;  
  
        /* Protects shared variables shown above. */  
        slock_t         mutex;  
  
        /*  
         * Pointer to the walsender's latch. Used by backends to wake up this  
         * walsender when it has work to do. NULL if the walsender isn't active.  
         */  
        Latch      *latch;  
  
        /*  
         * The priority order of the standby managed by this WALSender, as listed  
         * in synchronous_standby_names, or 0 if not-listed. Protected by  
         * SyncRepLock.  
         */  
        int                     sync_standby_priority;  
} WalSnd;  

src/backend/replication/walsender.c

/*  
 * Regular reply from standby advising of WAL positions on standby server.  
 */  
static void  
ProcessStandbyReplyMessage(void)  
{  
......  
        XLogRecPtr      writePtr,  
                                flushPtr,  
                                applyPtr;  
        bool            replyRequested;  
  
        /* the caller already consumed the msgtype byte */  
        writePtr = pq_getmsgint64(&reply_message); // 接收来自walreceiver的位置信息  
        flushPtr = pq_getmsgint64(&reply_message);  
        applyPtr = pq_getmsgint64(&reply_message);  
        (void) pq_getmsgint64(&reply_message);          /* sendTime; not used ATM */  
        replyRequested = pq_getmsgbyte(&reply_message);  
  
        elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",  
                 (uint32) (writePtr >> 32), (uint32) writePtr,   
                 (uint32) (flushPtr >> 32), (uint32) flushPtr,  
                 (uint32) (applyPtr >> 32), (uint32) applyPtr,  
                 replyRequested ? " (reply requested)" : "");  
......  
        /*  
         * Update shared state for this WalSender process based on reply data from  
         * standby.  
         */  
        {  
                /* use volatile pointer to prevent code rearrangement */  
                volatile WalSnd *walsnd = MyWalSnd;  
  
                SpinLockAcquire(&walsnd->mutex);  
                walsnd->write = writePtr;  // 这几个数据实际上是walsender进程从walreceiver进程接收到的  
                walsnd->flush = flushPtr;  
                walsnd->apply = applyPtr;  
                SpinLockRelease(&walsnd->mutex);  
        }  
....  

walreceiver的位置信息是如何计算的?先看相关的数据结构:

src/backend/replication/walreceiver.c

/*  
 * LogstreamResult indicates the byte positions that we have already  
 * written/fsynced.  
 *  
 * Write is advanced by XLogWalRcvWrite() after each write() that did not  
 * fail. XLogWalRcvFlush() sets Flush to the current Write value after  
 * calling issue_xlog_fsync(). XLogWalRcvSendReply() copies both into the  
 * reply message sent to the primary (as writePtr and flushPtr).  
 */  
static struct  
{  
        XLogRecPtr      Write;                  /* last byte + 1 written out in the standby */  
        XLogRecPtr      Flush;                  /* last byte + 1 flushed in the standby */  
}       LogstreamResult;  
  
// 发送位置信息给walsender  
......  
/*  
 * Send reply message to primary, indicating our current XLOG positions, oldest  
 * xmin and the current time.  
 *  
 * If 'force' is not set, the message is only sent if enough time has  
 * passed since last status update to reach wal_receiver_status_interval.  
 * If wal_receiver_status_interval is disabled altogether and 'force' is  
 * false, this is a no-op.  
 *  
 * If 'requestReply' is true, requests the server to reply immediately upon  
 * receiving this message. This is used for heartbearts, when approaching  
 * wal_receiver_timeout.  
 */  
static void  
XLogWalRcvSendReply(bool force, bool requestReply)  
{  
.....  
        /* Construct a new message */  
        writePtr = LogstreamResult.Write;  
        flushPtr = LogstreamResult.Flush;  
        applyPtr = GetXLogReplayRecPtr(NULL);  
  
        resetStringInfo(&reply_message);  
        pq_sendbyte(&reply_message, 'r');  
        pq_sendint64(&reply_message, writePtr);  
        pq_sendint64(&reply_message, flushPtr);  
        pq_sendint64(&reply_message, applyPtr);  
        pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());  
        pq_sendbyte(&reply_message, requestReply ? 1 : 0);  
......  

调用write接口和fsync接口。

/*  
 * Write XLOG data to disk.  
 */  
static void  
XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)  
{  
......  
        int                     startoff;  
        int                     byteswritten;  
  
        while (nbytes > 0)  
        {  
                int                     segbytes;  
  
                if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo))  
                {  
                        bool            use_existent;  
  
                        /*  
                         * fsync() and close current file before we switch to next one. We  
                         * would otherwise have to reopen this file to fsync it later  
                         */  
                        if (recvFile >= 0)  
                        {  
                                char            xlogfname[MAXFNAMELEN];  
  
                                XLogWalRcvFlush(false);  // 调用fsync数据,指刷到磁盘,并更新flush位置为老的write位置  
......  
                /* OK to write the logs */  
                errno = 0;  
  
                byteswritten = write(recvFile, buf, segbytes);  // 调用write接口,指刷到os dirty page cache  
                if (byteswritten <= 0)  
                {  
                        /* if write didn't set errno, assume no disk space */  
                        if (errno == 0)  
                                errno = ENOSPC;  
                        ereport(PANIC,  
                                        (errcode_for_file_access(),  
                                         errmsg("could not write to log segment %s "  
                                                        "at offset %u, length %lu: %m",  
                                                        XLogFileNameP(recvFileTLI, recvSegNo),  
                                                        recvOff, (unsigned long) segbytes)));  
                }  
  
                /* Update state for write */  
                recptr += byteswritten;  // 修正最新write位置  
  
                recvOff += byteswritten;  
                nbytes -= byteswritten;  
                buf += byteswritten;  
  
                LogstreamResult.Write = recptr;  // 更新Write位置  
...  
  
/*  
 * Flush the log to disk.  
 *  
 * If we're in the midst of dying, it's unwise to do anything that might throw  
 * an error, so we skip sending a reply in that case.  
 */  
static void  
XLogWalRcvFlush(bool dying)  
{  
        if (LogstreamResult.Flush < LogstreamResult.Write)  
        {  
                /* use volatile pointer to prevent code rearrangement */  
                volatile WalRcvData *walrcv = WalRcv;  
  
                issue_xlog_fsync(recvFile, recvSegNo);  
  
                LogstreamResult.Flush = LogstreamResult.Write;  // 将Flush改为老的Write值  
......  

fsync wal接口调用

src/backend/access/transam/xlog.c

/*  
 * Issue appropriate kind of fsync (if any) for an XLOG output file.  
 *  
 * 'fd' is a file descriptor for the XLOG file to be fsync'd.  
 * 'segno' is used only to build the file name for error reporting purposes.  
 *  
 * The sync call is selected by the global sync_method (reported as  
 * wal_sync_method in the default-case error below). Every failure is  
 * raised at PANIC level.  
 */  
void  
issue_xlog_fsync(int fd, XLogSegNo segno)  
{  
        switch (sync_method)  
        {  
                case SYNC_METHOD_FSYNC:  
                        if (pg_fsync_no_writethrough(fd) != 0)  
                                ereport(PANIC,  
                                                (errcode_for_file_access(),  
                                                 errmsg("could not fsync log file %s: %m",  
                                                                XLogFileNameP(ThisTimeLineID, segno))));  
                        break;  
                /* only compiled in when the platform defines HAVE_FSYNC_WRITETHROUGH */  
#ifdef HAVE_FSYNC_WRITETHROUGH  
                case SYNC_METHOD_FSYNC_WRITETHROUGH:  
                        if (pg_fsync_writethrough(fd) != 0)  
                                ereport(PANIC,  
                                                (errcode_for_file_access(),  
                                          errmsg("could not fsync write-through log file %s: %m",  
                                                         XLogFileNameP(ThisTimeLineID, segno))));  
                        break;  
#endif  
                /* only compiled in when the platform defines HAVE_FDATASYNC */  
#ifdef HAVE_FDATASYNC  
                case SYNC_METHOD_FDATASYNC:  
                        if (pg_fdatasync(fd) != 0)  
                                ereport(PANIC,  
                                                (errcode_for_file_access(),  
                                                 errmsg("could not fdatasync log file %s: %m",  
                                                                XLogFileNameP(ThisTimeLineID, segno))));  
                        break;  
#endif  
                /* for the open-based methods no separate sync call is issued here */  
                case SYNC_METHOD_OPEN:  
                case SYNC_METHOD_OPEN_DSYNC:  
                        /* write synced it already */  
                        break;  
                /* any other value (including a method compiled out above) is fatal */  
                default:  
                        elog(PANIC, "unrecognized wal_sync_method: %d", sync_method);  
                        break;  
        }  
}  

以上调用详见src/backend/storage/file/fd.c

现在可以小结一下了

  • sent_location primary的walsender已发送给standby的WAL最新位置(即WalSnd结构中的sentPtr,由walsender自己记录,不是standby反馈的)

  • write_location standby已接收到,并已调用write写入OS page cache(此时可能还是脏页,尚未fsync到磁盘)的WAL最新位置

  • flush_location standby已接收到,并已通过wal_sync_method配置的fsync接口刷到磁盘的WAL最新位置

  • replay_location standby已接收到,并已apply进行恢复的WAL最新位置

Flag Counter

digoal’s 大量PostgreSQL文章入口