PostgreSQL pg_stat_replication sent_location, write_location, flush_location, replay_location的差别
背景
PostgreSQL 的流复制统计信息中记录了4个WAL日志的位置信息,这些信息是standby的receiver进程反馈给primary的wal sender进程的。
统计视图如下:
pipeline=# \d+ pg_stat_replication
View "pg_catalog.pg_stat_replication"
Column | Type | Modifiers | Storage | Description
------------------+--------------------------+-----------+----------+-------------
pid | integer | | plain |
usesysid | oid | | plain |
usename | name | | plain |
application_name | text | | extended |
client_addr | inet | | main |
client_hostname | text | | extended |
client_port | integer | | plain |
backend_start | timestamp with time zone | | plain |
backend_xmin | xid | | plain |
state | text | | extended |
sent_location | pg_lsn | | plain |
write_location | pg_lsn | | plain |
flush_location | pg_lsn | | plain |
replay_location | pg_lsn | | plain |
sync_priority | integer | | plain |
sync_state | text | | extended |
View definition:
SELECT s.pid,
s.usesysid,
u.rolname AS usename,
s.application_name,
s.client_addr,
s.client_hostname,
s.client_port,
s.backend_start,
s.backend_xmin,
w.state,
w.sent_location,
w.write_location,
w.flush_location,
w.replay_location,
w.sync_priority,
w.sync_state
FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, waiting, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin),
pg_authid u,
pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state)
WHERE s.usesysid = u.oid AND s.pid = w.pid;
实际上这几个位置信息是从函数pg_stat_get_wal_senders获取到的,
这个函数的信息如下
pipeline=# \df+ pg_stat_get_wal_senders
List of functions
-[ RECORD 1 ]-------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Schema | pg_catalog
Name | pg_stat_get_wal_senders
Result data type | SETOF record
Argument data types | OUT pid integer, OUT state text, OUT sent_location pg_lsn, OUT write_location pg_lsn, OUT flush_location pg_lsn, OUT replay_location pg_lsn, OUT sync_priority integer, OUT sync_state text
Type | normal
Security | invoker
Volatility | stable
Owner | postgres
Language | internal
Source code | pg_stat_get_wal_senders
Description | statistics: information about currently active replication
pg_stat_get_wal_senders对应的源码如下
src/backend/replication/walsender.c
/*
* Returns activity of walsenders, including pids and xlog locations sent to
* standby servers.
*/
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
...
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = MyWalSnd;
...
state = walsnd->state; // 4个位置信息的值是这样得来的
write = walsnd->write;
flush = walsnd->flush;
apply = walsnd->apply;
...
walsnd的定义如下
src/include/replication/walsender_private.h
/*
* Each walsender has a WalSnd struct in shared memory.
*/
typedef struct WalSnd
{
pid_t pid; /* this walsender's process id, or 0 */
WalSndState state; /* this walsender's state */
XLogRecPtr sentPtr; /* WAL has been sent up to this point */
bool needreload; /* does currently-open file need to be
* reloaded? */
/*
* The xlog locations that have been written, flushed, and applied by
* standby-side. These may be invalid if the standby-side has not offered
* values yet.
*/
XLogRecPtr write;
XLogRecPtr flush;
XLogRecPtr apply;
/* Protects shared variables shown above. */
slock_t mutex;
/*
* Pointer to the walsender's latch. Used by backends to wake up this
* walsender when it has work to do. NULL if the walsender isn't active.
*/
Latch *latch;
/*
* The priority order of the standby managed by this WALSender, as listed
* in synchronous_standby_names, or 0 if not-listed. Protected by
* SyncRepLock.
*/
int sync_standby_priority;
} WalSnd;
src/backend/replication/walsender.c
/*
* Regular reply from standby advising of WAL positions on standby server.
*/
static void
ProcessStandbyReplyMessage(void)
{
......
XLogRecPtr writePtr,
flushPtr,
applyPtr;
bool replyRequested;
/* the caller already consumed the msgtype byte */
writePtr = pq_getmsgint64(&reply_message); // 接收来自walreceiver的位置信息
flushPtr = pq_getmsgint64(&reply_message);
applyPtr = pq_getmsgint64(&reply_message);
(void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
replyRequested = pq_getmsgbyte(&reply_message);
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
(uint32) (writePtr >> 32), (uint32) writePtr,
(uint32) (flushPtr >> 32), (uint32) flushPtr,
(uint32) (applyPtr >> 32), (uint32) applyPtr,
replyRequested ? " (reply requested)" : "");
......
/*
* Update shared state for this WalSender process based on reply data from
* standby.
*/
{
/* use volatile pointer to prevent code rearrangement */
volatile WalSnd *walsnd = MyWalSnd;
SpinLockAcquire(&walsnd->mutex);
walsnd->write = writePtr; // 这几个数据实际上是walsender进程从walreceiver进程接收到的
walsnd->flush = flushPtr;
walsnd->apply = applyPtr;
SpinLockRelease(&walsnd->mutex);
}
....
walreceiver的位置信息是如何计算的,数据结构
src/backend/replication/walreceiver.c
/*
* LogstreamResult indicates the byte positions that we have already
* written/fsynced.
*/
static struct
{
XLogRecPtr Write; /* last byte + 1 written out in the standby */
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
} LogstreamResult;
// 发送位置信息给walsender
......
/*
* Send reply message to primary, indicating our current XLOG positions, oldest
* xmin and the current time.
*
* If 'force' is not set, the message is only sent if enough time has
* passed since last status update to reach wal_receiver_status_interval.
* If wal_receiver_status_interval is disabled altogether and 'force' is
* false, this is a no-op.
*
* If 'requestReply' is true, requests the server to reply immediately upon
* receiving this message. This is used for heartbearts, when approaching
* wal_receiver_timeout.
*/
static void
XLogWalRcvSendReply(bool force, bool requestReply)
{
.....
/* Construct a new message */
writePtr = LogstreamResult.Write;
flushPtr = LogstreamResult.Flush;
applyPtr = GetXLogReplayRecPtr(NULL);
resetStringInfo(&reply_message);
pq_sendbyte(&reply_message, 'r');
pq_sendint64(&reply_message, writePtr);
pq_sendint64(&reply_message, flushPtr);
pq_sendint64(&reply_message, applyPtr);
pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
pq_sendbyte(&reply_message, requestReply ? 1 : 0);
......
调用write接口和fsync接口。
/*
* Write XLOG data to disk.
*/
static void
XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
{
......
int startoff;
int byteswritten;
while (nbytes > 0)
{
int segbytes;
if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo))
{
bool use_existent;
/*
* fsync() and close current file before we switch to next one. We
* would otherwise have to reopen this file to fsync it later
*/
if (recvFile >= 0)
{
char xlogfname[MAXFNAMELEN];
XLogWalRcvFlush(false); // 调用fsync数据,指刷到磁盘,并更新flush位置为老的write位置
......
/* OK to write the logs */
errno = 0;
byteswritten = write(recvFile, buf, segbytes); // 调用write接口,指刷到os dirty page cache
if (byteswritten <= 0)
{
/* if write didn't set errno, assume no disk space */
if (errno == 0)
errno = ENOSPC;
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not write to log segment %s "
"at offset %u, length %lu: %m",
XLogFileNameP(recvFileTLI, recvSegNo),
recvOff, (unsigned long) segbytes)));
}
/* Update state for write */
recptr += byteswritten; // 修正最新write位置
recvOff += byteswritten;
nbytes -= byteswritten;
buf += byteswritten;
LogstreamResult.Write = recptr; // 更新Write位置
...
/*
* Flush the log to disk.
*
* If we're in the midst of dying, it's unwise to do anything that might throw
* an error, so we skip sending a reply in that case.
*/
static void
XLogWalRcvFlush(bool dying)
{
if (LogstreamResult.Flush < LogstreamResult.Write)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
issue_xlog_fsync(recvFile, recvSegNo);
LogstreamResult.Flush = LogstreamResult.Write; -- 将Flush改为老的Write值
......
fsync wal接口调用
src/backend/access/transam/xlog.c
/*
* Issue appropriate kind of fsync (if any) for an XLOG output file.
*
* 'fd' is a file descriptor for the XLOG file to be fsync'd.
* 'log' and 'seg' are for error reporting purposes.
*/
void
issue_xlog_fsync(int fd, XLogSegNo segno)
{
switch (sync_method)
{
case SYNC_METHOD_FSYNC:
if (pg_fsync_no_writethrough(fd) != 0)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not fsync log file %s: %m",
XLogFileNameP(ThisTimeLineID, segno))));
break;
#ifdef HAVE_FSYNC_WRITETHROUGH
case SYNC_METHOD_FSYNC_WRITETHROUGH:
if (pg_fsync_writethrough(fd) != 0)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not fsync write-through log file %s: %m",
XLogFileNameP(ThisTimeLineID, segno))));
break;
#endif
#ifdef HAVE_FDATASYNC
case SYNC_METHOD_FDATASYNC:
if (pg_fdatasync(fd) != 0)
ereport(PANIC,
(errcode_for_file_access(),
errmsg("could not fdatasync log file %s: %m",
XLogFileNameP(ThisTimeLineID, segno))));
break;
#endif
case SYNC_METHOD_OPEN:
case SYNC_METHOD_OPEN_DSYNC:
/* write synced it already */
break;
default:
elog(PANIC, "unrecognized wal_sync_method: %d", sync_method);
break;
}
}
以上调用详见src/backend/storage/file/fd.c
现在可以小结一下了
-
sent_location 已发送给standby的位置(standby请求的最新位置)
-
write_location standby已接收到,并已调用write刷到OS DIRTY PAGE的WAL最新位置
-
flush_location standby已接收到,并已调用已通过wal_sync_method配置的fsync接口刷到disk的WAL最新位置
-
replay_location standby已接收到,并已apply进行恢复的WAL最新位置