PostgreSQL How to get upstream node conninfo from standby node
背景
在PostgreSQL的upstream节点可以获得downstream节点的连接信息,当downstream节点主动连接upstream节点时,在upstream节点会有一个sender process负责发送XLOG信息,而这个sender进程其实就是backend process,在pg_stat_activity中可以查看。
pg_stat_replication的视图定义如下:
postgres=# \x
Expanded display is on.
postgres=# select pg_get_viewdef('pg_stat_replication');
-[ RECORD 1 ]--+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
pg_get_viewdef | SELECT s.pid,
| s.usesysid,
| u.rolname AS usename,
| s.application_name,
| s.client_addr,
| s.client_hostname,
| s.client_port,
| s.backend_start,
| s.backend_xmin,
| w.state,
| w.sent_location,
| w.write_location,
| w.flush_location,
| w.replay_location,
| w.sync_priority,
| w.sync_state
| FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, waiting, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin),
| pg_authid u,
| pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state)
| WHERE ((s.usesysid = u.oid) AND (s.pid = w.pid));
可以看出这里用到了pg_stat_get_activity来获得sender进程的信息,包含了client_addr。
pg_stat_get_wal_senders则是用来获取sender进程的PID的,当然这里还包含了由downstream节点的walreceiver进程发送过来的wal sent,write,flush,replay的位置以及其他信息等。
好了,那么downstream节点如何获取upstream节点的连接信息呢?
因为downstream节点在pg_stat_activity中没有walreceiver进程的信息,所以需要其他方法来获取upstream节点的信息。
其中一种方法是直接从walreceiver的共享内存中获取。既然downstream节点要连接upstream节点,它必然需要连接信息,这个连接信息存储在downstream节点的$PGDATA/recovery.conf文件中,数据库启动时startup进程会从这个文件中解析到primary_conninfo。
首先分析walreceiver进程,它通过一个动态加载的模块(libpqwalreceiver)来连接upstream节点,如果在recovery.conf中配置了primary_conninfo, startup进程会请求启动它(见参考3中的RequestXLogStreaming)。
startup进程同时需要从recovery.conf中读取primary_conninfo的信息,存储在WalRcvData结构中。
如下:
src/include/replication/walreceiver.h
/*
* MAXCONNINFO: maximum size of a connection string.
*
* XXX: Should this move to pg_config_manual.h?
*/
#define MAXCONNINFO 1024
/*
 * Shared memory area for management of walreceiver process.
 *
 * Every member declared before "mutex" is covered by that spinlock (see the
 * comment on the mutex member); "latch" is declared after it.
 */
typedef struct
{
/*
* PID of currently active walreceiver process, its current state and
* start time (actually, the time at which it was requested to be
* started).
*/
pid_t pid;
WalRcvState walRcvState;
pg_time_t startTime;
/*
* receiveStart and receiveStartTLI indicate the first byte position and
* timeline that will be received. When startup process starts the
* walreceiver, it sets these to the point where it wants the streaming to
* begin.
*/
XLogRecPtr receiveStart;
TimeLineID receiveStartTLI;
/*
* receivedUpto-1 is the last byte position that has already been
* received, and receivedTLI is the timeline it came from. At the first
* startup of walreceiver, these are set to receiveStart and
* receiveStartTLI. After that, walreceiver updates these whenever it
* flushes the received WAL to disk.
*/
XLogRecPtr receivedUpto;
TimeLineID receivedTLI;
/*
* latestChunkStart is the starting byte position of the current "batch"
* of received WAL. It's actually the same as the previous value of
* receivedUpto before the last flush to disk. Startup process can use
* this to detect whether it's keeping up or not.
*/
XLogRecPtr latestChunkStart;
/*
* Time of send and receive of any message received.
*/
TimestampTz lastMsgSendTime;
TimestampTz lastMsgReceiptTime;
/*
* Latest reported end of WAL on the sender
*/
XLogRecPtr latestWalEnd;
TimestampTz latestWalEndTime;
/*
* connection string; is used for walreceiver to connect with the primary.
*/
char conninfo[MAXCONNINFO];
/*
* replication slot name; is also used for walreceiver to connect with the
* primary
*/
char slotname[NAMEDATALEN];
slock_t mutex; /* locks shared variables shown above */
/*
* Latch used by startup process to wake up walreceiver after telling it
* where to start streaming (after setting receiveStart and
* receiveStartTLI).
*/
Latch latch;
} WalRcvData;
extern WalRcvData *WalRcv;
hook如下:
......
/* libpqwalreceiver hooks */
typedef void (*walrcv_connect_type) (char *conninfo);
extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
typedef void (*walrcv_identify_system_type) (TimeLineID *primary_tli);
extern PGDLLIMPORT walrcv_identify_system_type walrcv_identify_system;
typedef void (*walrcv_readtimelinehistoryfile_type) (TimeLineID tli, char **filename, char **content, int *size);
extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile;
typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint, char *slotname);
extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;
typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli);
extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming;
typedef int (*walrcv_receive_type) (int timeout, char **buffer);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
extern PGDLLIMPORT walrcv_send_type walrcv_send;
typedef void (*walrcv_disconnect_type) (void);
extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
/* prototypes for functions in walreceiver.c */
extern void WalReceiverMain(void) __attribute__((noreturn));
......
walreceiver模块使用WalRcv中的conninfo,连接到upstream节点。
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
/*
* Module load callback
*/
void
_PG_init(void)
{
......
walrcv_connect = libpqrcv_connect;
}
/*
* Establish the connection to the primary server for XLOG streaming
*
* conninfo is the caller-supplied connection string; a fixed set of
* replication options is appended to it before connecting.  The resulting
* connection is stored in streamConn (not declared in this excerpt;
* presumably file-level state -- confirm against the full file).
*
* Reports ERROR through ereport() if the connection status is anything other
* than CONNECTION_OK.
*/
static void
libpqrcv_connect(char *conninfo)
{
/*
* The extra 75 bytes hold the fixed suffix appended below (74 characters)
* plus the terminating NUL.
*/
char conninfo_repl[MAXCONNINFO + 75];
/*
* Connect using deliberately undocumented parameter: replication. The
* database name is ignored by the server in replication mode, but specify
* "replication" for .pgpass lookup.
*/
snprintf(conninfo_repl, sizeof(conninfo_repl),
"%s dbname=replication replication=true fallback_application_name=walreceiver",
conninfo);
streamConn = PQconnectdb(conninfo_repl);
/* The libpq error text is included in the reported message. */
if (PQstatus(streamConn) != CONNECTION_OK)
ereport(ERROR,
(errmsg("could not connect to the primary server: %s",
PQerrorMessage(streamConn))));
}
src/backend/replication/walreceiver.c
/* Main entry point for walreceiver process */
void
WalReceiverMain(void)
{
......
char conninfo[MAXCONNINFO];
......
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
......
/* Fetch information required to start streaming */
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
......
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
walrcv_connect(conninfo);
DisableWalRcvImmediateExit();
......
我们可以定义一个函数,来读取WalRcv的信息,即可读取upstream节点的信息:
[root@digoal ~]# vi get_upstream_conninfo.c
#include "postgres.h"
#include <assert.h>
#include "fmgr.h"
#include "access/xlog.h"
#include "replication/walreceiver.h"
#include "utils/elog.h"
#include "utils/builtins.h"
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
PG_FUNCTION_INFO_V1(get_upstream_conninfo);
Datum
get_upstream_conninfo(PG_FUNCTION_ARGS)
{
assert(PG_NARGS() == 0);
if (!RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is not in progress"),
errhint("This functions can only be executed during recovery.")));
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
PG_RETURN_TEXT_P(cstring_to_text((char *) walrcv->conninfo));
}
编译:
[root@digoal ~]# gcc -O3 -Wall -Wextra -Werror -I /opt/soft_bak/postgresql-9.4.4/src/include -g -fPIC -c ./get_upstream_conninfo.c -o digoal.o
[root@digoal ~]# gcc -O3 -Wall -Wextra -Werror -I /opt/soft_bak/postgresql-9.4.4/src/include -g -shared digoal.o -o libdigoal.so
拷贝到$PGHOME/lib
[root@digoal ~]# cp libdigoal.so /opt/pgsql/lib/
创建函数:
postgres=# \set VERBOSITY verbose
postgres=# create or replace function get_upstream_conninfo() returns text as '$libdir/libdigoal.so', 'get_upstream_conninfo' language C STRICT;
CREATE FUNCTION
在主库执行直接返回错误即可:
postgres=# select get_upstream_conninfo();
ERROR: 55000: recovery is not in progress
HINT: This functions can only be executed during recovery.
LOCATION: get_upstream_conninfo, get_upstream_conninfo.c:25
在downstream节点执行,可以获取到upstream节点的信息:
postgres@digoal-> psql -h 127.0.0.1 -p 1922
psql (9.4.4)
Type "help" for help.
postgres=# \set VERBOSITY verbose
postgres=# select get_upstream_conninfo();
get_upstream_conninfo
----------------------------------------------------------------
host=192.168.150.128 port=1921 user=replica keepalives_idle=60
(1 row)
还有一种方法是直接读取recovery.conf文件来解析连接信息,如下。
postgres=# select * from regexp_split_to_table(pg_read_file('recovery.conf'),'\n') t(a) where a ~ '^ *primary_conninfo';
a
-----------------------------------------------------------------------------------------------------------------------------------
primary_conninfo = 'host=192.168.150.128 port=1922 user=replica keepalives_idle=60' # e.g. 'host=localhost port=5432'
(1 row)
XLOG最后接收到的时间戳也可以在walrcv数据结构中获取,这个可以用来判断心跳。
#include "postgres.h"
#include <assert.h>
#include "fmgr.h"
#include "access/xlog.h"
#include "replication/walreceiver.h"
#include "utils/elog.h"
#include "utils/builtins.h"
#include "utils/timestamp.h"
#ifdef PG_MODULE_MAGIC
PG_MODULE_MAGIC;
#endif
PG_FUNCTION_INFO_V1(get_upstream_conninfo);
Datum
get_upstream_conninfo(PG_FUNCTION_ARGS)
{
assert(PG_NARGS() == 0);
if (!RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is not in progress"),
errhint("This functions can only be executed during recovery.")));
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
/* Fetch information required to start streaming */
// strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
PG_RETURN_TEXT_P(cstring_to_text((char *) walrcv->conninfo));
}
PG_FUNCTION_INFO_V1(get_rcv_timestamp);
Datum
get_rcv_timestamp(PG_FUNCTION_ARGS)
{
assert(PG_NARGS() == 0);
if (!RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is not in progress"),
errhint("This functions can only be executed during recovery.")));
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
PG_RETURN_TEXT_P(cstring_to_text(timestamptz_to_str(walrcv->latestWalEndTime)));
}
create or replace function get_rcv_timestamp() returns text as '$libdir/libdigoal.so', 'get_rcv_timestamp' language C STRICT;
postgres=# select get_rcv_timestamp();
get_rcv_timestamp
-------------------------------
2015-08-03 10:13:22.570627+08
(1 row)
参考
1. 解析配置文件的公共函数
src/backend/utils/misc/guc-file.c
/*
* Read and parse a single configuration file. This function recurses
* to handle "include" directives.
*
* Input parameters:
* fp: file pointer from AllocateFile for the configuration file to parse
* config_file: absolute or relative path name of the configuration file
* depth: recursion depth (should be 0 in the outermost call)
* elevel: error logging level to use
* Output parameters:
* head_p, tail_p: head and tail of linked list of name/value pairs
*
* *head_p and *tail_p must be initialized to NULL before calling the outer
* recursion level. On exit, they contain a list of name-value pairs read
* from the input file(s).
*
* Returns TRUE if successful, FALSE if an error occurred. The error has
* already been ereport'd, it is only necessary for the caller to clean up
* its own state and release the ConfigVariable list.
*
* Note: if elevel >= ERROR then an error will not return control to the
* caller, so there is no need to check the return value in that case.
*/
bool
ParseConfigFp(FILE *fp, const char *config_file, int depth, int elevel,
ConfigVariable **head_p, ConfigVariable **tail_p)
{
......
}
2. 读取recovery.conf
src/backend/access/transam/xlog.c
#define RECOVERY_COMMAND_FILE "recovery.conf"
/*
* See if there is a recovery command file (recovery.conf), and if so
* read in parameters for archive recovery and XLOG streaming.
*
* The file is parsed using the main configuration parser.
*/
static void
readRecoveryCommandFile(void)
{
for (item = head; item; item = item->next)
{
if (strcmp(item->name, "restore_command") == 0)
{
recoveryRestoreCommand = pstrdup(item->value);
ereport(DEBUG2,
(errmsg_internal("restore_command = '%s'",
recoveryRestoreCommand)));
}
......
else if (strcmp(item->name, "primary_conninfo") == 0)
{
PrimaryConnInfo = pstrdup(item->value);
ereport(DEBUG2,
(errmsg_internal("primary_conninfo = '%s'",
PrimaryConnInfo)));
}
}
3. 从流复制接收恢复数据库需要的XLOG数据
src/backend/access/transam/xlog.c
static bool
WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
bool fetching_ckpt, XLogRecPtr tliRecPtr)
{
......
if (PrimaryConnInfo)
{
XLogRecPtr ptr;
TimeLineID tli;
if (fetching_ckpt)
{
ptr = RedoStartLSN;
tli = ControlFile->checkPointCopy.ThisTimeLineID;
}
else
{
ptr = tliRecPtr;
tli = tliOfPointInHistory(tliRecPtr, expectedTLEs);
if (curFileTLI > 0 && tli < curFileTLI)
elog(ERROR, "according to history file, WAL location %X/%X belongs to timeline %u, but previous recovered WAL file came from timeline %u",
(uint32) (ptr >> 32), (uint32) ptr,
tli, curFileTLI);
}
curFileTLI = tli;
RequestXLogStreaming(tli, ptr, PrimaryConnInfo,
PrimarySlotName);
receivedUpto = 0;
}