Get stream replication state from standby
背景
本文的目的是在standby节点获得几个stream replication相关的信息,从而判断当前standby节点的状态:
wal receiver进程启动时间,
wal receiver进程状态,
wal receiver进程的PID,
wal receiver进程连接到上游节点的连接信息,
接收到的XLOG的最大的地址,
已恢复的XLOG的最大地址,
standby节点的replay时间延迟(当前时间 减去 已恢复的XLOG的最后一条commit/abort/pitr target中的时间戳),
上游节点最后一次将WAL record传给wal receiver的时间。(仅仅当wal receiver进程状态为streaming时有意义,否则是wal receiver进程的初始启动时间)。
查询结果如下:
postgres=# select now(),
extract(epoch from now()) as now_epoch,
* from get_rcv_replication_stat() as
t (last_walend_time timestamptz,
last_recv_lsn pg_lsn,
last_apply_lsn pg_lsn,
last_apply_delay_ms int,
receiver_pid int,
receiver_state int,
receiver_start_epoch int8,
conninfo text );
-[ RECORD 1 ]--------+---------------------------------------------------------------
now | 2015-08-04 16:09:21.276554+08
now_epoch | 1438675761.27655
last_walend_time | 2015-08-04 16:09:20.752082+08
last_recv_lsn | 5/4B53BFF0
last_apply_lsn | 5/4B53BFF0
last_apply_delay_ms | 0
receiver_pid | 6667
receiver_state | 2
receiver_start_epoch | 1438675316
conninfo | host=192.168.150.128 port=1921 user=replica keepalives_idle=60
具体的实现
以下5类信息直接在PostgreSQL共享内存中获取(用于管理receiver进程的数据结构):
1. wal receiver进程启动时间,
2. wal receiver进程状态,
3. wal receiver进程的PID,
4. wal receiver进程连接到上游节点的连接信息,
5. 上游节点最后一次将WAL record传给wal receiver的时间。(仅仅当wal receiver进程状态为streaming时有意义,否则是wal receiver进程的初始启动时间,因为receiver和主节点断开后会自动重启,这个时间会被初始化掉)。
用于管理receiver进程的数据结构:
/* Shared memory area for management of walreceiver process */
typedef struct
{
/*
* PID of currently active walreceiver process, its current state and
* start time (actually, the time at which it was requested to be
* started). 获取receiver进程pid,状态,启动时间,(注意pg_time_t这个数据类型中存储的是time获取到的时间,是一个int64类型,和PostgreSQL的TimestampTz数据类型有区别. 请查看man 2 time)
*/
pid_t pid;
WalRcvState walRcvState;
pg_time_t startTime;
/*
* receiveStart and receiveStartTLI indicate the first byte position and
* timeline that will be received. When startup process starts the
* walreceiver, it sets these to the point where it wants the streaming to
* begin.
*/
XLogRecPtr receiveStart; // 准备从什么位置开始接收
TimeLineID receiveStartTLI;
/*
* receivedUpto-1 is the last byte position that has already been
* received, and receivedTLI is the timeline it came from. At the first
* startup of walreceiver, these are set to receiveStart and
* receiveStartTLI. After that, walreceiver updates these whenever it
* flushes the received WAL to disk.
*/
XLogRecPtr receivedUpto; // 已接收到什么位置
TimeLineID receivedTLI;
/*
* latestChunkStart is the starting byte position of the current "batch"
* of received WAL. It's actually the same as the previous value of
* receivedUpto before the last flush to disk. Startup process can use
* this to detect whether it's keeping up or not.
*/
XLogRecPtr latestChunkStart;
/*
* Time of send and receive of any message received.
*/
TimestampTz lastMsgSendTime;
TimestampTz lastMsgReceiptTime;
/*
* Latest reported end of WAL on the sender
*/
XLogRecPtr latestWalEnd; // 最后一次接收到的XLOG位置
TimestampTz latestWalEndTime; // 最后一次接收到WAL信息的时间(主节点发送WAL record包时的系统时间,所以在流复制的数据包中,存储为8个字节)
/*
* connection string; is used for walreceiver to connect with the primary.
*/
char conninfo[MAXCONNINFO]; // 连接信息
/*
* replication slot name; is also used for walreceiver to connect with the
* primary
*/
char slotname[NAMEDATALEN];
slock_t mutex; /* locks shared variables shown above */
/*
* Latch used by startup process to wake up walreceiver after telling it
* where to start streaming (after setting receiveStart and
* receiveStartTLI).
*/
Latch latch;
} WalRcvData;
receiver进程的几种状态如下:
typedef enum
{
WALRCV_STOPPED, /* stopped and mustn't start up again */
WALRCV_STARTING, /* launched, but the process hasn't
* initialized yet */
WALRCV_STREAMING, /* walreceiver is streaming */
WALRCV_WAITING, /* stopped streaming, waiting for orders */
WALRCV_RESTARTING, /* asked to restart streaming */
WALRCV_STOPPING /* requested to stop, but still running */
} WalRcvState;
以下信息其实也来自receiver进程的共享内存:
1. 接收到的XLOG的最大的地址,
使用GetWalRcvWriteRecPtr()函数获得
/*
* Returns the last+1 byte position that walreceiver has written.
*
* Optionally, returns the previous chunk start, that is the first byte
* written in the most recent walreceiver flush cycle. Callers not
* interested in that value may pass NULL for latestChunkStart. Same for
* receiveTLI.
*/
XLogRecPtr
GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
XLogRecPtr recptr;
SpinLockAcquire(&walrcv->mutex);
recptr = walrcv->receivedUpto;
if (latestChunkStart)
*latestChunkStart = walrcv->latestChunkStart;
if (receiveTLI)
*receiveTLI = walrcv->receivedTLI;
SpinLockRelease(&walrcv->mutex);
return recptr;
}
以下2类信息来自控制文件:
1. 已恢复的XLOG的最大地址,
2. standby节点的replay时间延迟(当前时间 减去 已恢复的XLOG的最后一条commit/abort/pitr target中的时间戳),
对应代码
src/backend/access/transam/xlog.c
/*
* Get latest redo apply position.
*
* Exported to allow WALReceiver to read the pointer directly.
*/
XLogRecPtr
GetXLogReplayRecPtr(TimeLineID *replayTLI)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
XLogRecPtr recptr;
TimeLineID tli;
SpinLockAcquire(&xlogctl->info_lck);
recptr = xlogctl->lastReplayedEndRecPtr;
tli = xlogctl->lastReplayedTLI;
SpinLockRelease(&xlogctl->info_lck);
if (replayTLI)
*replayTLI = tli;
return recptr;
}
src/backend/replication/walreceiverfuncs.c
/*
* Returns the replication apply delay in ms or -1
* if the apply delay info is not available
*/
int
GetReplicationApplyDelay(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
XLogRecPtr receivePtr;
XLogRecPtr replayPtr;
long secs;
int usecs;
TimestampTz chunckReplayStartTime;
SpinLockAcquire(&walrcv->mutex);
receivePtr = walrcv->receivedUpto;
SpinLockRelease(&walrcv->mutex);
replayPtr = GetXLogReplayRecPtr(NULL);
if (receivePtr == replayPtr)
return 0;
chunckReplayStartTime = GetCurrentChunkReplayStartTime();
if (chunckReplayStartTime == 0)
return -1;
TimestampDifference(chunckReplayStartTime,
GetCurrentTimestamp(),
&secs, &usecs);
return (((int) secs * 1000) + (usecs / 1000));
}
有了以上的信息,我们就可以编写一个函数来获得这些信息了。
如何写一个返回复合类型的C函数?
# vi get_upstream_conninfo.c
#include "postgres.h"
#include <assert.h>
#include "fmgr.h"
#include "access/xlog.h"
#include "replication/walreceiver.h"
#include "utils/elog.h"
#include "utils/builtins.h"
#include "utils/timestamp.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "catalog/pg_type.h"
#include "utils/pg_lsn.h"
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
PG_FUNCTION_INFO_V1(get_rcv_replication_stat);
Datum
get_rcv_replication_stat(PG_FUNCTION_ARGS)
{
Assert(PG_NARGS() == 0); // 表示没有输入参数
if (!RecoveryInProgress()) // 在数据库处于恢复状态下时运行,否则不允许
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is not in progress"),
errhint("This functions can only be executed during recovery.")));
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv; // 共享内存中用于管理流复制的数据结构
TupleDesc tupdesc; // 创建一个行描述变量
Datum values[8]; // 创建一个存储值的Datum数组, 需要返回几个字段, 创建相应长度的数组
bool nulls[8]; // 创建一个数组, 表示对应的每个值是否为空
/* Initialise values and NULL flags arrays 初始化 */
MemSet(values, 0, sizeof(values));
MemSet(nulls, 0, sizeof(nulls));
/* Initialise attributes information in the tuple descriptor 定义字段类型和字段名, 到相应的头文件src/include/catalog/pg_type.h找到对应的类型 */
tupdesc = CreateTemplateTupleDesc(8, false);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "last_walend_time",
TIMESTAMPTZOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "last_recv_lsn",
LSNOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_apply_lsn",
LSNOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_apply_delay_ms",
INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 5, "receiver_pid",
INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 6, "receiver_state",
INT4OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 7, "receiver_start_time",
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 8, "receiver_conninfo",
TEXTOID, -1, 0);
BlessTupleDesc(tupdesc); // 完成对返回类型的构造, 参考src/include/funcapi.h
// 接下来将每个值转换为对应的Datum存储到values数组, 对应的nulls数组仅当值为空时设置为true.
TimestampTz receipttime;
receipttime = walrcv->latestWalEndTime;
values[0] = TimestampTzGetDatum(receipttime);
XLogRecPtr applyPtr;
applyPtr = GetXLogReplayRecPtr(NULL);
if (recvPtr == 0)
nulls[2] = true;
else
values[2] = LSNGetDatum(applyPtr);
XLogRecPtr recvPtr;
recvPtr = GetWalRcvWriteRecPtr(NULL, NULL);
if (recvPtr == 0)
nulls[1] = true;
else
values[1] = LSNGetDatum(recvPtr);
int apply_delay_ms;
apply_delay_ms = GetReplicationApplyDelay();
if (apply_delay_ms == -1)
nulls[3] = true;
else
values[3] = Int32GetDatum(apply_delay_ms);
values[4] = Int32GetDatum(walrcv->pid);
values[5] = Int32GetDatum(walrcv->walRcvState);
values[6] = Int64GetDatum(walrcv->startTime);
values[7] = PointerGetDatum(cstring_to_text((char *)walrcv->conninfo));
// 返回
/* Returns the record as Datum */
PG_RETURN_DATUM(HeapTupleGetDatum(
heap_form_tuple(tupdesc, values, nulls)));
}
[root@digoal ~]# gcc -O3 -Wall -Wextra -Werror -I /opt/soft_bak/postgresql-9.4.4/src/include -g -fPIC -c ./get_upstream_conninfo.c -o digoal.o
[root@digoal ~]# gcc -O3 -Wall -Wextra -Werror -I /opt/soft_bak/postgresql-9.4.4/src/include -g -shared digoal.o -o libdigoal.so
[root@digoal ~]# cp libdigoal.so /opt/pgsql/lib
创建函数和视图
postgres=# create or replace function get_rcv_replication_stat() returns record as '$libdir/libdigoal.so', 'get_rcv_replication_stat' language C STRICT;
postgres=# create or replace view get_rcv_replication_stat as
select now(),
extract(epoch from now()) as now_epoch,
last_walend_time,
last_recv_lsn,
last_apply_lsn,
last_apply_delay_ms,
receiver_pid,
case receiver_state
when 0 then 'stopped'
when 1 then 'starting'
when 2 then 'streaming'
when 3 then 'waiting'
when 4 then 'restarting'
when 5 then 'stopping'
else null end as receiver_state,
receiver_start_epoch,
conninfo
from get_rcv_replication_stat() as
t (last_walend_time timestamptz,
last_recv_lsn pg_lsn,
last_apply_lsn pg_lsn,
last_apply_delay_ms int,
receiver_pid int,
receiver_state int,
receiver_start_epoch int8,
conninfo text );
在主节点查询
postgres=# select * from get_rcv_replication_stat ;
ERROR: recovery is not in progress
HINT: This functions can only be executed during recovery.
在standby节点查询
postgres=# select * from get_rcv_replication_stat ;
-[ RECORD 1 ]--------+---------------------------------------------------------------
now | 2015-08-04 17:00:49.520785+08
now_epoch | 1438678849.52079
last_walend_time | 2015-08-04 17:00:48.841503+08
last_recv_lsn | 5/4B568518
last_apply_lsn | 5/4B568518
last_apply_delay_ms | 0
receiver_pid | 6667
receiver_state | streaming
receiver_start_epoch | 1438675316
conninfo | host=192.168.150.128 port=1921 user=replica keepalives_idle=60
调试 , gdb
参考
1.
pg_last_xact_replay_timestamp
pg_current_xlog_insert_location
pg_current_xlog_location
pg_last_xlog_receive_location
pg_last_xlog_replay_location
2.
man 2 time
3. src/include/replication/walreceiver.h
/*
* Values for WalRcv->walRcvState.
*/
typedef enum
{
WALRCV_STOPPED, /* stopped and mustn't start up again */
WALRCV_STARTING, /* launched, but the process hasn't
* initialized yet */
WALRCV_STREAMING, /* walreceiver is streaming */
WALRCV_WAITING, /* stopped streaming, waiting for orders */
WALRCV_RESTARTING, /* asked to restart streaming */
WALRCV_STOPPING /* requested to stop, but still running */
} WalRcvState;
4. 《PostgreSQL extend function - C example》